无冥冥之志者,无昭昭之明;无惛惛之事者,无赫赫之功。
 解释:没有专心致志地刻苦学习,就没有融会贯通的智慧;没有埋头执着的工作,就不会取得显著的成就。

RxJava3.0线程切换原理

   这是Android开发面试过程中经典题目,被问到过好几次,没能准确答上来。现在尝试通过(RxJava3.0)阅读其源码来一探究竟。

1、RxJava是进行线程切换的流程是什么?

   RxJava线程切换流程,其实就是指定在哪个线程中执行任务。可以通过observerOn方法指定Observer(观察者)所在的线程;通过subscriberOn方法指定Observable所在的线程。

2、RxJava线程切换有哪些选项?

   RxJava线程切换的选项其实就是observerOn方法和subscriberOn方法的参数可选值:

选项 解释
AndroidSchedulers.mainThread() 在主线程中执行任务(需要引入RxAndroid)
Schedulers.single() 使用单一线程的线程池实现
Schedulers.io() 使用了线程池实现
Schedulers.newThread() 在新线程中执行任务
Schedulers.trampoline() 暂停当前线程的任务,由当前线程来执行RxJava任务
Schedulers.computation() 用于计算任务,线程数是CPU核数
Schedulers.from() 设置自定义的Executor

   Rxjava的线程切换依赖Schedule这个类,不同的选项需要使用Schedule的子类来实现不同的功能。Schedule的子类有:

子类 对应选项
HandlerScheduler AndroidSchedulers.mainThread()
SingleScheduler Schedulers.single()
IoScheduler Schedulers.io()
NewThreadScheduler Schedulers.newThread()
TrampolineScheduler Schedulers.trampoline()
ComputationScheduler Schedulers.computation()
ExecutorScheduler Schedulers.from()

3、RxJava线程切换的原理?

subscribeOn(被观察者任务)线程的切换原理

   subscribeOn是将Observable(被观察者)的任务切换到指定的线程中执行,看一下这个方法的源码:

public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
    Objects.requireNonNull(scheduler, "scheduler is null");//检查传入的对象是否为null

    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));返回的ObservableSubscribeOn对象
}

   上方subscribeOn源码可以知道,它返回看一个ObservableSubscribeOn对象,参数:

参数 解释
this 这个是一个Observable对象,但是Observable有很多子类,注意是哪一个
scheduler 这个就是指定在哪种类型的线程下执行被观察者的任务。

   subscribeOn方法就这么简单,创建并返回一个ObservableSubscribeOn对象。

   再看看ObservableSubscribeOn的构造方法:

//ObservableSubscribeOn的构造方法
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
    super(source);
    this.scheduler = scheduler;
}

   这里需要记住source什么,就是调用链传递过来的一个Observable对象,但是Observable有很多子类,注意是哪一个子类的对象。

   先绕过observerOn方法(Observer,即观察者)的线程切换,直接看subscribe(订阅)方法。这个方法就是将Observable(被观察者)和Observer(观察者)联系起来的方法。看一下源码:

//Observable.java

public final void subscribe(@NonNull Observer<? super T> observer) {
    Objects.requireNonNull(observer, "observer is null");//检查
    try {
       //...

        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
           //... 
    }
}

   subscribe(订阅)方法在Observable类中,它的子类都没有实现这个方法。在这个方法中去除多余的代码之后,其实很简单就是调用了subscribeActual*(注意:这个方法是子类的方法,所以这里是哪个Observable的子类调用subscribe很重要),前面我们知道调用subscribe方法的是ObservableSubscribeOn这个类的对象。所以看一下这个类的subscribeActual方法源码:

public void subscribeActual(final Observer<? super T> observer) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);

    observer.onSubscribe(parent);

    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

   在这个方法中,创建一个SubscribeOnObserver对象,这是一个Observer的子类,也就是一个观察者(注意参数是在subscribe方中传入的我们定义的Observer)。这个 SubscribeOnObserver对象封装类我们定义的Observer。然后,调用了SubscribeOnObserver的实例对象的onSubscribe方法。紧接着,先创建一个SubscribeTask对象,参数是SubscribeOnObserver的实例对象。SubscribeTask类是ObservableSubscribeOn类的内部类,看一下它的源码:

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        source.subscribe(parent);
    }
}

   SubscribeTask实现了Runnable接口。run方法直接执行source的subscribe方法,这里稍后再看。先回到ObservableSubscribeOn类的subscribeActual方法,接着把SubscribeTask的实例对象传入scheduler.scheduleDirect方法,这里的scheduler就是我们在subscribeOn方法指定的线程类型对应的scheduler,由于不同的Scheduler子类基本都重写了scheduleDirect方法,所以简单看一下Scheduler类的scheduleDirect方法源码:

    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }

    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();//创建一个work对象

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);//执行任务

        return task;
    }

   可以看到Scheduler类的scheduleDirect方法中创建了一个Worker对象来执行任务,这里就会涉及到不同的Scheduler子类执行任务采用不同的策略,就不展开讲了。到这里,subscribeOn的线程切换就完成了,根据指定不同的Scheduler子类,可以指定在什么线程下完成Observable的任务。

observerOn(观察者任务)线程的切换原理

   先看一下observerOn方法的源码:

    public final Observable<T> observeOn(@NonNull Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }

    public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
        //...
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));
    }

   方法也比较简单,返回了一个ObservableObserveOn的实例对象,看一下ObservableObserveOn类的构造方法源码:

    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

   方法比较简单,初始化一些参数。需要注意的是source是一个Observable的对象,具体是哪个子类需要清楚。scheduler是我们指定的线程对应的Scheduler。

   observerOn方法已经分析完了,非常简单。RxJava使用的是链式调用,在observerOn方法后面有可能是subscribe方法,也可能是subscriberOn方法。从前面的subscriberOn的分析中,最后调用的是SubscribeTask的run方法。回到SubscribeTask的run方法,方法调用的是source.subscribe(parent);。这里的source是某一个Observable的子类的对象,但是Observable的子类都没有实现这个方法,所以还是调用Observable的subscribe方法,再看一下源码:

//Observable.java

public final void subscribe(@NonNull Observer<? super T> observer) {
    Objects.requireNonNull(observer, "observer is null");//检查
    try {
       //...

        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
           //... 
    }
}

   subscribe(订阅)方法,在这个方法中去除多余的代码之后,其实很简单就是调用了subscribeActual*(注意:这个方法是子类的方法,所以这里是哪个Observable的子类调用subscribe很重要)。参数是一个Observer,这里的参数可能是我们自定义的Observer,也可能是subscriberOn方法一层层封装后的Observer。通过前面分析,subscriberOn方法最后传进来的是一个SubscribeOnObserver类的实例对象。

   不管是SubscribeOnObserver类的实例对象还是自定义的Observer,经过observerOn之后,都会是ObservableObserveOn类的对象。所以看一下ObservableObserveOn类的 subscribeActual*方法源码:

    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
        }
    }

   方法比较简单,主要是区别一下TrampolineScheduler和其他Scheduler,先看其他,会创建一个Scheduler.Worker,然后调用source.subscribe方法,这里的source是Observable的子类的对象。经过前面知道,调用subscribe方法最终调用的是Observable子类的subscribeActual*方法,这里的Observable子类有可能是subscriberOn返回的ObservableSubscribeOn,也有可能是我们定义的Observable子类对象。

   由于这里必须要知道调用的是Observable的哪个子类的subscribeActual方法,才能继续往下。所以简单起见,以ObservableFromArray为例,继续介绍。看一下它的subscribeActual*方法源码:

    public void subscribeActual(Observer<? super T> observer) {
        FromArrayDisposable<T> d = new FromArrayDisposable<>(observer, array);

        observer.onSubscribe(d);

        if (d.fusionMode) {
            return;
        }

        d.run();
    }

   上述源码中(参数是ObserveOnObserver),首先创建FromArrayDisposable对象,然后调用observer的onSubscribe。FromArrayDisposable对象的fusionMode变量默认是false,所以最后调用run方法,看一下源码:

    void run() {
            T[] a = array;
            int n = a.length;

            for (int i = 0; i < n && !isDisposed(); i++) {
                T value = a[i];
                if (value == null) {
                    downstream.onError(new NullPointerException("The element at index " + i + " is null"));
                    return;
                }
                downstream.onNext(value);
            }
            if (!isDisposed()) {
                downstream.onComplete();
            }
        }

   FromArrayDisposable对象的run方法就是循环调用downstream的onNext方法,发生错位调用onError,完成调用onComplete方法。downstream就是前面传过来的Observer。这个Observer是ObserveOnObserver类的实例对象。所以看一下ObserveOnObserver类的onNext方法源码:

    public void onNext(T t) {
        if (done) {
            return;
        }

        if (sourceMode != QueueDisposable.ASYNC) {
            queue.offer(t);
        }
        schedule();
    }

   非常简单,调用一下schedule方法,看一下源码:

    void schedule() {
        if (getAndIncrement() == 0) {
            worker.schedule(this);
        }
    }

   这里的worker就是使用observerOn指定的Scheduler创建出来的,后续调用worker.schedule就涉及到不同的Scheduler,使用不同线程策略了。到这ObserverOn就完成了观察者任务的切换。

总结

  1. RxJava线程切换使用observerOn和subscriberOn方法
  1. 线程切换可以设置多种Scheduler,即线程策略
  1. observerOn方法会将上游Observable封装成一个ObserverObserverOn类的实例对象,然后调用到ObserverObserverOn类的subscribeActual方法时,创建对应Scheduler的Worker类对象。
  1. subscriberOn方法会将上游的Observable封装成一个ObservableSubscriberOn类的实例对象。调用ObservableSubscriberOn的subscribeActual方法时,使用指定的Scheduler进行线程切换。
  1. 调用ObservableObserveOn的subscribe方法会直接引发上游 subscribe的使用。调用ObservableSubscriberOn的subscribe方法会在指定的线程中引发上游subscribe方法调用。最终调用onNext方法的时候,会触发ObservableObserveOn.ObserveOnObserver的onNext,在这里触发在指定线程中执行观察者的任务。
  1. 不同的Scheduler使用不同的线程策略。




results matching ""

    No results matching ""