扫盲系列 - RxJava 2.0 -- 线程切换原理
没有严格按照RxJava 2.0 源码去分析,而是根据 https://github.com/yds17322/YRxJava 这个去分析的,其实原理都是一样的,只不过这个更简单,更容易理解。
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.i(TAG, "subscribe --> " + (Looper.myLooper() == Looper.getMainLooper()));
// 调用 ObservableCreate 中 CreateEmitter 的onNext
emitter.onNext("aaa");
emitter.onError(new RuntimeException("123123123"));
emitter.onComplete();
}
})
.switchUpThread(Observable.NEW_THREAD)
.switchDownThread(Observable.MAIN_THREAD)
.subscribe(new Observer<String>() {
@Override
public void onSubscribe() {
Log.i(TAG, "subscribe - onSubscribe");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext --> " + (Looper.myLooper() == Looper.getMainLooper()) + ", s : " + s);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError --> " + (Looper.myLooper() == Looper.getMainLooper()) + ", e : " + e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete --> " + (Looper.myLooper() == Looper.getMainLooper()));
}
});
- Observable
creatObservable = Observable.create(observableOnSubscribe); 以 observableOnSubscribe 为 source , create 一个 Observable 可观察者 此时是 ObservableCreate 对象 - Observable
newThreadObservable = creatObservable.switchUpThread(Observable.NEW_THREAD); 以 ObservableCreate 为 source ,创建一个 ObservableSwitchUpThread 对象 -
Observable
switchDownThread = newThreadObservable.switchDownThread(Observable.MAIN_THREAD); 以 ObservableSwitchUpThread 为 source ,创建 ObservableSwitchDownThread 对象, - switchDownThread.subscribe(observer); subscribe() 是 Observable 的方法,但是里面只调用了 subscribeActual() ,这是一个抽象方法,是需要各个子类实现的, ObservableSwitchDownThread ObservableSwitchUpThread 和 ObservableCreate 都是 extends Observable ,所以我们就知道 switchDownThread.subscribe(observer); 真实的是执行到了 ObservableSwitchDownThread 中的 subscribeActual().
@Override
protected void subscribeActual(Observer<? super T> observer) {
DownThreadObserver threadObserver = new DownThreadObserver(mHandler, threadId , observer);
source.subscribe(threadObserver);
}
observer 就是创建的 Observer 对象,里面包含了onSubscribe()。onNext()。 onError() , onComplete() 方法。
主要步骤:
- 创建了 DownThreadObserver 对象, DownThreadObserver extends Observer 并且 实现了 Runnable 方法,里面包含了之前创建的 observer ,以及线程id (Observable.MAIN_THREAD)
- source 就是 创建 ObservableSwitchUpThread 中被封装的 ObservableSwitchUpThread 对象,同理执行到了 ObservableSwitchUpThread 中的 subscribeActual()
切换到子线程 ObservableSwitchUpThread # subscribeActual()
@Override
protected void subscribeActual(Observer<? super T> observer) {//observer 就是 DownThreadObserver 对象
UpThreadObserver upThreadObserver = new UpThreadObserver(observer);
UpThreadTask threadTask = new UpThreadTask(upThreadObserver);
if (threadId == Observable.NEW_THREAD) {
mNewThread = new Thread(threadTask);
mNewThread.start();
} else {
mHandler.post(threadTask);
}
}
- 根据 下游的 DownThreadObserver 对象,创建 UpThreadObserver 对象
- 根据 UpThreadObserver 对象 ,创建 UpThreadTask 对象, UpThreadTask 实现了 Runnable 接口 因为 ObservableSwitchUpThread 创建的时候已经定义了线程类型, Observable.NEW_THREAD ,所以就创建新线程,然后执行 线程 start() 方法,这样就切换到了子线程中。
class UpThreadTask implements Runnable {
private UpThreadObserver upThreadObserver;
public UpThreadTask(UpThreadObserver upThreadObserver) {
this.upThreadObserver = upThreadObserver;
}
@Override
public void run() {
Log.e(TAG, "UpThreadTask - run");
source.subscribe(upThreadObserver);
}
}
- run()方法 在子线程中执行的,那么source.subscribe(upThreadObserver);就是在子线程中执行的。 source 就是创建 ObservableSwitchUpThread 传递过来的 ObservableCreate 对象,于是就执行到了 ObservableCreate 中的 subscribeActual() ,这个时候已经在子线程中执行了。
@Override
protected void subscribeActual(Observer<? super T> observer) {//observer 是下游的 UpThreadObserver 对象
CreateEmitter<T> emitter = new CreateEmitter<T>(observer);
observer.onSubscribe();
try {
mOnSubscribe.subscribe(emitter);
} catch (Exception e) {
e.printStackTrace();
}
}
- 根据 observer 然后创建一个 CreateEmitter , 所以 emitter 中就有一个 UpThreadObserver 对象。
- 执行 mOnSubscribe.subscribe(emitter); 也在子线程中, mOnSubscribe 就是我们开头创建的 Observable.create(observableOnSubscribe) 中的 observableOnSubscribe ,
ObservableOnSubscribe<String> observableOnSubscribe = new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.i(TAG, "subscribe --> " + (Looper.myLooper() == Looper.getMainLooper()));
// 调用 ObservableCreate 中 CreateEmitter 的onNext
emitter.onNext("aaa");
emitter.onError(new RuntimeException("123123123"));
emitter.onComplete();
}
};
所以就又执行到了 CreateEmitter 中,而 CreateEmitter 只是简单调用了 mObserver 的方法,这个 mObserver 前面说够了,就是 UpThreadObserver ,
@Override
public void onNext(T value) { // 调用 UpThreadObserver 中的onNext
mObserver.onNext(value);
}
@Override
public void onError(Throwable error) {
mObserver.onError(error);
}
@Override
public void onComplete() {
mObserver.onComplete();
}
UpThreadObserver 中的 onNext() ,而 UpThreadObserver 中又是对 DownThreadObserver 的封装,所以就又执行到了 DownThreadObserver 中的onNext()
切回主线程 DownThreadObserver # onNext()
@Override
public void onNext(final T t) {
queue.add(next);
this.t = t;
prepareRun();
}
- 把这个事件类型(next)添加到队列中
- 保存这个事件,这个时候 在子线程中的,
- 执行 PrePareRun() 中,会切换到主线程中。
private synchronized void prepareRun() {
if (threadId == MAIN_THREAD) {
handler.post(this);
} else {
new Thread(this).start();
}
}
theadId ,就是创建 ObservableSwitchDownThread 时候传递过来的 Observable.MAIN_THREAD,主线程 。 所以会通过handle.post() 到主线程中。注意,这个 handle 的创建也是有讲究的 Handler mHandler = new Handler(Looper.getMainLooper()); 这样就能保证, post() 的时候,一定在主线程中。因为 ObservableSwithcDownThread 实现了 Runnable 接口,所以 post() 之后,就执行到了 run() 方法中。
@Override
public void run() {
for (; ; ) {
Integer poll = queue.poll();
if (poll == null) {
break;
} else {
int status = poll;
Log.e(TAG, "run -status-> " + status);
switch (status) {
case next: {
observer.onNext(t);
break;
}
case error: {
observer.onError(e);
break;
}
case complete: {
observer.onComplete();
break;
}
}
}
}
}
这个 observer 对象,就是我们在 subscribe() 的时候创建的那个 Observer 对象。 run() 中就已经切换到主线程中了,注意,这是一个死循环,队列中有值,就会取出来,然后根据取出来的类型,执行相应的方法。 onNext() ,或者 onError() ,或者 onComplete() 这样就完成了线程从子线程切换到主线程中
既已览卷至此,何不品评一二: