在使用 RxJava 的过程中,我们经常会使用到其线程切换的功能,而线程切换的功能主要通过 subscribeOn() 和 observeOn() 两个方法实现,

下面是一段使用 RxJava 进行线程切换的样例代码:

主线程

样例

 Observable.just("Hello, world !")
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(@NonNull Disposable d) {}

            @Override
            public void onNext(@NonNull String s) {}

            @Override
            public void onError(@NonNull Throwable e) {}

            @Override
            public void onComplete() {}
        });

先看一下 just() 方法的源码:

Observable # just()

public static <T> Observable<T> just(T item) {
    ObjectHelper.requireNonNull(item, "The item is null");
    return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}

just() 方法会返回一个 ObservableJust 对象,然后分别看一下 subscribeOn() 和 observeOn() 的源码:

Observable # subscribeOn() observeOn()

public final Observable<T> subscribeOn(Scheduler scheduler){
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false , bufferSize());
}

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError , int bufferSize) {
    ......
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler , delayError , bufferSize));
}

从源码可以看出, subscribeOn() 方法最终会返回一个 ObservableSubscribeOn 对象,而 observeOn() 方法最终会返回一个 ObservableObserveOn 对象,

上述的ObservableJust、ObservableSubscribeOn、ObservableObserveOn都是 Observable 类型的对象。

由于调用 subscribeOn() 和 observeOn() 构造相应的 Observable 对象时,都会把 this 传入构造方法,所以最后会存在如下的引用关系:

ObservableObserveOn -> ObservableSubscribeOn -> ObservableJust

Observable # subscribe()

上面的样例代码最终调用的 subscribe() 方法实际上是 ObservableObserveOn 对象的 subscreibe() 方法,而上述的三个 Observalbe 对象的 subscribe() 的代码都是相同的,都继承自 Observable 类的 subscribe() 方法:

//Observable.java
public final void subscribe(Observer<? super T> observer) {
    ...
    observer = RxJavaPlugins.onSubscribe(this, observer);
    ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
    subscribeActual(observer);
    ...
}

subscribe() 方法会调用 subscribeActual() 方法,上述的三个 Observable 对象的差异主要就集中在 subscribeActual() 方法中, ObservableObserveOn 的 subscribeActual() 方法源码如下:

ObservableObserveOn # subscribeActual()

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;

    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler , boolean delayError , int bufferSize) {
        super(source); // ObservableSubscribeOn 对象
        this.scheduler = scheduler;// HandlerScheduler
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            //scheduler 是 HandlerScheduler ,所以会执行到这里
            Scheduler.Worker w = scheduler.createWorker();
            source.subscribe(new ObserveOnObserver<T>(observer, w , delayError , bufferSize));
        }
    }
}

  1. 先构造一个 Worker 对象,
  2. 构造一个 ObserveOnObserver 对象, ObserveOnObserver 会引用 subscribe() 方法传入的 Observer 对象和 Worker 对象;
  3. 调用 source 成员变量的 subscribe() 方法,这里的 source 实际上是 ObservableSubscribeOn 对象

于是我们就进入 ObservableSubscribeOn 的 subscribeActual() 方法:

ObservableSubscribeOn # subscribeActual()

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source); //ObservableJust 对象
        this.scheduler = scheduler; // IOScheduler对象
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        s.onSubscribe(parent);
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    ......
}
  1. 构造了一个 SubscribeOnObserver 对象,这个对象会引用 subscribeActual() 方法传入的 Observer 对象即 ObserveOnObserver ,这里也形成了一条引用链:
SubscribeOnObserver -> ObserveOnObserver -> 样例中通过 subscribe() 方法传入的Observer
  1. SubscribeOnObserver会被用于构造 SubscribeTask 对象, SubscribeTask 对象又会用于调用 Scheduler 的 scheduleDirect() 方法,先看一下 SubscribeTask 的代码:
final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        source.subscribe(parent);
    }
}

可见 SubscribeTask 是个 Runnalbe 类型对象,它的 run() 方法中调用了 source 的 subscribe() 方法,这里的 source 是ObservableJust;

回到前面对Scheduler(在这里实际是 IOScheduler ,因为 Schedulers.io() 就是一个IOScheduler)的 scheduleDirect() 方法:

IOScheduler # scheduleDirect()

    //IOScheduler 父类 Scheduler 的代码:
public Disposable scheduleDirect(@NonNull Runnable run) { //run 就是 SubscribeTask 对象
    return scheduleDirect(run, 0L , TimeUnit.NANOSECONDS);
}

public Disposable scheduleDirect(@NonNull Runnable run, long delay , @NonNull TimeUnit unit) {
        final Worker w = createWorker(); // w 就是 EventLoopWorker 类型
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        DisposeTask task = new DisposeTask(decoratedRun, w);// task 中就包含这个 SubscribeTask 对象
        w.schedule(task, delay , unit);
        return task;
}
  1. scheduleDirect()方法用 createWorker() 构造一个 Worker 对象,这里实际上是 EventLoopWorker 对象,
  2. 调用 EventLoopWorker 的 schedule() 方法,传递的参数就是封装了 SubscribeTask 对象的 DisposeTask 类, DisposeTask 本身也是一个 Runnable 对象

EventLoopWorker # schedule()

//IOScheduler.java
public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}

static final class EventLoopWorker extends Scheduler.Worker {
    private final CompositeDisposable tasks;
    private final CachedWorkerPool pool;
    private final ThreadWorker threadWorker;

    final AtomicBoolean once = new AtomicBoolean();

    EventLoopWorker(CachedWorkerPool pool) {
        this.pool = pool;
        this.tasks = new CompositeDisposable();
        this.threadWorker = pool.get();
    }
    ......

    @NonNull
    @Override
    public Disposable schedule(@NonNull Runnable action, long delayTime , @NonNull TimeUnit unit) {
        if (tasks.isDisposed()) {
            return EmptyDisposable.INSTANCE;
        }
        return threadWorker.scheduleActual(action, delayTime , unit , tasks);
    }
}
  1. scheduleDirect()方法用 createWorker() 构造一个 Worker 对象,这里实际上是 EventLoopWorker 对象,
  2. 然后会调用 EventLoopWorker 的 schedule() 方法,传递的参数就是封装了 SubscribeTask 对象的 DisposeTask 类, DisposeTask 本身也是一个 Runnable 对象
     static final class DisposeTask implements Disposable , Runnable , SchedulerRunnableIntrospection {
      DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) {
             this.decoratedRun = decoratedRun;//decoratedRun 就是我们传递过来的 SubscribeTask 对象
             this.w = w;
         }
    
         @Override
         public void run() {
             ...
             decoratedRun.run();
             ...
         }
       }
    
  3. schedule()方法会调用 ThreadWorker 的 scheduleActual() 方法, ThreadWorker 继承于 NewThreadWorker ,这里调用的 scheduleActual() 也是来自于NewThreadWorker:

NewThreadWorker # scheduleActual()

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
  private final ScheduledExecutorService executor;
  public NewThreadWorker(ThreadFactory threadFactory) {
       executor = SchedulerPoolFactory.create(threadFactory);
   }

  public ScheduledRunnable scheduleActual(final Runnable run, long delayTime , @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
        ......
        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime , unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
          ......
        }

        return sr;
    }
}
  1. 创建 NewThreadWorker 时候就会创建一个 executor ,而 executor 就是一个线程池。并且是 ScheduledExecutorService 类型的。关于线程池的相关知识,可以查看 Android 线程池
    //SchedulerPoolFactory.java
    public static ScheduledExecutorService create(ThreadFactory factory) {
         final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
         tryPutIntoPool(PURGE_ENABLED, exec);
         return exec;
     }
    

这里会向线程池提交一个任务,executor.submit(sr),而这个 sr 就是封装了 DisposeTask 的 ScheduledRunnable 对象。 ScheduledRunnable 本身也是一个 Runnable 对象。

这样就把后续的逻辑切换到线程池的线程中执行。

切换到子线程中

屡一下啊。有点懵。

来个调用链:

ObservableObserveOn.subscribeActual(样例中通过 subscribe() 方法传入的Observer) -> ObservableSubscribeOn.subscribeActual(ObserveOnObserver)
->IOScheduler.scheduleDirect(SubscribeTask)->EventLoopWorker.schedule(DisposeTask)->NewThreadWorker.scheduleActual(DisposeTask)
->ScheduledExecutorService.schedule(ScheduledRunnable)-> 切换到子线程中

然后呢?

SubscribeTask , DisposeTask , ScheduledRunnable 都是一个 Runnable 对象。

ScheduledRunnable 中有一个 DisposeTask 对象。 DisposeTask 中有一个 SubscribeTask 对象。 俄罗斯套娃啊。

那么接下来的流程就是 run() 中执行 run() 呗

ScheduledRunnable.run()-> DisposeTask.run()-> SubscribeTask.run()

所以 最终会执行到前面的 SubscribeTask 的 run() 方法,再贴一下前面 SubscribeTask 的代码:

SubscribeTask # run()

 final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);//子线程中
        }
    }

source.subscribe(parent) 就肯定是在子线程中执行的。 source 就是 ObservableJust 对象,同样的会执行到 ObservableJust 的 subscribeActual() 方法:

ObservableJust # subscribeActual()

public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {

    private final T value;
    public ObservableJust(final T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(Observer<? super T> s) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
        s.onSubscribe(sd);//子线程中
        sd.run();//子线程中
    }
    ......
}
```java
这里构造了一个 ScalarDisposable 对象后面会调用它的 run() 方法

### ScalarDisposable # run()
```java
public static final class ScalarDisposable<T>extends AtomicInteger
    implements QueueDisposable<T>, Runnable {

      @Override
      public void run() {
          observer.onNext(value);//子线程中
        }
      }
    }

这里会调用 observer 的 onNext() 方法,而 observer 实际上是SubscribeOnObserver:

SubscribeOnObserver # onNext()

static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
      ...
        @Override
        public void onNext(T t) {
          //子线程
          actual.onNext(t);//actual 就是   ObserveOnObserver
        }
        .....
    }

调用了 actual.onNext(t),而 actual 就是 ObserveOnObserver 调用链如下:

->ScheduledRunnable.run()-> DisposeTask.run()-> SubscribeTask.run()
->ObservableJust.subscribeActual(SubscribeOnObserver) ->ScalarDisposable.run()
->SubscribeOnObserver.onNext()-> ObserveOnObserver.onNext()

目前这些都是在子线程中执行的。

ObserveOnObserver # onNext()

 static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {
    ......

    @Override
    public void onNext(T t) {
        。。。
        if (sourceMode != QueueDisposable.ASYNC) {
            queue.offer(t);//加入queue
        }
        schedule();
    }

    ......
    void schedule() {
        if (getAndIncrement() == 0) {
            worker.schedule(this);//传入 ObserveOnObserver 对象本身
        }
    }
}
  1. onNext()方法会把 value 加入 queue 成员变量,然后执行 schedule() 方法,
  2. schedule()方法中又会执行 worker 的 schedule() 方法,这里的 worker 实际上是 HandlerWorker ,
AndroidSchedulers.mainThread() = new HandlerScheduler(new Handler(Looper.getMainLooper()), false)

final class HandlerScheduler extends Scheduler {
  @Override
  public Worker createWorker() {
      return new HandlerWorker(handler, async);
    }
}    
  1. new Handler(Looper.getMainLooper() 肯定是在主线程的执行的Handler
  2. ObserveOnObserver 中的 worker 就是HandlerWorker

调用 worker 的 schedule() 方法时会传入 ObserveOnObserver 对象本身,这里需要注意的是 ObserveOnObserver 是实现了 Runnable 接口的.

HandlerWorker # schedule()

private static final class HandlerWorker extends Worker {
    @Override
    public Disposable schedule(Runnable run, long delay , TimeUnit unit) {
        ......
        // run 就是 ObserveOnObserver 对象
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        Message message = Message.obtain(handler, scheduled);
        message.obj = this; // Used as token for batch disposal of this worker's runnables.
        handler.sendMessageDelayed(message, unit.toMillis(delay));
        ......
        return scheduled;
    }
    ......
}
  1. 会把ObserveOnObserver(就是 run 参数)封装进 ScheduledRunnable 中, ScheduledRunnable 也实现了 Runnable 接口
  2. 再把 ScheduledRunnable 赋值给 Message 的 obj 成员,
  3. Message被用来作为参数调用 Handler 的 sendMessageDelayed() , 由于这个 Handler 对应的 Looper 是属于主线程的,这样 ScheduledRunnable 的 run() 方法便会在主线程执行

切换到主线程

ScheduledRunnable # run()

 private static final class ScheduledRunnable implements Runnable , Disposable {
        private final Handler handler;
        private final Runnable delegate;
        ScheduledRunnable(Handler handler, Runnable delegate) {
            this.handler = handler;
            this.delegate = delegate;
        }

        @Override
        public void run() {
            delegate.run();// delegate就是ObserveOnObserver
        }
        ......
    }

ObserveOnObserver # run()

 static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {
        @Override
        public void run() {
              ...
                drainNormal();
            }
        }
        void drainNormal() {
            int missed = 1;
            //这个是在 ObservableJust 中传入的
            final SimpleQueue<T> q = queue;
            //这个 actual 是最外边的 Observer ,即样例代码中
            //调用 subscribe() 传入的Observer
            final Observer<? super T> a = actual;
            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }

                for (;;) {
                    boolean d = done;
                    T v;

                    try {
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        s.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    boolean empty = v == null;

                    //第一次循环中调用 checkTerminated() 返回 false ,
                    //第二次循环中调用 checkTerminated() 返回true
                    if (checkTerminated(d, empty , a)) {
                        return;
                    }

                    if (empty) {
                        break;
                    }
                    //第一次循环中会调用这里
                    a.onNext(v);
                }
                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

       boolean checkTerminated(boolean d, boolean empty , Observer<? super T> a) {
            if (cancelled) {
                queue.clear();
                return true;
            }
            if (d) {
                Throwable e = error;
                if (delayError) {
                    ......
                } else {
                    if (e != null) {
                        queue.clear();
                        a.onError(e);
                        worker.dispose();
                        return true;
                    } else if (empty) {
                        //第一次循环中调用 checkTerminated() 不会进入这里,
                        //第二次循环中调用 checkTerminated() 会进入这里。
                        a.onComplete();
                        worker.dispose();
                        return true;
                    }
                }
            }
            return false;
        }
    }
  • 由于前面在 onNext() 方法里往 queue 里加入了一个元素,所以第一次进入循环的时候 queue 是不为空的, checkTerminated() 会返回 false ,
  • 之后会执行到a.next()这条语句,这里的 a 是样例中通过 subscribe() 方法传入的 Observer ,即这里会调用传入 Observer 的 oNext() 方法,
  • 然后进入第二次循环,由于这时 queue 为空,所以再进入 checkTerminated() 方法时,会调用 Observer 的 onComplete() 方法,
ObservableObserveOn.subscribeActual(样例中通过 subscribe() 方法传入的Observer) -> ObservableSubscribeOn.subscribeActual(ObserveOnObserver)
->IOScheduler.scheduleDirect(SubscribeTask)->EventLoopWorker.schedule(DisposeTask)->NewThreadWorker.scheduleActual(DisposeTask)
->ScheduledExecutorService.schedule(ScheduledRunnable)-> 切换到子线程中
->ScheduledRunnable.run()-> DisposeTask.run()-> SubscribeTask.run()
->ObservableJust.subscribeActual(SubscribeOnObserver) ->ScalarDisposable.run()
->SubscribeOnObserver.onNext()-> ObserveOnObserver.onNext()-> HandlerWorker.schedule()
->发送 Handler 消息切换到主线程
->ScheduledRunnable.run()-> ObserveOnObserver.run()
->ObserveOnObserver.drainNormal()-> 样例中通过 subscribe() 方法传入的Observer.onNext()

我觉得这里面最关键的是 Runnable 接口:

  • 通过线程池 ScheduledExecutorService schedule 一个 Runnable 对象 切到子线程
  • 通过主线程 Handler post 一个 Runnable 对象 切换到主线程

搬运地址: