扫盲系列 - RxJava 2.0 -- 背压
backpressure 背压
背压,这个玩意是 Rxjava 绕不过去的坎。面试被问到了几次,都没说明白,因为糊里糊涂的。只知道 Rxjava 2.0 支持背压,Rxjava 1.0 不支持背压。但是为啥,怎么支持,就不清楚了。所以就想着查查资料搞懂这个玩意
什么是背压呢?
背压是指在异步场景中,被观察者发送事件速度远远快于观察者的处理速度的情况下,一种告诉被观察者降低发送速度的策略。
背压是流速控制的一种策略
- 背压的一个前提是异步,也就是说观察者和被观察者在不同的线程。如果 观察者和被观察者在同一个线程中,这个时候被观察者发送的事件,必须等到观察者接收处理完以后才能发送下一个事件
- 背压并是不像 flatmap 一样可以在程序中直接使用的操作符,他只是一种控制事件流速的策略
主要目的是控制流量的,水缸的控制能力毕竟有限,因此还得从源头控制。
Rxjava 2.0 支持背压,Rxjava 1.0 不支持背压。可是 在 Rxjava 1.0 上,怎么解决背压的问题呢?
Rxjava 1.0 对背压的操作
延迟发送数据
如果可以的话,使被观察者适当的延迟发送数据。
过滤限流
使用过滤限流操作符来处理。 这里简单说几种
- Sample 在一段时间内,只处理最后一个数据
- throttleFirst 在一段时间内只响应第一次的操作
- filter 自己写过滤条件
更多过滤操作符可以查看 RxJava 2.x 使用详解(三) 过滤操作符
缓存
主要包括一下两个操作符
- buffer 将多个事件打包放入一个 List 中,再一起发射
- window 将多个事件打包放入一个 Observable 中,再一起发射
使用背压操作符
通过一些背压操作符,来转化成支持背压的Observable
- onBackpressurebuffer() 把 Observable 发送过来的事件做缓存,当 Request 方法被调用的时候,给下层流发送一个 item ,如果给设置了缓存区的大小,那么超过这个缓冲区大小就会抛出异常
- onBackpressureDrop() 将 Observable 的事件抛弃掉,直到 Subscriber 再次调用 request(n)方法的时候,就会发送给它这之后的 n 个事件
- onBackpressureLatest() 只保留最新的事件,
Rxjava 2.0 对背压的操作
Rxjava 2.0 的 Flowable 支持背压,
Flowable.create((FlowableOnSubscribe<String>) emitter -> {
emitter.onNext("test1");
emitter.onNext("test2");
emitter.onComplete();
}, BackpressureStrategy.ERROR).subscribe(new Subscriber<String>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
Log.d("hoyouly", "Flowable : onSubscribe " + s);
subscription = s;
subscription.request(1);
}
@Override
public void onNext(String s) {
Log.d("hoyouly", "Flowable : onNext " + s);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
Log.d("hoyouly", "Flowable : onError " + t.getMessage());
}
@Override
public void onComplete() {
Log.d("hoyouly", "Flowable : onComplete ");
}
});
// 结果
hoyouly : Flowable : onSubscribe 0
hoyouly : Flowable : onNext test1
hoyouly : Flowable : onNext test2
hoyouly : Flowable : onComplete
Subscription 和 Disposable 一样,也是一个开关,调用 cancel() 就可以切断上下游直接的关系。 ,通过调用 request() 来控制流速
注意 request() 一定要在 onStart() 中通知被观察者发送第一个事件,并且在 onNext() 最后通知被观察者发送下一个事件。否则就会出现著名的 MissingBackpressureException
如果 onSubscribe() 中没有执行 request() ,输出的结果如下
hoyouly : Flowable : onSubscribe 0
hoyouly : Flowable : onError create could not emit value due to lack of requests
如果 onSubscribe() 中执行了 request() ,但是 onNext() 中没有执行 request() ,那么输出的结果如下
hoyouly : Flowable : onSubscribe 0
hoyouly : Flowable : onNext test1
hoyouly : Flowable : onError create: could not emit value due to lack of requests
onNext 执行了一次,这一次是因为 onSubscribe() 中的 request() ,
request()
request() 就是开启循环的钥匙,
记得我们通过 handler 发送循环消息的时候,首先通过 handler.sendMessage(),然后在 handleMessage() 中,处理完 Message ,还会执行 handler.sendMessage(),这样才能循环执行。
这个 request() 就相当于 sendMessage() , handleMessage() 相当于 onNext() ,这样是不是就容易理解了。
request() 同时是一种能力,告诉上游能处理几个,而被观察者一直发送数据。
Flowable 的新思想
RxJava 的观察者模型中,被观察者是主动的推送数据给观察者,观察者是被动接收,
而 Flowable 刚好相反,是响应式拉取 观察者主动从被观察者那里拉取数据,而被观察者变成被动的等待通知发送数据。
观察者可以根据自身实际情况按需拉取数据,而不是被动接收(也就相当于告诉上游观察者把速度慢下来),最终实现了上游被观察者发送事件的速度的控制,实现了背压的策略。
搬运地址:
关于 RxJava 最友好的文章——背压(Backpressure)
既已览卷至此,何不品评一二: