源码分析 - RxJava 2.0 -- 线程切换
在使用 RxJava 的过程中,我们经常会使用到其线程切换的功能,而线程切换的功能主要通过 subscribeOn() 和 observeOn() 两个方法实现。为啥一行代码就能切换线程呢?这中间到底做了啥呢?带着这个疑问,我们具体看看里面实现逻辑。
下面是一段使用 RxJava 进行线程切换的样例代码:
主线程
样例
Observable.just("Hello, world !")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {}
@Override
public void onNext(@NonNull String s) {}
@Override
public void onError(@NonNull Throwable e) {}
@Override
public void onComplete() {}
});
先看一下 just() 方法的源码:
Observable # just()
public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, "The item is null");
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}
just() 方法会返回一个 ObservableJust 对象,再分别看一下 subscribeOn() 和 observeOn() 的源码:
Observable # subscribeOn() observeOn()
public final Observable<T> subscribeOn(Scheduler scheduler){
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false , bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError , int bufferSize) {
......
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler , delayError , bufferSize));
}
从源码可以看出, subscribeOn() 方法最终会返回一个 ObservableSubscribeOn 对象,而 observeOn() 方法最终会返回一个 ObservableObserveOn 对象,
上述的ObservableJust、ObservableSubscribeOn、ObservableObserveOn都是 Observable 类型的对象。
由于调用 subscribeOn() 和 observeOn() 构造相应的 Observable 对象时,都会把 this 传入构造方法中保存起来,所以最后会存在如下的引用关系:
ObservableObserveOn -> ObservableSubscribeOn -> ObservableJust
Observable # subscribe()
样例中最终调用的 subscribe() 方法实际上是 ObservableObserveOn 对象的 subscreibe() 方法,而上述的三个 Observalbe 对象的 subscribe() 的代码都是相同的,都继承自 Observable 类的 subscribe() 方法:
//Observable.java
public final void subscribe(Observer<? super T> observer) {
...
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
...
}
subscribe() 方法会调用 subscribeActual() 方法,上述的三个 Observable 对象的差异主要就集中在 subscribeActual() 方法中, ObservableObserveOn 的 subscribeActual() 方法源码如下:
ObservableObserveOn # subscribeActual()
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler , boolean delayError , int bufferSize) {
super(source); // ObservableSubscribeOn 对象
this.scheduler = scheduler;// HandlerScheduler
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//scheduler 是 HandlerScheduler ,所以会执行到这里
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w , delayError , bufferSize));
}
}
}
- 先构造一个 Worker 对象,
- 构造一个 ObserveOnObserver 对象, ObserveOnObserver 会引用 subscribe() 方法传入的 Observer 对象和 Worker 对象;
- 调用 source 成员变量的 subscribe() 方法,这里的 source 实际上是 ObservableSubscribeOn 对象
于是我们就进入 ObservableSubscribeOn 的 subscribeActual() 方法:
ObservableSubscribeOn # subscribeActual()
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source); //ObservableJust 对象
this.scheduler = scheduler; // IOScheduler对象
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
......
}
- 构造了一个 SubscribeOnObserver 对象,这个对象会引用 subscribeActual() 方法传入的 Observer 对象即 ObserveOnObserver ,这里也形成了一条引用链:
SubscribeOnObserver -> ObserveOnObserver -> 样例中通过 subscribe() 方法传入的Observer
- SubscribeOnObserver会被用于构造 SubscribeTask 对象, SubscribeTask 对象又会用于调用 Scheduler 的 scheduleDirect() 方法,先看一下 SubscribeTask 的代码:
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
可见 SubscribeTask 是个 Runnalbe 类型对象,它的 run() 方法中调用了 source 的 subscribe() 方法,这里的 source 是ObservableJust;
回到前面对Scheduler(在这里实际是 IOScheduler ,因为 Schedulers.io() 就是一个IOScheduler)的 scheduleDirect() 方法:
IOScheduler # scheduleDirect()
//IOScheduler 父类 Scheduler 的代码:
public Disposable scheduleDirect(@NonNull Runnable run) { //run 就是 SubscribeTask 对象
return scheduleDirect(run, 0L , TimeUnit.NANOSECONDS);
}
public Disposable scheduleDirect(@NonNull Runnable run, long delay , @NonNull TimeUnit unit) {
final Worker w = createWorker(); // w 就是 EventLoopWorker 类型
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);// task 中就包含这个 SubscribeTask 对象
w.schedule(task, delay , unit);
return task;
}
- scheduleDirect()方法用 createWorker() 构造一个 Worker 对象,这里实际上是 EventLoopWorker 对象,
- 调用 EventLoopWorker 的 schedule() 方法,传递的参数就是封装了 SubscribeTask 对象的 DisposeTask 类, DisposeTask 本身也是一个 Runnable 对象
EventLoopWorker # schedule()
//IOScheduler.java
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once = new AtomicBoolean();
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
......
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime , @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime , unit , tasks);
}
}
- scheduleDirect()方法用 createWorker() 构造一个 Worker 对象,这里实际上是 EventLoopWorker 对象,
- 然后会调用 EventLoopWorker 的 schedule() 方法,传递的参数就是封装了 SubscribeTask 对象的 DisposeTask 类, DisposeTask 本身也是一个 Runnable 对象
static final class DisposeTask implements Disposable , Runnable , SchedulerRunnableIntrospection { DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) { this.decoratedRun = decoratedRun;//decoratedRun 就是我们传递过来的 SubscribeTask 对象 this.w = w; } @Override public void run() { ... decoratedRun.run(); ... } }
- schedule()方法会调用 ThreadWorker 的 scheduleActual() 方法, ThreadWorker 继承于 NewThreadWorker ,这里调用的 scheduleActual() 也是来自于NewThreadWorker:
NewThreadWorker # scheduleActual()
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime , @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
......
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime , unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
......
}
return sr;
}
}
- 创建 NewThreadWorker 时候就会创建一个 executor ,而 executor 就是一个线程池。并且是 ScheduledExecutorService 类型的。关于线程池的相关知识,可以查看 Android 线程池
//SchedulerPoolFactory.java public static ScheduledExecutorService create(ThreadFactory factory) { final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory); tryPutIntoPool(PURGE_ENABLED, exec); return exec; }
这里会向线程池提交一个任务,executor.submit(sr),而这个 sr 就是封装了 DisposeTask 的 ScheduledRunnable 对象。 ScheduledRunnable 本身也是一个 Runnable 对象。
这样就把后续的逻辑切换到线程池的线程中执行。
切到子线程
屡一下啊。有点懵。
来个调用链:
ObservableObserveOn.subscribeActual(样例中通过 subscribe() 方法传入的Observer)
-> ObservableSubscribeOn.subscribeActual(ObserveOnObserver)
-> IOScheduler.scheduleDirect(SubscribeTask)
-> EventLoopWorker.schedule(DisposeTask)
-> NewThreadWorker.scheduleActual(DisposeTask)
-> ScheduledExecutorService.schedule(ScheduledRunnable)
-> 切换到子线程中
然后呢?
SubscribeTask , DisposeTask , ScheduledRunnable 都是一个 Runnable 对象。
ScheduledRunnable 中有一个 DisposeTask 对象。 DisposeTask 中有一个 SubscribeTask 对象。 俄罗斯套娃啊。
那么接下来的流程就是 run() 中执行 run() 呗
ScheduledRunnable.run()-> DisposeTask.run()-> SubscribeTask.run()
所以 最终会执行到前面的 SubscribeTask 的 run() 方法,再贴一下前面 SubscribeTask 的代码:
SubscribeTask # run()
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);//子线程中
}
}
source.subscribe(parent) 就肯定是在子线程中执行的。 source 就是 ObservableJust 对象,同样的会执行到 ObservableJust 的 subscribeActual() 方法:
ObservableJust # subscribeActual()
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
private final T value;
public ObservableJust(final T value) {
this.value = value;
}
@Override
protected void subscribeActual(Observer<? super T> s) {
ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
s.onSubscribe(sd);//子线程中
sd.run();//子线程中
}
......
}
这里构造了一个 ScalarDisposable 对象,后面会调用它的 run() 方法:
ScalarDisposable # run()
public static final class ScalarDisposable<T> extends AtomicInteger implements QueueDisposable<T>, Runnable {
@Override
public void run() {
observer.onNext(value);//子线程中
}
}
}
这里会调用 observer 的 onNext() 方法,而 observer 实际上是SubscribeOnObserver:
SubscribeOnObserver # onNext()
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
...
@Override
public void onNext(T t) {
//子线程
actual.onNext(t);//actual 就是 ObserveOnObserver
}
.....
}
调用了 actual.onNext(t),而 actual 就是 ObserveOnObserver
调用链如下:
ScheduledRunnable.run()
-> DisposeTask.run()
-> SubscribeTask.run()
-> ObservableJust.subscribeActual(SubscribeOnObserver)
-> ScalarDisposable.run()
-> SubscribeOnObserver.onNext()
-> ObserveOnObserver.onNext()
目前这些都是在子线程中执行的。
ObserveOnObserver # onNext()
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
......
@Override
public void onNext(T t) {
...
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);//加入queue
}
schedule();
}
......
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);//传入 ObserveOnObserver 对象本身
}
}
}
- onNext()方法会把 value 加入 queue 成员变量,然后执行 schedule() 方法,
- schedule()方法中又会执行 worker 的 schedule() 方法,这里的 worker 实际上是 HandlerWorker
AndroidSchedulers.mainThread() = new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
final class HandlerScheduler extends Scheduler {
@Override
public Worker createWorker() {
return new HandlerWorker(handler, async);
}
}
- new Handler(Looper.getMainLooper() 肯定是在主线程的执行的Handler
- ObserveOnObserver 中的 worker 就是HandlerWorker
调用 worker 的 schedule() 方法时会传入 ObserveOnObserver 对象本身,这里需要注意的是 ObserveOnObserver 是实现了 Runnable 接口的.
HandlerWorker # schedule()
private static final class HandlerWorker extends Worker {
@Override
public Disposable schedule(Runnable run, long delay , TimeUnit unit) {
......
// run 就是 ObserveOnObserver 对象
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, unit.toMillis(delay));
......
return scheduled;
}
......
}
- 会把ObserveOnObserver(就是 run 参数)封装进 ScheduledRunnable 中, ScheduledRunnable 也实现了 Runnable 接口
- 再把 ScheduledRunnable 赋值给 Message 的 obj 成员,
- Message被用来作为参数调用 Handler 的 sendMessageDelayed() , 由于这个 Handler 对应的 Looper 是属于主线程的,这样 ScheduledRunnable 的 run() 方法便会在主线程执行
切回主线程
ScheduledRunnable # run()
private static final class ScheduledRunnable implements Runnable , Disposable {
private final Handler handler;
private final Runnable delegate;
ScheduledRunnable(Handler handler, Runnable delegate) {
this.handler = handler;
this.delegate = delegate;
}
@Override
public void run() {
delegate.run();// delegate就是ObserveOnObserver
}
......
}
ObserveOnObserver # run()
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
@Override
public void run() {
...
drainNormal();
}
}
void drainNormal() {
int missed = 1;
//这个是在 ObservableJust 中传入的
final SimpleQueue<T> q = queue;
//这个 actual 是最外边的 Observer ,即样例代码中
//调用 subscribe() 传入的Observer
final Observer<? super T> a = actual;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
//第一次循环中调用 checkTerminated() 返回 false ,
//第二次循环中调用 checkTerminated() 返回true
if (checkTerminated(d, empty , a)) {
return;
}
if (empty) {
break;
}
//第一次循环中会调用这里
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
boolean checkTerminated(boolean d, boolean empty , Observer<? super T> a) {
if (cancelled) {
queue.clear();
return true;
}
if (d) {
Throwable e = error;
if (delayError) {
......
} else {
if (e != null) {
queue.clear();
a.onError(e);
worker.dispose();
return true;
} else if (empty) {
//第一次循环中调用 checkTerminated() 不会进入这里,
//第二次循环中调用 checkTerminated() 会进入这里。
a.onComplete();
worker.dispose();
return true;
}
}
}
return false;
}
}
- 由于前面在 onNext() 方法里往 queue 里加入了一个元素,所以第一次进入循环的时候 queue 是不为空的, checkTerminated() 会返回 false ,
- 之后会执行到a.next()这条语句,这里的 a 是样例中通过 subscribe() 方法传入的 Observer ,即这里会调用传入 Observer 的 oNext() 方法,
- 然后进入第二次循环,由于这时 queue 为空,所以再进入 checkTerminated() 方法时,会调用 Observer 的 onComplete() 方法,
整个调用链如下:
ObservableObserveOn.subscribeActual(样例中通过 subscribe() 方法传入的Observer)
-> ObservableSubscribeOn.subscribeActual(ObserveOnObserver)
-> IOScheduler.scheduleDirect(SubscribeTask)
-> EventLoopWorker.schedule(DisposeTask)
-> NewThreadWorker.scheduleActual(DisposeTask)
-> ScheduledExecutorService.schedule(ScheduledRunnable)
-> 切换到子线程中
-> ScheduledRunnable.run()
-> DisposeTask.run()
-> SubscribeTask.run()
-> ObservableJust.subscribeActual(SubscribeOnObserver)
-> ScalarDisposable.run()
-> SubscribeOnObserver.onNext()
-> ObserveOnObserver.onNext()
-> HandlerWorker.schedule()
-> 发送 Handler 消息,切换到主线程
-> ScheduledRunnable.run()
-> ObserveOnObserver.run()
-> ObserveOnObserver.drainNormal()
-> 样例中通过 subscribe() 方法传入的Observer.onNext()
我觉得这里面最关键的是 Runnable 接口:
- 通过线程池 ScheduledExecutorService schedule 一个 Runnable 对象 切到子线程
- 通过主线程 Handler post 一个 Runnable 对象 切换到主线程
搬运地址:
既已览卷至此,何不品评一二: