无冥冥之志者,无昭昭之明;无惛惛之事者,无赫赫之功。
解释:没有专心致志地刻苦学习,就没有融会贯通的智慧;没有埋头执着的工作,就不会取得显著的成就。
RxJava3.0线程切换原理
这是Android开发面试过程中经典题目,被问到过好几次,没能准确答上来。现在尝试通过(RxJava3.0)阅读其源码来一探究竟。
1、RxJava是进行线程切换的流程是什么?
RxJava线程切换流程,其实就是指定在哪个线程中执行任务。可以通过observerOn方法指定Observer(观察者)所在的线程;通过subscriberOn方法指定Observable所在的线程。
2、RxJava线程切换有哪些选项?
RxJava线程切换的选项其实就是observerOn方法和subscriberOn方法的参数可选值:
选项 | 解释 |
---|---|
AndroidSchedulers.mainThread() | 在主线程中执行任务(需要引入RxAndroid) |
Schedulers.single() | 使用单一线程的线程池实现 |
Schedulers.io() | 使用了线程池实现 |
Schedulers.newThread() | 在新线程中执行任务 |
Schedulers.trampoline() | 暂停当前线程的任务,由当前线程来执行RxJava任务 |
Schedulers.computation() | 用于计算任务,线程数是CPU核数 |
Schedulers.from() | 设置自定义的Executor |
Rxjava的线程切换依赖Schedule这个类,不同的选项需要使用Schedule的子类来实现不同的功能。Schedule的子类有:
子类 | 对应选项 |
---|---|
HandlerScheduler | AndroidSchedulers.mainThread() |
SingleScheduler | Schedulers.single() |
IoScheduler | Schedulers.io() |
NewThreadScheduler | Schedulers.newThread() |
TrampolineScheduler | Schedulers.trampoline() |
ComputationScheduler | Schedulers.computation() |
ExecutorScheduler | Schedulers.from() |
3、RxJava线程切换的原理?
subscribeOn(被观察者任务)线程的切换原理
subscribeOn是将Observable(被观察者)的任务切换到指定的线程中执行,看一下这个方法的源码:
public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
Objects.requireNonNull(scheduler, "scheduler is null");//检查传入的对象是否为null
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));返回的ObservableSubscribeOn对象
}
上方subscribeOn源码可以知道,它返回看一个ObservableSubscribeOn对象,参数:
参数 | 解释 |
---|---|
this | 这个是一个Observable对象,但是Observable有很多子类,注意是哪一个 |
scheduler | 这个就是指定在哪种类型的线程下执行被观察者的任务。 |
subscribeOn方法就这么简单,创建并返回一个ObservableSubscribeOn对象。
再看看ObservableSubscribeOn的构造方法:
//ObservableSubscribeOn的构造方法
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
这里需要记住source什么,就是调用链传递过来的一个Observable对象,但是Observable有很多子类,注意是哪一个子类的对象。
先绕过observerOn方法(Observer,即观察者)的线程切换,直接看subscribe(订阅)方法。这个方法就是将Observable(被观察者)和Observer(观察者)联系起来的方法。看一下源码:
//Observable.java
public final void subscribe(@NonNull Observer<? super T> observer) {
Objects.requireNonNull(observer, "observer is null");//检查
try {
//...
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
//...
}
}
subscribe(订阅)方法在Observable类中,它的子类都没有实现这个方法。在这个方法中去除多余的代码之后,其实很简单就是调用了subscribeActual*(注意:这个方法是子类的方法,所以这里是哪个Observable的子类调用subscribe很重要),前面我们知道调用subscribe方法的是ObservableSubscribeOn这个类的对象。所以看一下这个类的subscribeActual方法源码:
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
在这个方法中,创建一个SubscribeOnObserver对象,这是一个Observer的子类,也就是一个观察者(注意参数是在subscribe方中传入的我们定义的Observer)。这个 SubscribeOnObserver对象封装类我们定义的Observer。然后,调用了SubscribeOnObserver的实例对象的onSubscribe方法。紧接着,先创建一个SubscribeTask对象,参数是SubscribeOnObserver的实例对象。SubscribeTask类是ObservableSubscribeOn类的内部类,看一下它的源码:
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
SubscribeTask实现了Runnable接口。run方法直接执行source的subscribe方法,这里稍后再看。先回到ObservableSubscribeOn类的subscribeActual方法,接着把SubscribeTask的实例对象传入scheduler.scheduleDirect方法,这里的scheduler就是我们在subscribeOn方法指定的线程类型对应的scheduler,由于不同的Scheduler子类基本都重写了scheduleDirect方法,所以简单看一下Scheduler类的scheduleDirect方法源码:
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();//创建一个work对象
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);//执行任务
return task;
}
可以看到Scheduler类的scheduleDirect方法中创建了一个Worker对象来执行任务,这里就会涉及到不同的Scheduler子类执行任务采用不同的策略,就不展开讲了。到这里,subscribeOn的线程切换就完成了,根据指定不同的Scheduler子类,可以指定在什么线程下完成Observable的任务。
observerOn(观察者任务)线程的切换原理
先看一下observerOn方法的源码:
public final Observable<T> observeOn(@NonNull Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
//...
return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));
}
方法也比较简单,返回了一个ObservableObserveOn的实例对象,看一下ObservableObserveOn类的构造方法源码:
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
方法比较简单,初始化一些参数。需要注意的是source是一个Observable的对象,具体是哪个子类需要清楚。scheduler是我们指定的线程对应的Scheduler。
observerOn方法已经分析完了,非常简单。RxJava使用的是链式调用,在observerOn方法后面有可能是subscribe方法,也可能是subscriberOn方法。从前面的subscriberOn的分析中,最后调用的是SubscribeTask的run方法。回到SubscribeTask的run方法,方法调用的是source.subscribe(parent);。这里的source是某一个Observable的子类的对象,但是Observable的子类都没有实现这个方法,所以还是调用Observable的subscribe方法,再看一下源码:
//Observable.java
public final void subscribe(@NonNull Observer<? super T> observer) {
Objects.requireNonNull(observer, "observer is null");//检查
try {
//...
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
//...
}
}
subscribe(订阅)方法,在这个方法中去除多余的代码之后,其实很简单就是调用了subscribeActual*(注意:这个方法是子类的方法,所以这里是哪个Observable的子类调用subscribe很重要)。参数是一个Observer,这里的参数可能是我们自定义的Observer,也可能是subscriberOn方法一层层封装后的Observer。通过前面分析,subscriberOn方法最后传进来的是一个SubscribeOnObserver类的实例对象。
不管是SubscribeOnObserver类的实例对象还是自定义的Observer,经过observerOn之后,都会是ObservableObserveOn类的对象。所以看一下ObservableObserveOn类的 subscribeActual*方法源码:
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
}
}
方法比较简单,主要是区别一下TrampolineScheduler和其他Scheduler,先看其他,会创建一个Scheduler.Worker,然后调用source.subscribe方法,这里的source是Observable的子类的对象。经过前面知道,调用subscribe方法最终调用的是Observable子类的subscribeActual*方法,这里的Observable子类有可能是subscriberOn返回的ObservableSubscribeOn,也有可能是我们定义的Observable子类对象。
由于这里必须要知道调用的是Observable的哪个子类的subscribeActual方法,才能继续往下。所以简单起见,以ObservableFromArray为例,继续介绍。看一下它的subscribeActual*方法源码:
public void subscribeActual(Observer<? super T> observer) {
FromArrayDisposable<T> d = new FromArrayDisposable<>(observer, array);
observer.onSubscribe(d);
if (d.fusionMode) {
return;
}
d.run();
}
上述源码中(参数是ObserveOnObserver),首先创建FromArrayDisposable对象,然后调用observer的onSubscribe。FromArrayDisposable对象的fusionMode变量默认是false,所以最后调用run方法,看一下源码:
void run() {
T[] a = array;
int n = a.length;
for (int i = 0; i < n && !isDisposed(); i++) {
T value = a[i];
if (value == null) {
downstream.onError(new NullPointerException("The element at index " + i + " is null"));
return;
}
downstream.onNext(value);
}
if (!isDisposed()) {
downstream.onComplete();
}
}
FromArrayDisposable对象的run方法就是循环调用downstream的onNext方法,发生错位调用onError,完成调用onComplete方法。downstream就是前面传过来的Observer。这个Observer是ObserveOnObserver类的实例对象。所以看一下ObserveOnObserver类的onNext方法源码:
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
非常简单,调用一下schedule方法,看一下源码:
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
这里的worker就是使用observerOn指定的Scheduler创建出来的,后续调用worker.schedule就涉及到不同的Scheduler,使用不同线程策略了。到这ObserverOn就完成了观察者任务的切换。
总结
- RxJava线程切换使用observerOn和subscriberOn方法
- 线程切换可以设置多种Scheduler,即线程策略
- observerOn方法会将上游Observable封装成一个ObserverObserverOn类的实例对象,然后调用到ObserverObserverOn类的subscribeActual方法时,创建对应Scheduler的Worker类对象。
- subscriberOn方法会将上游的Observable封装成一个ObservableSubscriberOn类的实例对象。调用ObservableSubscriberOn的subscribeActual方法时,使用指定的Scheduler进行线程切换。
- 调用ObservableObserveOn的subscribe方法会直接引发上游 subscribe的使用。调用ObservableSubscriberOn的subscribe方法会在指定的线程中引发上游subscribe方法调用。最终调用onNext方法的时候,会触发ObservableObserveOn.ObserveOnObserver的onNext,在这里触发在指定线程中执行观察者的任务。
- 不同的Scheduler使用不同的线程策略。