扫盲系列 - RxJava 2.0 -- 常用操作符
下面是根据 RxJava 2.x 使用详解系列 的 blog ,挑拣我认为会用到的操作符,进行的记录,当做一个速查手册来使用。
创建操作符
creat()
上面已经有例子了,就不多举例子了
just()
最多能接受 10 个参数,也就是最多能发送 10 条数据
Flowable.just("test1", "test2").subscribe(s -> Log.d("hoyouly", "just onNext() " + s));
//结果
just onNext() test1
just onNext() test2
fromArray()
接受任意长度的数据数组,不要直接传递一个 List 集合,
Flowable.fromArray(1, 2 , 3 , 4).subscribe(integer -> Log.d("hoyouly", "fromArray : onNext() " + integer));
//结果
fromArray : onNext() 1
fromArray : onNext() 2
fromArray : onNext() 3
fromArray : onNext() 4
empty()
数据长度为 0 ,不会发送任何数据,直接发送 onComplete()
Flowable.empty().subscribe(
o -> Log.d("hoyouly", "empty : onNext() " + o),
throwable -> Log.d("hoyouly", "empty : onError() "),
() -> Log.d("hoyouly", "empty : onComplete() "));
//结果
empty : onComplete()
error()
直接发送 onError() , 需要传递一个 Throwable ,
Flowable.error(new RuntimeException(" test error()")).subscribe(
o -> Log.d("hoyouly", "error : onNext() " + o),
throwable -> Log.d("hoyouly", "error : onError() " + throwable.getMessage()),
() -> Log.d("hoyouly", "error : onComplete() "));
//结果
error : accept test error()
fromIterable()
可以遍历可迭代的数据集合,例如 list , fromArray 则不可以遍历 List 集合
List<String> list = new ArrayList<>();
list.add("a");
list.add("b");
list.add("c");
Flowable.fromIterable(list).subscribe(s -> Log.d("hoyouly", "onNext() "+s));
timer()
延迟一段时间发送数据
//延迟 1 秒钟后执行 onNext() ,
Flowable.timer(1, TimeUnit.SECONDS).subscribe(aLong -> Log.d("hoyouly", "empty : onNext() " + aLong));
interval()
不间断发送数据
//每隔 1 秒钟发送一条数据
Flowable.interval(1,TimeUnit.SECONDS).subscribe(aLong -> Log.d("hoyouly", "empty : onNext() " + aLong));
intervalRange
一定范围内不断发送数据
Log.d("hoyouly", "intervalRange 开始执行 ");
//从 0 到 10 ,初始间隔为 4 秒,之后每间隔 2 秒发送数据
Flowable.intervalRange(0, 10 , 4 , 2 , TimeUnit.SECONDS).subscribe(
o -> Log.d("hoyouly", "intervalRange : onNext() " + o),
throwable -> Log.d("hoyouly", "intervalRange : onError() " + throwable.getMessage()),
() -> Log.d("hoyouly", "intervalRange : onComplete() "));
//输出结果
03-29 23:39:11.738 8391 8391 D hoyouly : intervalRange 开始执行
03-29 23:39:15.747 8391 8414 D hoyouly : intervalRange : onNext() 0
03-29 23:39:17.747 8391 8414 D hoyouly : intervalRange : onNext() 1
03-29 23:39:19.748 8391 8414 D hoyouly : intervalRange : onNext() 2
03-29 23:39:21.746 8391 8414 D hoyouly : intervalRange : onNext() 3
03-29 23:39:23.747 8391 8414 D hoyouly : intervalRange : onNext() 4
03-29 23:39:25.747 8391 8414 D hoyouly : intervalRange : onNext() 5
03-29 23:39:27.747 8391 8414 D hoyouly : intervalRange : onNext() 6
03-29 23:39:29.747 8391 8414 D hoyouly : intervalRange : onNext() 7
03-29 23:39:31.746 8391 8414 D hoyouly : intervalRange : onNext() 8
03-29 23:39:33.747 8391 8414 D hoyouly : intervalRange : onNext() 9
03-29 23:39:33.748 8391 8414 D hoyouly : intervalRange : onComplete()
里面的 10 是指发送 10 条数据,所以才有了 0~9 10个 onNext()
第一行 log 与第二行 log 间隔时间为 4 秒
之后每行 log 间隔是 2 秒
defer()
需要每个观察者被订阅的时候都重新创建被观察者(一对一),则可以使用 defer 操作符:
Flowable<String> flowable = Flowable.defer(new Callable<Publisher<String>>() {
@Override
public Publisher<String> call() throws Exception {
return Flowable.just("1", "2");
}
});
//订阅第一个观察者
flowable.subscribe(str -> Log.i("hoyouly", "defer : one onNext() " + str));
//订阅第二个观察者
flowable.subscribe(str -> Log.i("hoyouly", "defer : second onNext() " + str));
//输出结果
hoyouly : defer : one onNext() 1
hoyouly : defer : one onNext() 2
hoyouly : defer : second onNext() 1
hoyouly : defer : second onNext() 2
只有当第一个观察者执行完后才回去创建第二个被观察者然后订阅观察者,然后才开始(第二个被观察者)发送事件消息。
过滤操作符
filter()
自己设定规则来过滤数据
Flowable.just(1, 2 , 3).filter(new Predicate<Integer>() {
@Override
public boolean test(Integer value) throws Exception {
//过滤出大于等于 2 的值
return value >= 2;
}
}).subscribe(s -> Log.d("hoyouly", "filter onNext() " + s));
//结果
hoyouly : filter onNext() 2
hoyouly : filter onNext() 3
只输出了 2 和 3 ,因为 只有 test() 返回 true 的数据,才会到下游。
take()
类似 request() 方法,来限制发射数据的数量
Flowable.interval(1, TimeUnit.SECONDS)
.take(5) //只发射 5 个元素
.subscribe(s -> Log.d("hoyouly", "take onNext() " + s));
// 结果
03-30 21:52:27.868 hoyouly : take onNext() 1
03-30 21:52:28.867 hoyouly : take onNext() 2
03-30 21:52:29.867 hoyouly : take onNext() 3
03-30 21:52:30.867 hoyouly : take onNext() 4
尽管是每隔 1 秒输出一个,但是经过 take() 过滤后,到下游的只有 5 个数据
也可以采用时间过滤,
Flowable.interval(1, TimeUnit.SECONDS)
.take(5, TimeUnit.SECONDS) //只发射 5 秒内的数据
.subscribe(s -> Log.d("hoyouly", "take 时间过滤 onNext() " + s));
输出的结果如下
4 没输出,是因为是 5 秒内的数据,这里不包括刚好 5 秒钟的数据4
ofType()
筛选特定类型的数据
Flowable.just("a", 1 , 3 , "b").
ofType(Integer.class)//筛选Integer
.subscribe(s -> Log.d("hoyouly", "take ofType() " + s));
//结果
03-30 22:15:43.689 6642 6642 D hoyouly : take ofType() 1
03-30 22:15:43.690 6642 6642 D hoyouly : take ofType() 3
其实等价与
Flowable.just("a", 1 , 3 , "b").
filter((Predicate<Object>) obj -> obj instanceof Integer)
.subscribe(s -> Log.d("hoyouly", "take filter ofType() " + s));
distinct()/distinctUntilChanged()
过滤掉重复的
Flowable.just("a", "b", "c", "b", "b", "c")
.distinct()//过滤掉重复的
.subscribe(ele -> Log.i("hoyouly", "distinct " + ele));
Flowable.just("a", "b", "c", "b", "b", "c")
.distinctUntilChanged()//过滤掉连续重复的
.subscribe(ele -> Log.i("hoyouly", "distinctUntilChanged " + ele));
//结果
hoyouly : distinct a
hoyouly : distinct b
hoyouly : distinct c
hoyouly : distinctUntilChanged a
hoyouly : distinctUntilChanged b
hoyouly : distinctUntilChanged c
hoyouly : distinctUntilChanged b
hoyouly : distinctUntilChanged c
distinctUntilChanged() 输出的结果 a b c b c ,只过滤掉一个 b ,因为 b 是连续重复的, a 和 c 虽然也重复,但是不连续,所以不会过滤掉
distinct() 则不管连续不连续,只要重复就会过滤,所以 输出的结果 只有 a b
timeout()
超时处理
Flowable.intervalRange(0, 5 , 0 , 2 , TimeUnit.SECONDS)
.subscribe(ele -> Log.i("hoyouly", " intervalRange " + ele));
Flowable.intervalRange(0, 5 , 0 , 2 , TimeUnit.SECONDS)
.timeout(1, TimeUnit.SECONDS) //超时时间是 1 秒
.subscribe(ele -> Log.i("hoyouly", "timeout " + ele));
//结果
03-30 22:25:21.343 7850 7891 I hoyouly : intervalRange 0
03-30 22:25:21.353 7850 7893 I hoyouly : timeout 0
03-30 22:25:23.824 7850 7891 I hoyouly : intervalRange 1
03-30 22:25:25.342 7850 7891 I hoyouly : intervalRange 2
03-30 22:25:27.343 7850 7891 I hoyouly : intervalRange 3
03-30 22:25:29.344 7850 7891 I hoyouly : intervalRange 4
intervalRange() 没有 timeout() 操作符的时候,会输出 0 1 2 3 4
而因为 他的间隔是 1 秒,带上 timeout() 之后,设置的超时时间也是 1 秒,所以输出 0 之后,就超时了
throttleFirst()
throttle: 节流,减速,窒息
一段时间内只相应一次操作,可做放重复点击操作。
Flowable.intervalRange(0, 10 , 0 , 1 , TimeUnit.SECONDS)
.throttleFirst(1,TimeUnit.SECONDS)//一秒钟只处理第一个元素
.subscribe(ele -> Log.i("hoyouly", "throttleFirst " + ele));
//输出的结果
03-30 22:31:44.087 8628 8672 I hoyouly : throttleFirst 0
03-30 22:31:46.086 8628 8672 I hoyouly : throttleFirst 2
03-30 22:31:48.086 8628 8672 I hoyouly : throttleFirst 4
03-30 22:31:50.086 8628 8672 I hoyouly : throttleFirst 6
03-30 22:31:52.085 8628 8672 I hoyouly : throttleFirst 8
原本应该 1 秒钟输出一个的,可是因为添加了 throttleFirst() 这个操作符,只输出了 一半,另外一半就被屏蔽掉了
throttleLast() / sample()
两个功能一样,都是间隔一段时间采集最后一个元素
Flowable.intervalRange(0, 10 , 0 , 1 , TimeUnit.SECONDS)
.sample(2, TimeUnit.SECONDS)//每 2 秒中采集最后一个元素
.subscribe(ele -> Log.i("hoyouly", "sample " + ele));
//等价于
Flowable.intervalRange(0, 10 , 0 , 1 , TimeUnit.SECONDS)
.throttleLast(2, TimeUnit.SECONDS)//每 2 秒中采集最后一个元素
.subscribe(ele -> Log.i("hoyouly", "throttleLast " + ele));
//结果
03-30 22:37:12.888 9179 9224 I hoyouly : sample 1
03-30 22:37:14.887 9179 9224 I hoyouly : sample 3
03-30 22:37:16.888 9179 9224 I hoyouly : sample 5
03-30 22:37:18.888 9179 9224 I hoyouly : sample 7
只输出了 1 3 5 7 ,
throttleWithTimeout() / debounce()
喜新厌旧
如果有新数据,则抛弃旧数据,可以用在即时搜索需求中
Log.d("hoyouly", "debounce begin ");
Flowable.intervalRange(0, 10 , 0 , 1 , TimeUnit.SECONDS)
.debounce(2, TimeUnit.SECONDS)//2秒内有新数据则抛弃旧数据
.subscribe(
ele -> Log.i("hoyouly", "debounce " + ele),
throwable -> Log.d("hoyouly", "debounce : onError() "),
() -> Log.d("hoyouly", "debounce : onComplete() "));
//结果
03-30 22:43:54.550 9854 9854 D hoyouly : debounce begin
03-30 22:44:03.566 9854 9901 I hoyouly : debounce 9
03-30 22:44:03.567 9854 9901 D hoyouly : debounce : onComplete()
中间间隔了 9 秒钟,才输出了 9 ,而且只输出了 9 ,然后就 onComplete() , 这个就是因为每间隔 1 秒就会有新数据,而 debounce() 设置的是 2 秒,所以会等到最后一个 9 输出后,等待两秒后发现没数据了,然后把最后一个 9 发给下游。然后事件就结束了
合并聚合
startWith() / startWithArray()
在发送元素之前追加数据或者追加新的被观察者。
Flowable.just(4, 5 , 6)
.startWith(0)
.startWith(Flowable.just(1, 2 , 3))
.startWithArray(7, 8)
.subscribe(
ele -> Log.i("hoyouly", "startWith " + ele),
throwable -> Log.d("hoyouly", "startWith : onError() "),
() -> Log.d("hoyouly", "startWith : onComplete() "));
//结果
hoyouly : startWith 7
hoyouly : startWith 8
hoyouly : startWith 1
hoyouly : startWith 2
hoyouly : startWith 3
hoyouly : startWith 0
hoyouly : startWith 4
hoyouly : startWith 5
hoyouly : startWith 6
hoyouly : startWith : onComplete()
startWith() 和 感觉有点像是 栈,先进后出,
startWithArray() 追加多个元素。
这两个都是使用的 concat() 和 concatArray()
concat() / concatArray()
concat 合并多个数组。合并多个字符
concat 操作符可以连接最多 4 个的被观察者,他们的顺序是串行执行的:
如果需要多于 4 个被观察合并在一起,可以使用 concatArray 操作符:
Flowable.concat(Flowable.just(1, 2), Flowable.just(5, 6), Flowable.just(3, 4))
.subscribe(
ele -> Log.i("hoyouly", "concat " + ele),
throwable -> Log.d("hoyouly", "concat : onError() "),
() -> Log.d("hoyouly", "concat : onComplete() ")
);
//结果
hoyouly : concat 1
hoyouly : concat 2
hoyouly : concat 5
hoyouly : concat 6
hoyouly : concat 3
hoyouly : concat 4
hoyouly : concat : onComplete()
merge() / mergeArray()
按照时间线并行执行
Flowable.merge(
Flowable.intervalRange(0, 5 , 1 , 1 , TimeUnit.SECONDS),
Flowable.intervalRange(6, 3 , 1 , 1 , TimeUnit.SECONDS))
.subscribe(ele -> Log.i("hoyouly", "merge " + ele),
throwable -> Log.d("hoyouly", "merge : onError() "),
() -> Log.d("hoyouly", "merge : onComplete() ")
);
//结果
03-30 23:20:03.139 12508 12552 I hoyouly : merge 0
03-30 23:20:03.139 12508 12553 I hoyouly : merge 6
03-30 23:20:04.140 12508 12552 I hoyouly : merge 1
03-30 23:20:04.140 12508 12552 I hoyouly : merge 7
03-30 23:20:05.139 12508 12553 I hoyouly : merge 8
03-30 23:20:05.139 12508 12553 I hoyouly : merge 2
03-30 23:20:06.139 12508 12552 I hoyouly : merge 3
03-30 23:20:07.138 12508 12552 I hoyouly : merge 4
03-30 23:20:07.138 12508 12552 D hoyouly : merge : onComplete()
Flowable.concat(
Flowable.intervalRange(0, 5 , 1 , 1 , TimeUnit.SECONDS),
Flowable.intervalRange(6, 3 , 1 , 1 , TimeUnit.SECONDS))
.subscribe(ele -> Log.i("hoyouly", "concat " + ele),
throwable -> Log.d("hoyouly", "concat : onError() "),
() -> Log.d("hoyouly", "concat : onComplete() ")
);
//结果
03-30 23:21:14.817 12711 12758 I hoyouly : concat 0
03-30 23:21:15.817 12711 12758 I hoyouly : concat 1
03-30 23:21:16.817 12711 12758 I hoyouly : concat 2
03-30 23:21:17.817 12711 12758 I hoyouly : concat 3
03-30 23:21:18.817 12711 12758 I hoyouly : concat 4
03-30 23:21:19.820 12711 12783 I hoyouly : concat 6
03-30 23:21:20.819 12711 12783 I hoyouly : concat 7
03-30 23:21:21.819 12711 12783 I hoyouly : concat 8
03-30 23:21:21.819 12711 12783 D hoyouly : concat : onComplete()
这样就能看出区别了, concat() 是串行执行的,执行完一个执行另外一个, merge() 是并行执行的,两个同时开工
zip()
压缩操作符,多个被观察者压缩成一个,数量不同的,以少的为基准,木桶效应
Flowable.zip(
Flowable.intervalRange(0, 5 , 1 , 1 , TimeUnit.SECONDS),
Flowable.intervalRange(6, 3 , 1 , 2 , TimeUnit.SECONDS),(aLong, aLong2) -> {
Log.d("hoyouly", "aLong : " + aLong + " aLong2: " + aLong2);
return aLong + aLong2;
})
.subscribe(ele -> Log.i("hoyouly", "zip " + ele),
throwable -> Log.d("hoyouly", "zip : onError() "),
() -> Log.d("hoyouly", "zip : onComplete() ")
);
//结果
03-30 23:27:09.126 13412 13455 D hoyouly : aLong : 0 aLong2: 6
03-30 23:27:09.126 13412 13455 I hoyouly : zip 6
03-30 23:27:11.125 13412 13455 D hoyouly : aLong : 1 aLong2: 7
03-30 23:27:11.125 13412 13455 I hoyouly : zip 8
03-30 23:27:13.126 13412 13455 D hoyouly : aLong : 2 aLong2: 8
03-30 23:27:13.126 13412 13455 I hoyouly : zip 10
03-30 23:27:13.126 13412 13455 D hoyouly : zip : onComplete()
虽然第一个被观察者 5 秒钟就发送完数据了,而第二个观察者需要 6 秒钟,但是还是以第二个观察者为准,没间隔 2 秒打印向下发送一条数据,并且只发送三条数据
combineLatest()
在同一个时间线上,合并最后的元素,
Flowable.combineLatest(
Flowable.intervalRange(0, 5 , 1 , 800 , TimeUnit.MILLISECONDS),
Flowable.intervalRange(6, 3 , 1 , 2 , TimeUnit.SECONDS),(aLong, aLong2) -> {
Log.d("hoyouly", "aLong : " + aLong + " aLong2: " + aLong2);
return aLong + aLong2;
})
.subscribe(ele -> Log.i("hoyouly", "combineLatest " + ele),
throwable -> Log.d("hoyouly", "combineLatest : onError() "),
() -> Log.d("hoyouly", "combineLatest : onComplete() ")
);
//结果
//刚开始的时候,两个被观察者分别是 1 和 6 所以值是7
03-30 23:32:55.921 13878 13911 D hoyouly : aLong : 1 aLong2: 6
03-30 23:32:55.921 13878 13911 I hoyouly : combineLatest 7
//600ms后,第一个观察者变成 2 ,第二个还是 6 所以值是 8
03-30 23:32:56.521 13878 13910 D hoyouly : aLong : 2 aLong2: 6
03-30 23:32:56.521 13878 13910 I hoyouly : combineLatest 8
//再过 800ms 后,第一个观察者变成 3 ,第二个还是 6 所以值是 9
03-30 23:32:57.322 13878 13910 D hoyouly : aLong : 3 aLong2: 6
03-30 23:32:57.322 13878 13910 I hoyouly : combineLatest 9
//再过 600ms 后,第一个观察者变成 3 ,第二个还是 7 所以值是 10
03-30 23:32:57.922 13878 13911 D hoyouly : aLong : 3 aLong2: 7
03-30 23:32:57.922 13878 13911 I hoyouly : combineLatest 10
//再过 200ms 后,第一个观察者变成 4 ,第二个还是 7 所以值是 11
03-30 23:32:58.121 13878 13910 D hoyouly : aLong : 4 aLong2: 7
03-30 23:32:58.121 13878 13910 I hoyouly : combineLatest 11
//再过 800ms 后,第一个观察者变成 4 ,第二个是 8 所以值是 12
03-30 23:32:59.924 13878 13911 D hoyouly : aLong : 4 aLong2: 8
03-30 23:32:59.924 13878 13911 I hoyouly : combineLatest 12
03-30 23:32:59.924 13878 13911 D hoyouly : combineLatest : onComplete()
其实就是只要有一个被观察者发生改变,就取出来所有被观察者,然后进行合并,有点像是牵一发而动全身的感觉。
reduce()
可以把被观察者所有的元素聚合成一个单一的元素
Flowable.intervalRange(3, 3 , 1 , 2 , TimeUnit.SECONDS)
.reduce((last, item) -> {
Log.d("hoyouly", "last : " + last + " item " + item);
return last * item;
}).subscribe(ele -> Log.i("hoyouly", "reduce " + ele),
throwable -> Log.d("hoyouly", "reduce : onError() "),
() -> Log.d("hoyouly", "reduce : onComplete() ")
);
//结果
03-30 23:45:09.743 14486 14532 D hoyouly : last : 3 item 4
03-30 23:45:11.741 14486 14532 D hoyouly : last : 12 item 5
03-30 23:45:11.741 14486 14532 I hoyouly : reduce 60
3*4*5 的结果就是 60
count()
统计一个被观察者发送多少个元素可
Flowable.intervalRange(3, 8 , 1 , 2 , TimeUnit.SECONDS)
.count()
.subscribe(aLong -> Log.d("hoyouly", "count : onNext "+aLong));
//结果
03-30 23:49:46.727 15061 15095 D hoyouly : count : onNext 8
一共发送了 8 次
条件操作符
all
判断所有元素是否满足某个条件
Flowable.just(4, 5 , 6)
.all(integer -> integer > 3).subscribe(aBoolean -> Log.d("hoyouly", "all : onNext " + aBoolean),
throwable -> Log.d("hoyouly", "all : onError()"));
//结果 因为 三个元素都大于 3
03-31 21:36:17.538 8603 8603 D hoyouly : all : onNext true
Flowable.just(4, 5 , 6)
.all(integer -> integer > 5).subscribe(aBoolean -> Log.d("hoyouly", "all : onNext " + aBoolean),
throwable -> Log.d("hoyouly", "all : onError()"));
//结果 因为并不是所有的元素都大于5
03-31 21:35:32.686 8463 8463 D hoyouly : all : onNext false
all() 最终返回的是一个Single
contains()
被观察者中是否包含某个元素
Flowable.just(4, 5 , 6)
.contains(5).subscribe(aBoolean -> Log.d("hoyouly", "contains : onNext " + aBoolean));
//结果
03-31 21:41:20.856 8935 8935 D hoyouly : contains : onNext true
any()
contains() 内部也是调用的 any() ,判断是否存在某一个元素满足一定的条件
Flowable.just(4, 5 , 6)
.any(integer -> integer > 5).subscribe(
aBoolean -> Log.d("hoyouly", "any : onNext " + aBoolean));
//结果
03-31 21:45:36.995 9346 9346 D hoyouly : any : onNext true
Flowable.just(4, 5 , 6)
.any(integer -> integer > 7).subscribe(
aBoolean -> Log.d("hoyouly", "any : onNext " + aBoolean));
//结果
03-31 21:45:36.996 9346 9346 D hoyouly : any : onNext false
变换操作符
Map()
可以把一个元素转换成新的元素发射出去
Flowable.just(4, 5 , 6)
.map(integer -> "map_" + integer)
.subscribe(
aBoolean -> Log.d("hoyouly", "map onNext() " + aBoolean),
throwable -> Log.d("hoyouly", "map : onError() "),
() -> Log.d("hoyouly", "map : onComplete() "));
//结果是每个元素前面添加了 map_ 前缀
03-31 21:51:11.584 9789 9789 D hoyouly : map onNext() map_4
03-31 21:51:11.585 9789 9789 D hoyouly : map onNext() map_5
03-31 21:51:11.593 9789 9789 D hoyouly : map onNext() map_6
03-31 21:51:11.593 9789 9789 D hoyouly : map : onComplete()
flatMap()
把每一个元素转换成一个新的被观察者,每个被观察者发射的数据将会合并成新的观察者,这些元素顺序输出
Flowable.just(4, 5 , 6)
.flatMap((Function<Integer, Publisher<?>>) i -> Flowable.just(i, i * i))
.subscribe(
aBoolean -> Log.d("hoyouly", "flatMap onNext() " + aBoolean),
throwable -> Log.d("hoyouly", "flatMap : onError() "),
() -> Log.d("hoyouly", "flatMap : onComplete() "));
//一个元素 转换成一个新的观察者,新的观察者有两个元素,一个是之前的元素,另一个是之前元素的平方。所以结果如下。
03-31 21:54:25.286 10041 10041 D hoyouly : flatMap onNext() 4
03-31 21:54:25.286 10041 10041 D hoyouly : flatMap onNext() 16
03-31 21:54:25.286 10041 10041 D hoyouly : flatMap onNext() 5
03-31 21:54:25.286 10041 10041 D hoyouly : flatMap onNext() 25
03-31 21:54:25.286 10041 10041 D hoyouly : flatMap onNext() 6
03-31 21:54:25.286 10041 10041 D hoyouly : flatMap onNext() 36
03-31 21:54:25.286 10041 10041 D hoyouly : flatMap : onComplete()
内部使用的 merge() 合并元素
concatMap()
功能和 flatMap() 类似,只不过这个内部使用的是 concat() ,严格按照顺序执行的,而 flatMap() 可能出现元素交叉的情况。
Flowable.just(4, 5 , 6)
.concatMap((Function<Integer, Publisher<?>>) i -> Flowable.just(i, i * i))
.subscribe(
aBoolean -> Log.d("hoyouly", "concatMap onNext() " + aBoolean),
throwable -> Log.d("hoyouly", "concatMap : onError() "),
() -> Log.d("hoyouly", "concatMap : onComplete() "));
//结果
03-31 21:58:28.187 10309 10309 D hoyouly : concatMap onNext() 4
03-31 21:58:28.187 10309 10309 D hoyouly : concatMap onNext() 16
03-31 21:58:28.187 10309 10309 D hoyouly : concatMap onNext() 5
03-31 21:58:28.187 10309 10309 D hoyouly : concatMap onNext() 25
03-31 21:58:28.187 10309 10309 D hoyouly : concatMap onNext() 6
03-31 21:58:28.187 10309 10309 D hoyouly : concatMap onNext() 36
cast()
强制转换每一个元素类型,
Flowable.just(4, "5", 6)
.cast(Integer.class)
.subscribe(
aBoolean -> Log.d("hoyouly", "cast onNext() " + aBoolean),
throwable -> Log.d("hoyouly", "cast : onError() "),
() -> Log.d("hoyouly", "cast : onComplete() "));
//结果
03-31 22:07:12.269 11112 11112 D hoyouly : cast onNext() 4
03-31 22:07:12.269 11112 11112 D hoyouly : cast : onError()
内部调用 map 操作符,字符串 5 不是 Integer 。所以执行到了 onError() ,后面的元素 6 就不在执行了
buffer()
把多个元素打包成一个元素一次发射数据
Flowable.just(4, "5", 6 , "ss", 7)
.buffer(3)//三个元素打包成一个元素发射
.subscribe(
aBoolean -> Log.d("hoyouly", "buffer onNext() " + aBoolean),
throwable -> Log.d("hoyouly", "buffer : onError() "),
() -> Log.d("hoyouly", "buffer : onComplete() "));
//结果
03-31 22:11:30.795 11407 11407 D hoyouly : buffer onNext() [4, 5 , 6]
03-31 22:11:30.795 11407 11407 D hoyouly : buffer onNext() [ss, 7]
03-31 22:11:30.795 11407 11407 D hoyouly : buffer : onComplete()
toList()
把所有元素转换成一个 List 一次过发送出去
Flowable.just(4, "5", 6 , "ss", 7)
.toList()
.subscribe(
aBoolean -> Log.d("hoyouly", "toList onNext() " + aBoolean),
throwable -> Log.d("hoyouly", "toList : onError() "));
//结果
03-31 22:14:34.302 11629 11629 D hoyouly : toList onNext() [4, 5 , 6 , ss , 7]
搬运地址:
关于 RxJava 最友好的文章——背压(Backpressure)
既已览卷至此,何不品评一二: