没有严格按照RxJava 2.0 源码去分析,而是根据 https://github.com/yds17322/YRxJava 这个去分析的,其实原理都是一样的,只不过这个更简单,更容易理解。

Observable
        .create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.i(TAG, "subscribe --> " + (Looper.myLooper() == Looper.getMainLooper()));
                // 调用 ObservableCreate 中 CreateEmitter 的onNext
                emitter.onNext("aaa");
                emitter.onError(new RuntimeException("123123123"));
                emitter.onComplete();
            }
        })
        .switchUpThread(Observable.NEW_THREAD)
        .switchDownThread(Observable.MAIN_THREAD)
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe() {
                Log.i(TAG, "subscribe -  onSubscribe");
            }

            @Override
            public void onNext(String s) {
                Log.i(TAG, "onNext --> " + (Looper.myLooper() == Looper.getMainLooper()) + ", s : " + s);
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "onError --> " + (Looper.myLooper() == Looper.getMainLooper()) + ", e : " + e);
            }

            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete --> " + (Looper.myLooper() == Looper.getMainLooper()));
            }
        });
  1. Observable creatObservable = Observable.create(observableOnSubscribe); 以 observableOnSubscribe 为 source , create 一个 Observable 可观察者 此时是 ObservableCreate 对象
  2. Observable newThreadObservable = creatObservable.switchUpThread(Observable.NEW_THREAD); 以 ObservableCreate 为 source ,创建一个 ObservableSwitchUpThread 对象
  3. Observable switchDownThread = newThreadObservable.switchDownThread(Observable.MAIN_THREAD); 以 ObservableSwitchUpThread 为 source ,创建 ObservableSwitchDownThread 对象,

  4. switchDownThread.subscribe(observer); subscribe() 是 Observable 的方法,但是里面只调用了 subscribeActual() ,这是一个抽象方法,是需要各个子类实现的, ObservableSwitchDownThread ObservableSwitchUpThread 和 ObservableCreate 都是 extends Observable ,所以我们就知道 switchDownThread.subscribe(observer); 真实的是执行到了 ObservableSwitchDownThread 中的 subscribeActual().
@Override
   protected void subscribeActual(Observer<? super T> observer) {
       DownThreadObserver threadObserver = new DownThreadObserver(mHandler, threadId , observer);
       source.subscribe(threadObserver);  
   }

observer 就是创建的 Observer 对象,里面包含了onSubscribe()。onNext()。 onError() , onComplete() 方法。

主要步骤:

  1. 创建了 DownThreadObserver 对象, DownThreadObserver extends Observer 并且 实现了 Runnable 方法,里面包含了之前创建的 observer ,以及线程id (Observable.MAIN_THREAD)
  2. source 就是 创建 ObservableSwitchUpThread 中被封装的 ObservableSwitchUpThread 对象,同理执行到了 ObservableSwitchUpThread 中的 subscribeActual()

切换到子线程 ObservableSwitchUpThread # subscribeActual()

@Override
protected void subscribeActual(Observer<? super T> observer) {//observer 就是 DownThreadObserver 对象
    UpThreadObserver upThreadObserver = new UpThreadObserver(observer);
    UpThreadTask threadTask = new UpThreadTask(upThreadObserver);
    if (threadId == Observable.NEW_THREAD) {
        mNewThread = new Thread(threadTask);
        mNewThread.start();
    } else {
        mHandler.post(threadTask);
    }
}

  1. 根据 下游的 DownThreadObserver 对象,创建 UpThreadObserver 对象
  2. 根据 UpThreadObserver 对象 ,创建 UpThreadTask 对象, UpThreadTask 实现了 Runnable 接口 因为 ObservableSwitchUpThread 创建的时候已经定义了线程类型, Observable.NEW_THREAD ,所以就创建新线程,然后执行 线程 start() 方法,这样就切换到了子线程中。
class UpThreadTask implements Runnable {
       private UpThreadObserver upThreadObserver;
       public UpThreadTask(UpThreadObserver upThreadObserver) {
           this.upThreadObserver = upThreadObserver;
       }

       @Override
       public void run() {
           Log.e(TAG, "UpThreadTask - run");
           source.subscribe(upThreadObserver);
       }
   }

  1. run()方法 在子线程中执行的,那么source.subscribe(upThreadObserver);就是在子线程中执行的。 source 就是创建 ObservableSwitchUpThread 传递过来的 ObservableCreate 对象,于是就执行到了 ObservableCreate 中的 subscribeActual() ,这个时候已经在子线程中执行了。
@Override
protected void subscribeActual(Observer<? super T> observer) {//observer 是下游的 UpThreadObserver 对象
   CreateEmitter<T> emitter = new CreateEmitter<T>(observer);
   observer.onSubscribe();
   try {
       mOnSubscribe.subscribe(emitter);
   } catch (Exception e) {
       e.printStackTrace();
   }
}

  1. 根据 observer 然后创建一个 CreateEmitter , 所以 emitter 中就有一个 UpThreadObserver 对象。
  2. 执行 mOnSubscribe.subscribe(emitter); 也在子线程中, mOnSubscribe 就是我们开头创建的 Observable.create(observableOnSubscribe) 中的 observableOnSubscribe ,
ObservableOnSubscribe<String> observableOnSubscribe = new ObservableOnSubscribe<String>() {
           @Override
           public void subscribe(ObservableEmitter<String> emitter) throws Exception {
               Log.i(TAG, "subscribe --> " + (Looper.myLooper() == Looper.getMainLooper()));
               // 调用 ObservableCreate 中 CreateEmitter 的onNext
               emitter.onNext("aaa");
               emitter.onError(new RuntimeException("123123123"));
               emitter.onComplete();
           }
       };

所以就又执行到了 CreateEmitter 中,而 CreateEmitter 只是简单调用了 mObserver 的方法,这个 mObserver 前面说够了,就是 UpThreadObserver ,

@Override
public void onNext(T value) {     // 调用 UpThreadObserver 中的onNext     
  mObserver.onNext(value);
}
 @Override
public void onError(Throwable error) {     
  mObserver.onError(error);
}

@Override
public void onComplete() {     
  mObserver.onComplete();
}

UpThreadObserver 中的 onNext() ,而 UpThreadObserver 中又是对 DownThreadObserver 的封装,所以就又执行到了 DownThreadObserver 中的onNext()

切回主线程 DownThreadObserver # onNext()

@Override
public void onNext(final T t) {
  queue.add(next);
  this.t = t;
  prepareRun();
}
  1. 把这个事件类型(next)添加到队列中
  2. 保存这个事件,这个时候 在子线程中的,
  3. 执行 PrePareRun() 中,会切换到主线程中。
private synchronized void prepareRun() {
      if (threadId == MAIN_THREAD) {
          handler.post(this);
      } else {
          new Thread(this).start();
      }
}

theadId ,就是创建 ObservableSwitchDownThread 时候传递过来的 Observable.MAIN_THREAD,主线程 。 所以会通过handle.post() 到主线程中。注意,这个 handle 的创建也是有讲究的 Handler mHandler = new Handler(Looper.getMainLooper()); 这样就能保证, post() 的时候,一定在主线程中。因为 ObservableSwithcDownThread 实现了 Runnable 接口,所以 post() 之后,就执行到了 run() 方法中。

@Override
public void run() {
    for (; ; ) {
        Integer poll = queue.poll();
        if (poll == null) {
            break;
        } else {
            int status = poll;
            Log.e(TAG, "run -status-> " + status);
            switch (status) {
                case next: {
                    observer.onNext(t);
                    break;
                }
                case error: {
                    observer.onError(e);
                    break;
                }
                case complete: {
                    observer.onComplete();
                    break;
                }
            }
        }

    }
}

这个 observer 对象,就是我们在 subscribe() 的时候创建的那个 Observer 对象。 run() 中就已经切换到主线程中了,注意,这是一个死循环,队列中有值,就会取出来,然后根据取出来的类型,执行相应的方法。 onNext() ,或者 onError() ,或者 onComplete() 这样就完成了线程从子线程切换到主线程中