扫盲系列 - RxJava 2.0 -- 观察者模式
RxJava 其中 Rx 是 ReactiveX 的缩写, ReactiveX 又是 Reactive Extensions 的缩写
所以 RxJava 就是 java 上异步和基于事件响应式编程
RxJava 基于观察者模式,主要包括 观察者,被观察者,订阅,事件。
观察者模式主要分以下几种
- Observable 和 Observer
- Flowable 和 Subscriber
- Single 和 SingleObserver
- Completable 和 CompletableObserver
- Maybe 和 MaybeObserver
Observeable 和 Observer
例子如下
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override//Emitter 发射器,用来发送事件的,
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Log.d(TAG, "subscribe:1 ");
e.onNext(1);
Log.d(TAG, "subscribe:2 ");
e.onNext(2);
Log.d(TAG, "subscribe:3 ");
e.onNext(3);
Log.d(TAG, "subscribe:onComplete ");
e.onComplete();
}
}).subscribe(new Observer<Integer>() {
private Disposable mDisposable;
private int i;
@Override//Disposable 一次性,理解为订阅事件的开关,调用 dispose() 方法 方法后,会切断联系
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
mDisposable = d;
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "onNext: " + value);
i++;
if (i == 2) {
Log.d(TAG, "onNext: dispose");
mDisposable.dispose();
Log.d(TAG, "onNext: isDispose: " + mDisposable.isDisposed());
}
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
//结果
D hoyouly : smapleTest2:
D hoyouly : onSubscribe:
D hoyouly : subscribe:1
D hoyouly : onNext: 1
D hoyouly : subscribe:2
D hoyouly : onNext: 2
D hoyouly : onNext: dispose
D hoyouly : onNext: isDispose: true
D hoyouly : subscribe:3
D hoyouly : subscribe:onComplete
- ObservableEmitter 用来发送事件的,可以发送三种类型事件, next 事件、complete事件和 error 事件
- 上游可以无限的发送 onNext ,下游可以无限的接收 onNext
- 当上游发送了一个 onComplete 后,可以继续发送发送其他事件,但是下游只可以收到这个 onComplete ,后续的事件就收不到了
- 当上游发送了一个 onError 后,可以继续发送发送其他事件,但是下游只可以收到这个 onError ,后续的事件就收不到了
- 上游可以不发送 onError 或者 onComplete
- onError 或者 onComplete 必须唯一并且互斥,即不能发送多个 onComplete 也不能发送多个 onError ,也不能先发一个 onError ,在发送一个 onComplete
- Disposable 订阅事件的开关,调用 dispose() 方法 方法后,会切断联系 如上所示, onNext 2 之后,调用了 dispose ,下游就收不到数据了,但是上游却还在发送数据。
subscribe() 有很多重载方法
这里面看到了又是 Action ,又是 Consumer ,这两个区别,和 Observer 又是啥关系呢?
Action 和 Consumer
- Action 无参数类型
- Consumer
单一参数类型。 - BigConsumer<T,R> 双参数类型,
- Consumer<Object[]> 多参数类型
这里其实最终都是封装成一个 Observer 对象。只不过使用 Action 和 Consumer 简化观察者而已。
Flowable 和 Subscriber
//被观察者
Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
emitter.onNext("test1");
emitter.onNext("test2");
emitter.onComplete();
}
}, BackpressureStrategy.BUFFER);
flowable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
}
}, new Action() {//相当于 onComplete ,
@Override
public void run() {
}
}, new Consumer<Subscription>() {//相当于onSubscribe
@Override
public void accept(Subscription subscription) throws Exception {
}
});
这里就使用了 Consumer 和 Action ,其实里面还是把这四个封装成了一个 Subscriber 对象,使用 Consumer 和 Action 只不过是简化观察者而已。
同理, subscribe() 有多重重载方法,如下
五种背压策略
Flowable 支持背压,默认的背压策略是 128 。所以 Flowable 的 create() 比 Observable 的多了一个参数,BackpressureStrategy.BUFFER,看着像是背压策略,一共有五种。
- BackpressureStrategy.MISSING 没有指定背压策略,需要下游操作符指定背压策略
- BackpressureStrategy.DROP 如果 Flowable 的异步缓存池满了,则会丢掉将要放入缓存池中的数据
- BackpressureStrategy.LATEST 如果缓存池满了,会丢掉将要放入缓存池中的数据。这一点与 DROP 策略一样,不同的是,不管缓存池的状态如何, LATEST 策略会将最后一条数据强行放入缓存池中。
- BackpressureStrategy.BUFFER Flowable的异步缓存池同 Observable 的一样,没有固定大小,可以无限制添加数据,不会抛出 MissingBackpressureException 异常,但会导致OOM.
- BackpressureStrategy.ERROR 如果放入 Flowable 的异步缓存池中的数据超限(默认是128)了,则会抛出 MissingBackpressureException 异常
Observeable/Observer 与 Flowable/Subscriber 的区别
Observeable 用于 订阅 Observer ,不支持背压 , 使用场景:
- 不超过 1000 个元素,随着时间的流逝,基本不会出现 OOM ,
- GUI事件或者 1000Hz 频率以下的元素,
- 平台不支持Java Stream (Java 8 新特性), Observable 的开销比 Flowable 小
Flowable 用于订阅 Subscriber ,支持背压, 使用场景:
- 超过 10K 的元素,
- 读取硬盘操作,
- 通过 JDBC 读取数据库,
- 网络 IO 操作
Single 和 SingleObserver
单一的连续事件流,即只有一个 onNext() 事件,接着就触发 onComplete() 或者 onError() , 可以使用 Single 只包含两个事件,一个是正常处理成功的 onSuccess ,一个是处理失败的 onError() ,它只发送一次,所以不存在背压问题。
Single single = Single.create(new SingleOnSubscribe<String>() {
@Override
public void subscribe(SingleEmitter<String> emitter) throws Exception {
emitter.onSuccess("success");
emitter.onSuccess("again");//错误,重复调用也不会执行。
}
});
single.subscribe(new SingleObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(String o) {
//相当于 onNext 和 onCompelete()
Log.d("hoyouly", "onSuccess " + o);
}
@Override
public void onError(Throwable e) {
}
});
也可以使用 Actions 简化,就是使用的 BiConsumer 这个带有两个参数的Actions
//使用 BiConsumer 简化
single.subscribe(new BiConsumer<String, Throwable>() {
@Override
public void accept(String o, Throwable o2) throws Exception {
}
});
可以直接转换成 Flowable 或者 Observable
single.toFlowable();
single.toObservable();
Completable 和 CompletableObserver
不关心 onNext() ,只有 onComplete() 和onError()
Completable.create(emitter -> {
emitter.onComplete();//单一的 onComplete 事件
}).subscribe(new CompletableObserver() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onComplete() {
Log.d("hoyouly", "Completable : onComplete ");
}
@Override
public void onError(Throwable e) {
Log.d("hoyouly", "Completable : onError ");
}
});
其他的 和 Single 类似,可以通过 Actions 简化观察者,也可以转成 Flowable 或者 Observable
Maybe 和 MaybeObserver
可能发送一个需求,也可能不发送需求,就可以使用 这个。是 Single 和 Completable 的混合体,可能调用一下其中一种情况
- onSuccess()或者onError()
- onComplete() 或者 onError()
注意: onSuccess()和 onComplete() 是互斥的存在
//判断是否登陆
Maybe.just(isLogin())
//可能涉及到 IO 操作,放在子线程
.subscribeOn(Schedulers.newThread())
//取回结果传到主线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new MaybeObserver<Boolean>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(Boolean value) {
if(value){
...
}else{
...
}
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
执行 onSuccess() 就不会执行 onComplete() ,同理,执行 onComplete() 就肯定不会执行 onSuccess(). 感觉这个观察者有点扯,反正我是不会用的。太不确定性了。
搬运地址:
关于 RxJava 最友好的文章——背压(Backpressure)
既已览卷至此,何不品评一二: