扫盲系列 - RxJava 基本用法
RxJava
异步,实现异步操作的库。异步实现,是通过一种扩展的观察者模式来实现。
好处:简洁,随着程序逻辑越来越复杂,它依旧能保持简洁,简洁到无论多么什么复杂的逻辑都能串成一条链
三个基本概念
- Observable 可观测者,即被观察者
- Observer 观察者
- subcribe 订阅事件
Observer 观察者
决定事件触发的时候将有怎么样的行为,是一个泛型接口,实现方式
Observer<String> observable=new Observer<String>(){
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
}
};
Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer 。
与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError() 。
- onCompleted(): 事件队列完结。 RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。 RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
- onError(): 事件队列异常。在事件处理过程中出异常时, onError() 会被触发,同时队列自动终止,不允许再有事件发出。
- 在一个正在运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是, onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。
Subscriber 是 Observer 的抽象类,对 Observer 接口进行了一些扩展,但他们的基本使用方式是完全一样的:
public abstract class Subscriber<T> implements Observer<T>, Subscription {
protected Subscriber() {
this(null, false);
}
protected Subscriber(Subscriber<?> subscriber) {
this(subscriber, true);
}
protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) {
this.subscriber = subscriber;
this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList();
}
public final void add(Subscription s) {
subscriptions.add(s);
}
@Override
public final void unsubscribe() {
subscriptions.unsubscribe();
}
@Override
public final boolean isUnsubscribed() {
return subscriptions.isUnsubscribed();
}
public void onStart() {
// do nothing by default
}
}
public interface Subscription {
void unsubscribe();
boolean isUnsubscribed();
}
从源码可知, Subscriber 好像没干什么事情,只是简单的桥接了一下,创建了一个 SubscriptionList 对象,剩下的要么是把工作交给这个 SubscriptionList 对象 subscriptions 。例如 unsubscribe() , isUnsubscribed() 以及add(Subscription s)方法,要么就是空实现。比如 onStart() ,要么就是没实现。例如 Observer 中的三个方法。
实质上,在 RxJava 的 subscribe 过程中, Observer 也总是会先被转换成一个 Subscriber 再使用。主要区别有以下两点:
- onStart(): 这是 Subscriber 增加的方法。它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart() 就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubscribe() 方法, 具体可以在后面的文中看到。
- unsubscribe(): 这是 Subscriber 所实现的另一个接口 Subscription 的方法,用于取消订阅。在这个方法被调用后, Subscriber 将不再接收事件。一般在这个方法调用前,可以使用 isUnsubscribed() 先判断一下状态。 unsubscribe() 这个方法很重要,因为在 subscribe() 之后, Observable 会持有 Subscriber 的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以最好保持一个原则:不再使用的时候尽快在合适的地方(例如 onPause() onStop() 等方法中)调用 unsubscribe() 来解除引用关系,以避免内存泄露的发生。
RxJava 的这个实现,是一条从上到下的链式调用,没有任何嵌套,这在逻辑的简洁性上是具有优势的。当需求变得复杂时,这种优势将更加明显
Observable
使用 Rxjava 的 create() 方法来创建一个 Observable 对象,并为他定义事件的触发规则
//当 Observable 被订阅的时候, OnSubscribe 的 call() 方法会自动被调用,事件序列就会依照设定依次触发.
//由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式。
Observable<String> observable=Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
//观察者 Subscriber 将会被调用三次 onNext() 和一次 onCompleted()
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("RxJava");
subscriber.onCompleted();
}
});
create() 是 RxJava 最基本的创建事件序列的方法,基于这个方法, RxJava 还创建了一系列方法用来快速创建事件序列
接口 Action
// 方法一
Observable observable = Observable.just("Hello", "Hi", "Aloha");
//方法二
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
上面两种方式和 create() 方式效果一样的,都是依次执行onNext(“Hello”),onNext(“Hi”),onNext(“RxJava”)以及onComleted()
原因看源码
public class Observable<T> {
...
// create()方式
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}
// from()方式
public static <T> Observable<T> from(T[] array) {
int n = array.length;
if (n == 0) {
return empty();
} else
if (n == 1) {
return just(array[0]);
}
//public final class OnSubscribeFromArray<T> implements OnSubscribe<T> {
return create(new OnSubscribeFromArray<T>(array));
}
// just()方式,
public static <T> Observable<T> just(T t1, T t2 , T t3) {
return from((T[])new Object[] { t1, t2 , t3 });
}
其实就是 just() 方式,把参数集合封装成一个数组,然后执行 from() 方法,然后 from() 方法又把这个数组转换成 OnSubscribeFromArray 对象,也就是 OnSubcribe 对象,执行 create() 方法
subscribe 订阅
创建了 Observer 和 Observable 之后,再用 subscribe() 方法,把他们连接起来,整条链子就可以工作了,代码形式很简单
observable.subscribe(observer);
//或者
observable.subscribe(sub);
源码能告诉我们一切
// Observable<T>
···
public final Subscription subscribe(final Observer<? super T> observer) {
if (observer instanceof Subscriber) {
return subscribe((Subscriber<? super T>)observer);
}
return subscribe(new ObserverSubscriber<T>(observer));
}
当参数是 Observer 的时候,会判断是不是 Subscribe 对象,
- 是的话就直接强转,然后执行subscribe(Subscriber<? super T> subscriber)方法。
- 如果不是的话,就把它封装成一个 Subscribe 的子类对象,然后执行subcribe(Subscriber<? super T> subscriber)方法,
也就是无论如何,Observer 也总是会先被转换成一个 Subscriber 再使用。
接下来咱们看看 subscribe(Subscriber<? super T> subscriber) 方法是如何实现的
// Observable<T>
static final RxJavaObservableExecutionHook hook =
RxJavaPlugins.getInstance().getObservableExecutionHook();
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
static <T> Subscription subscribe(Subscriber<? super T> subscriber
,Observable<T> observable) {
···
subscriber.onStart();
···
try{
//参照下面的源码可知,hook.onSubscribeStart(observable, observable.onSubscribe)
//得到的结果就是observable.onSubscribe
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
//参照下面的源码可知,得到的结果就是 subscriber 本身
return hook.onSubscribeReturn(subscriber);
}catch (Throwable e) {
···
subscriber.onError(hook.onSubscribeError(e));
···
}
}
//RxJavaObservableExecutionHook.Java
public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance
, final OnSubscribe<T> onSubscribe) {
// pass through by default
return onSubscribe;
}
public <T> Subscription onSubscribeReturn(Subscription subscription) {
// pass through by default
return subscription;
}
根据源码可知:subscriber()做了 3 件事
- 执行了 Subscribe 的 onStart() 方法,这是一个可选的准备方法
- 因为 hook.onSubscribeStart(observable, observable.onSubscribe)返回的是observable.onSubscribe对象,也就是说执行了 Observable 中 onSubscibe 的 onCall() 方法,从这里,事件发送的逻辑开始执行,从这里看出,在 RxJava 中, Observable 并不是在一开始创建的时候就开始发送事件,而是在执行 subscribe() 方法执行的时候。
- 根据RxJavaObservableExecutionHook.java 源码可知,hook.onSubscribeReturn(subscriber)得到的结果还是 subscriber 本身,也就是将 Subscriber 对象作为 Subscription 返回,这是为了方便unsubscrible()
subscribe()不完整定义
除了 subscribe(Observer) 和 subscribe(Subscriber) , subscribe() 还支持不完整定义的回调, RxJava 会自动根据定义创建出 Subscriber 。
Action1<String> onNextAction=new Action1<String>() {
@Override
public void call(String s) {
}
};
Action1<Throwable > onErrorAction=new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
}
};
Action0 onCompletedAction=new Action0() {
@Override
public void call() {
}
};
//自动创建 Subscriber ,并且使用 onNextAction 来定义 onNext() 方法
observable.subscribe(onNextAction);
//自动创建 Subscriber ,并且使用 onNextAction 来定义 onNext() 方法
//用 onErrorAction 来定义 onError() 方法
observable.subscribe(onNextAction,onErrorAction);
//自动创建 Subscriber ,并且使用 onNextAction 来定义 onNext() 方法用 onErrorAction 来定义 onError() 方法,
//使用 onCompleteAction 义 onComplete() 方法
observable.subscribe(onNextAction, onErrorAction ,onCompletedAction);
接口Action
解释 Action 之前,需要了解一个 Action 是个啥玩意儿,二话不说,源码走起
public interface Action extends Function {/**空接口*/}
/**
* All Func and Action interfaces extend from this.
* Marker interface to allow instanceof checks.
*/
public interface Function {/**空接口*/}
Actoin 是一个接口,是一个空接口,继承 Function 这个空接口,定义一个空接口 Function ,然后在使用另外一个空接口(Action)继承,写 RxJava 的人有毛病吧,其实人家 在 Function 上已经解释了 。
All Func and Action interfaces extend from this,Marker interface to allow instanceof checks.
翻译过来就是说你们所有的 Func 和 Action 来自 Function ,为了就是允许 instanceof 检查的标记接口。其实定义 Function 就是为了 instanceof 检查
接下来看看 Action0 , Action1 的定义
public interface Action0 extends Action {
void call();
}
public interface Action1<T> extends Action {
void call(T t);
}
看出来点眉目了吗,没有咱们就再来几个看看
public interface Action2<T1, T2> extends Action {
void call(T1 t1, T2 t2);
}
public interface Action3<T1, T2 , T3> extends Action {
void call(T1 t1, T2 t2 , T3 t3);
}
public interface ActionN extends Action {
void call(Object... args);
}
这应该发现了点规律吧,
- 都是继承 Action 接口的,
- Action几后面有几个泛型参数,例如 Action1 后面有一个泛型参数 T , Action2 后面有两个泛型参数<T1,T2>, ActionN 后面虽然没有泛型参数,但是 call 方法里面有一个可变参数
- 关于 call() 方法里面的参数, Actoin 后面有几个泛型参数, call() 方法里面就有几个参数,
因为 onNext() 和 onError() 方法中有一个参数,所以使用了 Actoin1 定义,而 onComplete() 属于无参数方法,所以使用了 Action0 定义,其实一看源码就知道了怎么回事了
// Observable.java
public final Subscription subscribe(final Action1<? super T> onNext) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}
Action1<Throwable> onError = InternalObservableUtils.ERROR_NOT_IMPLEMENTED;
Action0 onCompleted = Actions.empty();
return subscribe(new ActionSubscriber<T>(onNext, onError , onCompleted));
}
public final Subscription subscribe(final Action1<? super T> onNext
, final Action1<Throwable> onError) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}
if (onError == null) {
throw new IllegalArgumentException("onError can not be null");
}
Action0 onCompleted = Actions.empty();
return subscribe(new ActionSubscriber<T>(onNext, onError , onCompleted));
}
public final Subscription subscribe(final Action1<? super T> onNext
, final Action1<Throwable> onError
, final Action0 onCompleted) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}
if (onError == null) {
throw new IllegalArgumentException("onError can not be null");
}
if (onCompleted == null) {
throw new IllegalArgumentException("onComplete can not be null");
}
return subscribe(new ActionSubscriber<T>(onNext, onError , onCompleted));
}
不管你传递几个 Action 系列,少于三个的话,会把默认的 Action 传递进去,
- 默认的 onErrorAction 是InternalObservableUtils.ERROR_NOT_IMPLEMENTED,
- 默认的 onCompleteAction 是Actions.empty()
最终都是把三个 Action 封装成一个 ActionSubscribe 类,然后调用了Subscription subscribe(Subscriber<? super T> subscriber)方法。
这三个是有顺序的,如果传递一个 Action ,那么就必须是 onNextAction ,也就是执行 onNext() 方法,如果传递的是两个 Action 的话,就必须是 onNextAction ,和 onErrorAction ,不能是 onNextAction 和 onCompleteAction ,更不能是 onErrorAction 和 onCompleteAction ,原因参照源码
public final class ActionSubscriber<T> extends Subscriber<T> {
final Action1<? super T> onNext;
final Action1<Throwable> onError;
final Action0 onCompleted;
public ActionSubscriber(Action1<? super T> onNext, Action1<Throwable> onError, Action0 onCompleted) {
this.onNext = onNext;
this.onError = onError;
this.onCompleted = onCompleted;
}
@Override
public void onNext(T t) {
onNext.call(t);
}
@Override
public void onError(Throwable e) {
onError.call(e);
}
@Override
public void onCompleted() {
onCompleted.call();
}
}
因为第一个参数就是 onNext 接收的,执行的 onNext() 方法,第二个参数是 onError 接受的,在 onError() 方法中执行了,第三个同理
在 RxJava 的默认规则中,事件的发出和消费都是在同一个线程的,如果只用上面的方法,实现出来的只是一个同步的观察者模式。观察者模式本身的目的就是『后台处理,前台回调』的异步机制,因此异步对于 RxJava 是至关重要的。而要实现异步,则需要用到 RxJava 的另一个概念: Scheduler
。
Scheduler 线程控制 一
在不指定线程的情况下, RxJava 遵循的是线程不变的原则,在那个线程中调用了 subscribe() ,在哪个线程中生产事件,也就在哪个线程中消费事件,如果要切换线程,就需要使用到 Schedule ,调度器
Scheduler 的API
在 RxJava 中,Scheduler(调度器)相当于线程控制器, RxJava 通过 Scheduler 指定每一段代码在那个线程中执行, Rxjava 内置了几个 Schedule ,他们已经适应于大多数情况了
- Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程,默认的Schedule
- Schedulers.newThread(): 总是启用新线程,并且在新线程中执行操作
- Schedulers.computation(): 计算所使用的 Scheduler 。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU 。
- Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler 。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
- AndroidSchedulers.mainThread(): Android 专有它指定的操作将在 Android 主线程运行
有了这几个 Scheduler ,就可以使用 subscribon() 和 observeon() 进行调用了
- subscribeon() 指定 subscribe() 所发生的线程,也就是Observe.OnSubscribe激活所在的线程,或者叫事件产生的线程
- observeron() 指定 Subscribe 所运行的线程,或者叫事件消耗的线程
例如
Observable.just(1, 2 , 3 ,4)//
//指定 subcribe() 发生的线程是在 io 线程,即被创建的事件的内容 1、2、3、4 将会在 IO 线程发出
.subscribeOn(Schedulers.io())
//指定 Subscribe 的回调线程是在 Android 主 UI 线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {//这个方法将会在 Android 主线程中执行
}
});
这种在 subscribe() 之前写上两句 subscribeOn(Scheduler.io()) 和 observeOn(AndroidSchedulers.mainThread()) 的使用方式非常常见,它适用于多数的 『后台线程取数据,主线程显示』的程序策略。
在如后台加载图片, UI 线程显示的
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_rxjava);
imageView= (ImageView) findViewById(R.id.btn);
imgResId=R.mipmap.ic_launcher;
Log.d("hoyouly", getClass().getSimpleName() + " -> onCreate: "+Thread.currentThread());
Observable.create(new Observable.OnSubscribe<Drawable>() {
@TargetApi(Build.VERSION_CODES.LOLLIPOP)
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Log.d("hoyouly", getClass().getSimpleName() + " -> call: "+Thread.currentThread());
Drawable drawable = getTheme().getDrawable(imgResId);
subscriber.onNext(drawable);
subscriber.onCompleted();
}
})//
.subscribeOn(Schedulers.io())//指定 subscribe() 即 加载图片过程 是在 io 线程中
.observeOn(AndroidSchedulers.mainThread())//指定 Subscribe 回调是在 Android UI 线程中,即设置图片则被设定在了主线程
.subscribe(new Observer<Drawable>() {
@Override
public void onCompleted() {
Log.d("hoyouly", getClass().getSimpleName() + " -> onCompleted: "+Thread.currentThread());
}
@Override
public void onError(Throwable e) {
Toast.makeText(getApplicationContext(), "加载图片失败", Toast.LENGTH_LONG).show();
}
@Override
public void onNext(Drawable drawable) {
Log.d("hoyouly", getClass().getSimpleName() + " -> onNext: "+Thread.currentThread());
imageView.setImageDrawable(drawable);
}
});
}
输出的结果
RxjavaActivity -> onCreate: Thread[main, 5 ,main]
-> call: Thread[RxCachedThreadScheduler-1, 5 ,main]
-> onNext: Thread[main, 5 ,main]
-> onCompleted: Thread[main, 5 ,main]
- onNext()和 onCompleted() 是和 onCreate() 方法在同一个线程中的,即 UI 线程中,
- call()方法是在RxCachedThreadScheduler-1这个线程中的,也就是所谓的 加载图片将会发生在 IO 线程,切换到子线程
- 设置图片则被设定在了主线程,切回主线程
Schedule的原理
Schedule 竟然这么神奇, subscribe() 方法在最外层直接调用的方法,竟然能指定线程,神奇的不要不要的好不好,这究竟是为啥呢,其实原理是以变换为基础的,变换,就是将事件的对象或者整个序列进行加工处理,转换成不同的事件或事件序列
Observable.just("images/logo.png")//输入类型是String
.map(new Func1<String, Bitmap>() { //Func1 两个泛型变量, String ,Bitmap
@Override
public Bitmap call(String s) {// 参数类型是 String ,返回类型是Bitmap
return null;
}
})//
.subscribe(new Action1<Bitmap>() {//一个参数类型,输入类型是Bitmap
@Override
public void call(Bitmap bitmap) {//参数类型是 Bitmap ,返回类型是void
}
});
Func1
public interface Func1<T, R> extends Function {
R call(T t);
}
查看源码可知,
- Func1 也是继承 Function ,和 Action 是同一个老子,其实它和 Action 类似,
- 不同的是, Action 包含的 call() 方法是没有返回值的,而 Func1 包含的 call() 方法是有返回值的。返回值就是你泛型里面设置的。
解释了 Func1 这个小插曲,接下来进入正题,转换 map() 方法将参数中的 String 对象转换成一个 Bitmap 对象后返回,而在经过 map() 方法后,事件的参数类型也由 String 转为了 Bitmap 。这种直接变换对象并返回的,是最常见的也最容易理解的变换。不过 RxJava 的变换远不止这样,它不仅可以针对事件对象,还可以针对整个事件队列,这使得 RxJava 变得非常灵活。??????
举例说明
- 如果打印一组学生的名字.
final Student [] students=new Student[]{};
final Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(String s) {
//输出来的就是学生的名字
Log.d("hoyouly", getClass().getSimpleName() + " -> onNext: "+s);
}
};
Observable.from(students)//
.map(new Func1<Student, String>() {
@Override
public String call(Student student) {
return student.getName();
}
})//
.subscribe(subscriber);
使用 map() 方法进行转换一下就行了,可是如果我想要打印学生的课程呢?我们知道,学生和名字是一一对应的,可是学生与课程的关系就是一对多啊,当然,我们可以通过 for 循环来处理。
final Student [] students=new Student[]{};
Subscriber<Student> studentSubscriber=new Subscriber<Student>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(Student student) {
//得到的是学生信息,然后通过 for 循环打印课程
for (Courses courses:student.getCourses()) {
Log.d("hoyouly", getClass().getSimpleName() + " -> onNext: "+courses.toString());
}
}
};
Observable.from(students).subscribe(studentSubscriber);
可是我不想使用 for 循环呢,这不符合 RxJava 的流式 API 了啊,那有什么办法呢,就要祭出神器了,flatmap()
flatmap()
Subscriber<Courses> coursesSubscriber=new Subscriber<Courses>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(Courses courses) {
//得到的就是每一个课程的名字
Log.d("hoyouly", getClass().getSimpleName() + " -> onNext: "+courses.toString());
}
};
Observable.from(students)//
.flatMap(new Func1<Student, Observable<Courses>>() {
@Override
public Observable<Courses> call(Student student) {
return Observable.from(student.getCourses());
}
})//
.subscribe(coursesSubscriber);
flatMap() 与map()
- 相同点,就是把传入的参数转换成后返回另外一个对象
- 不同点:map 返回的是一个普通对象,而 flatMap() 返回的是一个 Observable 对象,并且这个对象并没有直接发送给 Subscribe 的调用(即onNext())方法中,
flatMap() 原理:
- 使用传入的对象创建一个 Observable 对象
- 不发送这个 Observable 对象,而是激活它,于是它开始发送事件
- 每一个创建的 Observable 发送的事件,都被汇入到同一个 Observable 中,而这个 Observable 将这些事件统一交给 Subscribe 的回调方法。
这三步骤,通过一组新创建的 Observable 对象将初始化的对象平铺之后,然后通过统一的路径分发下去,这就是 flatMap() 。 flat 就是平铺,平面的意思
扩展
Retorfit 和 RxJava 配合
由于可以在嵌套的 Observable 中添加异步代码, flatMap() 也常用于嵌套的异步操作,例如嵌套的网络请求
networkClient.token() // 返回 Observable<String>,在订阅时请求 token ,并在响应后发送 token
.flatMap(new Func1<String, Observable<Messages>>() {
@Override
public Observable<Messages> call(String token) {
// 返回 Observable<Messages>,在订阅时请求消息列表,并在响应后发送请求到的消息列表
return networkClient.messages();
}
})
.subscribe(new Action1<Messages>() {
@Override
public void call(Messages messages) {
// 处理显示消息列表
showMessages(messages);
}
});
变换的原理 lift()
变换的功能虽然不同,但是本质上都是针对事件序列的处理再发送。在 RxJava 中,基于同一个基础变换方法,即lift()老样子,从源码开始吧,我们从上一次打印一组学生的例子开始看起
final Student [] students=new Student[]{};
final Subscriber<String> subscriber = new Subscriber<String>() {
...
@Override
public void onNext(String s) {
//输出来的就是学生的名字
}
};
Func1<Student, String> func1 = new Func1<Student, String>() {
@Override
public String call(Student student) {
return student.getName();
}
};
Observable.from(students)//
.map(func1)//
.subscribe(subscriber);
我修改了一下结构把 map() 里面的参数提取出来,成为一个变量,摈弃删除了一些暂时无关紧要的代码(onError()和onCompleted()),这样看起来我感觉能更清晰一些
然后看看 map() 里面的实现
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
}
// 因为我们指定泛型类型是 Student 和 String ,所以就直接换了吧,这样看起来舒服
public final Observable<String> map(Func1<Student, String> func) {
return lift(new OperatorMap<Student, String>(func));
}
先看看这个 OperarMap 是什么玩意吧
//同样。里面的泛型也是被实际替换了,
//注意 泛型类型调换了,参照源码 public final class OperatorMap<T, R> implements Operator<R, T> {},
public final class OperatorMap<Student, String> implements Operator<String,Student> {
private final Func1<Student, String> transformer;
public OperatorMap(Func1<Student, String> transformer) {
this.transformer = transformer;
}
@Override
public Subscriber<Student> call(final Subscriber<String> o) {
return new Subscriber<Student>(o) {
@Override
public void onCompleted() {
o.onCompleted();
}
@Override
public void onError(Throwable e) {
o.onError(e);
}
@Override
public void onNext(Student t) {
try {
o.onNext(transformer.call(t));
} catch (Throwable e) {
Exceptions.throwOrReport(e, this , t);
}
}
};
}
}
public interface Operator< String,Student>
extends Func1<Subscriber<String>, Subscriber<Student>> {
// cover for generics insanity
}
根据源码可知,其实这个 OperatorMap 本质上是一个 Func1 ,只不过被包装了一下,
public final class OperatorMap<Student, String>
implements Func1<Subscriber<String>, Subscriber<Student>>{
public OperatorMap(Func1<Student, String> transformer) {
this.transformer = transformer;
}
@Override // Func1 意义就是有两个泛型, call 的参数是Subscriber<String>,返回值是Subscriber<Student>
public Subscriber<Student> call(final Subscriber<String> o) {
return new Subscriber<Student>(o) {
@Override
public void onCompleted() {
o.onCompleted();
}
@Override
public void onError(Throwable e) {
o.onError(e);
}
@Override
public void onNext(Student t) {
try {
o.onNext(transformer.call(t));
} catch (Throwable e) {
Exceptions.throwOrReport(e, this , t);
}
}
};
}
}
然后我们看 lift() 源码,
public final Observable<Student> lift(final Operator<Student, String> operator) {
return new Observable<Student>(new OnSubscribe<Student>() {
@Override
public void call(Subscriber<Student> o) {
try {
Subscriber<String> st = hook.onLift(operator).call(o);
try {
st.onStart();
onSubscribe.call(st);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
o.onError(e);
}
}
});
}
这段代码有点意思, lift() 竟然返回的是生成一个新的 Observable 对象并且将它返回,而且创建 Observable 所用的参数 OnSubscribe 的回调方法 call() 看起来竟然和之前的Observable.subcribe()那么相似,其实并不真的的一样,失之毫厘谬以千里,onSubscribe.call(st)中 onSubscibe 指的对象并不同, 接下来开始绕口令:
- subscribe()中的 onSubcribe 指的是 Observable 中的 onSubscribe 对象,这个没问题,但是 lift() 之后,的情况就复杂了
- 当含有 lift() 时:
1.lift()创建一个新的 Observable 后,加上原来的 Observable ,代码就有了两个 Observable 对象了
- 同样的,新的 Observable 中有新的 OnSubscribe 对象即 Subscriber
o ,再加上原来的 Observable 中的 OnSubscribe 对象,就有两个 OnSubcribe 对象了 - 当用户调用经过 lift() 后的 Observable 的subscribe()方法时候,使用的是通过 lift() 创建并返回的 新的 Observable 对象,于是它触发的onSubscribe.call(subscribe),也是用的新的 Observable 中的新的 OnSubscribe 对象,即在 lift() 中生成的那个 OnSubscribe 对象 o ,
- 而这个新的 OnSubscribed 对象 o 的 call() 方法中的 OnSubscribe ,就是指的原始 Observable 中的 OnSubscribe ,在这个 call() 中,新的 OnSubscribe 利用operator.call(subscriber)生成新的Subscribe(Operator就是在这里,通过自己的 call() 方法,将新的 Subscribe 和原始的 Subscribe 进行关联,并插入自己的变换代码实现变换) 然后利用新的 Subscribe 像原始的 Observable 进行订阅
- 同样的,新的 Observable 中有新的 OnSubscribe 对象即 Subscriber
这样就实现了 lift() 过程,有点像代理机制,通过事件拦截和处理实现事件序列变换。
我们知道, RxJava 的实现,是一条从上到下的链式调用方式,所谓链式调用,说白了就是先创建该对象,然后调用该对象的其他方法,可是返回的结果却还是该对象,就这样一直可以调用下去,像一条链子一样,这是我们通常的实现方式,也是所谓的 Builder 模式, Android 最常见的应该就是这个 AlertDialog 的使用
AlertDialog alertDialog = new AlertDialog.Builder(this)
.setTitle(title)
.setMessage(msg)
.setCancelable(false)
.setPositiveButton(btn,
new DialogInterface.OnClickListener() {
@Override
public void onClick(final DialogInterface dialog, int which) {
dialog.dismiss();
}
}).show();
查看 AlterDialog 源码就可知实现原理:
// AlterDialog.java
public Builder setTitle(int titleId) {
P.mTitle = P.mContext.getText(titleId);
return this;
}
public Builder setTitle(CharSequence title) {
P.mTitle = title;
return this;
}
public Builder setCustomTitle(View customTitleView) {
P.mCustomTitleView = customTitleView;
return this;
}
public Builder setMessage(int messageId) {
P.mMessage = P.mContext.getText(messageId);
return this;
}
public Builder setMessage(CharSequence message) {
P.mMessage = message;
return this;
}
public Builder setIcon(int iconId) {
P.mIconId = iconId;
return this;
}
这只是部分源码,但是已经能看出来门道了,返回的都是 this ,就是该对象,虽然 RxJava 也是使用的链式调用,也是使用的 Builder 模式,可是用的却不一样,它的返回并不是一个 this ,尤其是 lift() 方法,我们知道 lift() 虽然返回的还是一个 Observable 对象,看着符合链是调用,但是人家是 new 了一个 Observable 对象,所以尽管Observable.create()已经创建了一个 Observable 对象,并且附带的创建了一个 OnSubscribe 对象,但是到 lift() 这,它竟然 偷梁换柱,重新创建了一个新的 Observable 对象和相应的 OnSubscribe 对象并返回,从而让后面的方法使用。所以后面执行的 subscribe(subscriber),其实是执行的 lift() 创建并返回的 Observable 对象,
compose()
对 Observable 整体变换
除了 lift() 之外, Rxjava 还有一个变换方法叫 compose() ,
他和 lift() 区别在于, lift() 针对事件项和事件序列的,而 compose() 是真的 Observable 本身进行转换的。 还是举例说明吧 假如在程序中 有多个 Observable ,并且他们都需要使用一组相同的 lift() 转换,你可以这么写:
observable1
.lift1()
.lift2()
.lift3()
.lift4()
.subscribe(subscriber1);
observable2
.lift1()
.lift2()
.lift3()
.lift4()
.subscribe(subscriber2);
observable3
.lift1()
.lift2()
.lift3()
.lift4()
.subscribe(subscriber3);
observable4
.lift1()
.lift2()
.lift3()
.lift4()
.subscribe(subscriber1);
可是这样写是不是太麻烦了啊,如果要是有 100 个呢,还这样写,这不像是面向对象的开发啊,那有没有方式解决呢,答案就是compose()
public class LiftAllTransformer implements Observable.Transformer<Integer, String> {
@Override
public Observable<String> call(Observable<Integer> observable) {
return observable
.lift1()
.lift2()
.lift3()
.lift4();
}
}
...
Transformer liftAll = new LiftAllTransformer();
observable1.compose(liftAll).subscribe(subscriber1);
observable2.compose(liftAll).subscribe(subscriber2);
observable3.compose(liftAll).subscribe(subscriber3);
observable4.compose(liftAll).subscribe(subscriber4);
这样是不是就感觉好看多了呢,可是原理是啥呢,依旧老样子,看源码咯。
public <R> Observable<R> compose(Transformer<? super T, ? extends R> transformer) {
return ((Transformer<T, R>) transformer).call(this);
}
我们定义了一个类 LiftAllTransformer 实现了Observable.Transformer<Integer, String> 这个接口,然后呢,执行 compose() ,里面执行的就是传递过来的 Transformer 的 call() 方法,其实就是执行了我们定义的 LiftAllTransformer 里面的 call() 方法,感觉有点像是把 lift() 方法进行了二次封装,封装成一个 call 方法执行。
Schedule 线程控制二
除了灵活变换, Rxjava 另一个 NB 的地方就是线程的自由控制。
利用 subscribeOn() 结合 observeOn() 来实现线程控制,让事件的产生和消费发生在不同的线程。可是在了解了 map() flatMap() 等变换方法后,就可以多次切换线程。
因为 observeOn() 指定的是 Subscriber 的线程,而这个 Subscriber 并不一定是 subscribe() 中参数的 Subscribe ,而是 observeOn() 执行时当前 Observable 所对应的 Subscriber ,即他的直接下级 Subscriber 。
换句话说observeOn指定的是他之后操作所在的线程,因此如果想要多次切换线程,只需要在每次切换线程的时候调用 observerOn() 即可
Observable.just(1, 2 , 3 , 4) // IO 线程,由 subscribeOn() 指定
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map(new Func1<Integer, String>() {// 新线程,由 observeOn() 指定
@Override
public String call(Integer integer) {
Log.e("hoyouly", "call: integer: "+integer+" current thread :"+Thread.currentThread());
return String.valueOf(integer);
}
})
.observeOn(Schedulers.io())
.map(new Func1<String, Integer>() {// IO 线程,由 observeOn() 指定
@Override
public Integer call(String s) {
Log.e("hoyouly", "call: s: "+s+" current thread :"+Thread.currentThread());
return Integer.parseInt(s);
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {// Android 主线程,由 observeOn() 指定
@Override
public void onCompleted() {
Log.e("hoyouly", "onCompleted: "+Thread.currentThread());
}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(Integer integer) {
Log.e("hoyouly", "onNext: "+integer+" current thread :"+Thread.currentThread());
}
});
输出的结果是:
call: integer: 1 current thread :Thread[RxNewThreadScheduler-1, 5 ,main]
call: integer: 2 current thread :Thread[RxNewThreadScheduler-1, 5 ,main]
call: s: 1 current thread :Thread[RxCachedThreadScheduler-1, 5 ,main]
call: s: 2 current thread :Thread[RxCachedThreadScheduler-1, 5 ,main]
onNext: 1 current thread :Thread[main, 5 ,main]
onNext: 2 current thread :Thread[main, 5 ,main]
onCompleted: Thread[main, 5 ,main]
当我把上面的 .subscribeOn(Schedulers.io())
这行代码注释掉后,结果如下
call: integer: 1 current thread :Thread[RxNewThreadScheduler-1, 5 ,main]
call: integer: 2 current thread :Thread[RxNewThreadScheduler-1, 5 ,main]
call: s: 1 current thread :Thread[RxCachedThreadScheduler-1, 5 ,main]
call: s: 2 current thread :Thread[RxCachedThreadScheduler-1, 5 ,main]
onNext: 1 current thread :Thread[main, 5 ,main]
onNext: 2 current thread :Thread[main, 5 ,main]
onCompleted: Thread[main, 5 ,main]
结果竟然没变,这是为啥呢,?先留一个问号。然后我再把刚才注释的打开,把.observeOn(Schedulers.newThread())
注释掉,结果如下
call: integer: 1 current thread :Thread[RxCachedThreadScheduler-2, 5 ,main]
call: integer: 2 current thread :Thread[RxCachedThreadScheduler-2, 5 ,main]
call: s: 1 current thread :Thread[RxCachedThreadScheduler-1, 5 ,main]
call: s: 2 current thread :Thread[RxCachedThreadScheduler-1, 5 ,main]
onNext: 1 current thread :Thread[main, 5 ,main]
onNext: 2 current thread :Thread[main, 5 ,main]
onCompleted: Thread[main, 5 ,main]
如果把.subscribeOn(Schedulers.io())
和.observeOn(Schedulers.newThread())
都注释掉,输出的结果如下:
call: integer: 1 current thread :Thread[main, 5 ,main]
call: integer: 2 current thread :Thread[main, 5 ,main]
call: s: 1 current thread :Thread[RxCachedThreadScheduler-1, 5 ,main]
call: s: 2 current thread :Thread[RxCachedThreadScheduler-1, 5 ,main]
onNext: 1 current thread :Thread[main, 5 ,main]
onNext: 2 current thread :Thread[main, 5 ,main]
onCompleted: Thread[main, 5 ,main]
当我在.observeOn(AndroidSchedulers.mainThread())
之前添加一个.subscribeOn(Schedulers.newThread())
的时候,输出的结果如下:
call: integer: 1 current thread :Thread[RxCachedThreadScheduler-2, 5 ,main]
call: integer: 2 current thread :Thread[RxCachedThreadScheduler-2, 5 ,main]
call: s: 1 current thread :Thread[RxCachedThreadScheduler-1, 5 ,main]
call: s: 2 current thread :Thread[RxCachedThreadScheduler-1, 5 ,main]
onNext: 1 current thread :Thread[main, 5 ,main]
onNext: 2 current thread :Thread[main, 5 ,main]
onCompleted: Thread[main, 5 ,main]
不知道你们发现点规律没,
- observeOn()执行后,后面的代码都是在 observeOn() 所设置的线程中的,直到遇到下一个 observeOn() 为止。
- subscribeOn() 和 observeOn() 都做了线程切换的工作,最后一个当把两行代码都注释掉后,public String call(Integer integer) 这个方法就执行在了 main 中了,如果有一个,那么就不会执行在 main 中
- subscribeOn() 的线程切换发生在 OnSubscribe 中,即在它通知上一级 OnSubscribe 时,这时事件还没有开始发送,因此 subscribeOn() 的线程控制可以从事件发出的开端就造成影响;
- observeOn() 的线程切换则发生在它内建的 Subscriber 中,即发生在它即将给下一级 Subscriber 发送事件时,因此 observeOn() 控制的是它后面的线程。
- 当使用了多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用。
subscribeOn() 和 observeOn() 的内部实现,也是用的 lift()
public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return nest().lift(new OperatorSubscribeOn<T>(scheduler));
}
public final Observable<Observable<T>> nest() {
return just(this);
}
public final Observable<T> observeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler));
}
doOnSubscribe()
Subscriber 的 onStart() 可以用作流程开始前的初始化。然而 onStart() 由于在 subscribe() 发生时就被调用了,因此不能指定线程,而是只能执行在 subscribe() 被调用时的线程.有一个方法 Observable.doOnSubscribe() 。
它和 Subscriber.onStart() 同样是在 subscribe() 调用后而且在事件发送前执行,但区别在于它可以指定线程。
默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;
而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程。
示例代码:
Observable.create(onSubscribe)
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0() {
@Override
public void call() {
progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行
}
})
.subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
搬运地址:
既已览卷至此,何不品评一二: