下面是根据 RxJava 2.x 使用详解系列 的 blog ,挑拣我认为会用到的操作符,进行的记录,当做一个速查手册来使用。

创建操作符

creat()

上面已经有例子了,就不多举例子了

just()

最多能接受 10 个参数,也就是最多能发送 10 条数据

Flowable.just("test1", "test2").subscribe(s -> Log.d("hoyouly", "just  onNext()  " + s));

//结果
just  onNext()  test1
just  onNext()  test2

fromArray()

接受任意长度的数据数组,不要直接传递一个 List 集合,

Flowable.fromArray(1, 2 , 3 , 4).subscribe(integer -> Log.d("hoyouly", "fromArray : onNext()  " + integer));
//结果
fromArray : onNext()  1
fromArray : onNext()  2
fromArray : onNext()  3
fromArray : onNext()  4

empty()

数据长度为 0 ,不会发送任何数据,直接发送 onComplete()

Flowable.empty().subscribe(
            o -> Log.d("hoyouly", "empty : onNext() " + o),
            throwable -> Log.d("hoyouly", "empty : onError() "),
            () -> Log.d("hoyouly", "empty : onComplete() "));
//结果
empty : onComplete()             

error()

直接发送 onError() , 需要传递一个 Throwable ,

Flowable.error(new RuntimeException(" test error()")).subscribe(
            o -> Log.d("hoyouly", "error : onNext()  " + o),
            throwable -> Log.d("hoyouly", "error : onError() " + throwable.getMessage()),
            () -> Log.d("hoyouly", "error : onComplete() "));
//结果
error : accept  test error()          

fromIterable()

可以遍历可迭代的数据集合,例如 list , fromArray 则不可以遍历 List 集合

List<String> list = new ArrayList<>();
       list.add("a");
       list.add("b");
       list.add("c");
Flowable.fromIterable(list).subscribe(s -> Log.d("hoyouly", "onNext() "+s));

timer()

延迟一段时间发送数据

//延迟 1 秒钟后执行 onNext() ,
Flowable.timer(1, TimeUnit.SECONDS).subscribe(aLong -> Log.d("hoyouly", "empty : onNext() " + aLong));

interval()

不间断发送数据

//每隔 1 秒钟发送一条数据
Flowable.interval(1,TimeUnit.SECONDS).subscribe(aLong -> Log.d("hoyouly", "empty : onNext() " + aLong));

intervalRange

一定范围内不断发送数据

Log.d("hoyouly", "intervalRange 开始执行 ");
//从 0 到 10 ,初始间隔为 4 秒,之后每间隔 2 秒发送数据
Flowable.intervalRange(0, 10 , 4 , 2 , TimeUnit.SECONDS).subscribe(
    o -> Log.d("hoyouly", "intervalRange : onNext()  " + o),
    throwable -> Log.d("hoyouly", "intervalRange : onError()  " + throwable.getMessage()),
    () -> Log.d("hoyouly", "intervalRange : onComplete() "));

//输出结果

03-29 23:39:11.738  8391  8391 D hoyouly : intervalRange 开始执行
03-29 23:39:15.747  8391  8414 D hoyouly : intervalRange : onNext()  0
03-29 23:39:17.747  8391  8414 D hoyouly : intervalRange : onNext()  1
03-29 23:39:19.748  8391  8414 D hoyouly : intervalRange : onNext()  2
03-29 23:39:21.746  8391  8414 D hoyouly : intervalRange : onNext()  3
03-29 23:39:23.747  8391  8414 D hoyouly : intervalRange : onNext()  4
03-29 23:39:25.747  8391  8414 D hoyouly : intervalRange : onNext()  5
03-29 23:39:27.747  8391  8414 D hoyouly : intervalRange : onNext()  6
03-29 23:39:29.747  8391  8414 D hoyouly : intervalRange : onNext()  7
03-29 23:39:31.746  8391  8414 D hoyouly : intervalRange : onNext()  8
03-29 23:39:33.747  8391  8414 D hoyouly : intervalRange : onNext()  9
03-29 23:39:33.748  8391  8414 D hoyouly : intervalRange : onComplete()

里面的 10 是指发送 10 条数据,所以才有了 0~9 10个 onNext()
第一行 log 与第二行 log 间隔时间为 4 秒
之后每行 log 间隔是 2 秒

defer()

需要每个观察者被订阅的时候都重新创建被观察者(一对一),则可以使用 defer 操作符:

Flowable<String> flowable = Flowable.defer(new Callable<Publisher<String>>() {
           @Override
           public Publisher<String> call() throws Exception {
               return Flowable.just("1", "2");
           }
       });

//订阅第一个观察者
flowable.subscribe(str -> Log.i("hoyouly", "defer : one onNext()  " + str));
//订阅第二个观察者
flowable.subscribe(str -> Log.i("hoyouly", "defer : second onNext()  " + str));

//输出结果
hoyouly : defer : one onNext()  1
hoyouly : defer : one onNext()  2
hoyouly : defer : second onNext()  1
hoyouly : defer : second onNext()  2

只有当第一个观察者执行完后才回去创建第二个被观察者然后订阅观察者,然后才开始(第二个被观察者)发送事件消息。

过滤操作符

filter()

自己设定规则来过滤数据

Flowable.just(1, 2 , 3).filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer value) throws Exception {
                //过滤出大于等于 2 的值
                return value >= 2;
            }
        }).subscribe(s -> Log.d("hoyouly", "filter  onNext()  " + s));
//结果
hoyouly : filter  onNext()  2
hoyouly : filter  onNext()  3        

只输出了 2 和 3 ,因为 只有 test() 返回 true 的数据,才会到下游。

take()

类似 request() 方法,来限制发射数据的数量

Flowable.interval(1, TimeUnit.SECONDS)
            .take(5) //只发射 5 个元素
            .subscribe(s -> Log.d("hoyouly", "take  onNext()  " + s));
// 结果
03-30 21:52:27.868  hoyouly : take  onNext()  1
03-30 21:52:28.867  hoyouly : take  onNext()  2
03-30 21:52:29.867  hoyouly : take  onNext()  3
03-30 21:52:30.867  hoyouly : take  onNext()  4

尽管是每隔 1 秒输出一个,但是经过 take() 过滤后,到下游的只有 5 个数据

也可以采用时间过滤,

Flowable.interval(1, TimeUnit.SECONDS)
            .take(5, TimeUnit.SECONDS) //只发射 5 秒内的数据
            .subscribe(s -> Log.d("hoyouly", "take 时间过滤  onNext()  " + s));

输出的结果如下 okio

4 没输出,是因为是 5 秒内的数据,这里不包括刚好 5 秒钟的数据4

ofType()

筛选特定类型的数据

Flowable.just("a", 1 , 3 , "b").
            ofType(Integer.class)//筛选Integer
            .subscribe(s -> Log.d("hoyouly", "take  ofType()  " + s));
//结果
03-30 22:15:43.689  6642  6642 D hoyouly : take  ofType()  1
03-30 22:15:43.690  6642  6642 D hoyouly : take  ofType()  3

其实等价与

Flowable.just("a", 1 , 3 , "b").
            filter((Predicate<Object>) obj -> obj instanceof Integer)
            .subscribe(s -> Log.d("hoyouly", "take   filter   ofType()  " + s));

distinct()/distinctUntilChanged()

过滤掉重复的

Flowable.just("a", "b", "c", "b", "b", "c")
            .distinct()//过滤掉重复的
            .subscribe(ele -> Log.i("hoyouly", "distinct     " + ele));

Flowable.just("a", "b", "c", "b", "b", "c")
    .distinctUntilChanged()//过滤掉连续重复的
    .subscribe(ele -> Log.i("hoyouly", "distinctUntilChanged     " + ele));

//结果
hoyouly : distinct     a
hoyouly : distinct     b
hoyouly : distinct     c
hoyouly : distinctUntilChanged     a
hoyouly : distinctUntilChanged     b
hoyouly : distinctUntilChanged     c
hoyouly : distinctUntilChanged     b
hoyouly : distinctUntilChanged     c    

distinctUntilChanged() 输出的结果 a b c b c ,只过滤掉一个 b ,因为 b 是连续重复的, a 和 c 虽然也重复,但是不连续,所以不会过滤掉

distinct() 则不管连续不连续,只要重复就会过滤,所以 输出的结果 只有 a b

timeout()

超时处理

Flowable.intervalRange(0, 5 , 0 , 2 , TimeUnit.SECONDS)
           .subscribe(ele -> Log.i("hoyouly", " intervalRange     " + ele));

Flowable.intervalRange(0, 5 , 0 , 2 , TimeUnit.SECONDS)
   .timeout(1, TimeUnit.SECONDS) //超时时间是 1 秒
   .subscribe(ele -> Log.i("hoyouly", "timeout     " + ele));
//结果
03-30 22:25:21.343  7850  7891 I hoyouly :  intervalRange     0
03-30 22:25:21.353  7850  7893 I hoyouly : timeout     0
03-30 22:25:23.824  7850  7891 I hoyouly :  intervalRange     1
03-30 22:25:25.342  7850  7891 I hoyouly :  intervalRange     2
03-30 22:25:27.343  7850  7891 I hoyouly :  intervalRange     3
03-30 22:25:29.344  7850  7891 I hoyouly :  intervalRange     4

intervalRange() 没有 timeout() 操作符的时候,会输出 0 1 2 3 4
而因为 他的间隔是 1 秒,带上 timeout() 之后,设置的超时时间也是 1 秒,所以输出 0 之后,就超时了

throttleFirst()

throttle: 节流,减速,窒息

一段时间内只相应一次操作,可做放重复点击操作。

Flowable.intervalRange(0, 10 , 0 , 1 , TimeUnit.SECONDS)
            .throttleFirst(1,TimeUnit.SECONDS)//一秒钟只处理第一个元素
            .subscribe(ele -> Log.i("hoyouly", "throttleFirst     " + ele));

//输出的结果
03-30 22:31:44.087  8628  8672 I hoyouly : throttleFirst     0
03-30 22:31:46.086  8628  8672 I hoyouly : throttleFirst     2
03-30 22:31:48.086  8628  8672 I hoyouly : throttleFirst     4
03-30 22:31:50.086  8628  8672 I hoyouly : throttleFirst     6
03-30 22:31:52.085  8628  8672 I hoyouly : throttleFirst     8            

原本应该 1 秒钟输出一个的,可是因为添加了 throttleFirst() 这个操作符,只输出了 一半,另外一半就被屏蔽掉了

throttleLast() / sample()

两个功能一样,都是间隔一段时间采集最后一个元素

Flowable.intervalRange(0, 10 , 0 , 1 , TimeUnit.SECONDS)
           .sample(2, TimeUnit.SECONDS)//每 2 秒中采集最后一个元素
           .subscribe(ele -> Log.i("hoyouly", "sample     " + ele));
//等价于
Flowable.intervalRange(0, 10 , 0 , 1 , TimeUnit.SECONDS)
   .throttleLast(2, TimeUnit.SECONDS)//每 2 秒中采集最后一个元素
   .subscribe(ele -> Log.i("hoyouly", "throttleLast     " + ele));

//结果
03-30 22:37:12.888  9179  9224 I hoyouly : sample     1
03-30 22:37:14.887  9179  9224 I hoyouly : sample     3
03-30 22:37:16.888  9179  9224 I hoyouly : sample     5
03-30 22:37:18.888  9179  9224 I hoyouly : sample     7

只输出了 1 3 5 7 ,

throttleWithTimeout() / debounce()

喜新厌旧
如果有新数据,则抛弃旧数据,可以用在即时搜索需求中

Log.d("hoyouly", "debounce begin ");
Flowable.intervalRange(0, 10 , 0 , 1 , TimeUnit.SECONDS)
    .debounce(2, TimeUnit.SECONDS)//2秒内有新数据则抛弃旧数据
    .subscribe(
        ele -> Log.i("hoyouly", "debounce     " + ele),
        throwable -> Log.d("hoyouly", "debounce : onError() "),
        () -> Log.d("hoyouly", "debounce : onComplete() "));
//结果
03-30 22:43:54.550  9854  9854 D hoyouly : debounce begin
03-30 22:44:03.566  9854  9901 I hoyouly : debounce     9
03-30 22:44:03.567  9854  9901 D hoyouly : debounce : onComplete()

中间间隔了 9 秒钟,才输出了 9 ,而且只输出了 9 ,然后就 onComplete() , 这个就是因为每间隔 1 秒就会有新数据,而 debounce() 设置的是 2 秒,所以会等到最后一个 9 输出后,等待两秒后发现没数据了,然后把最后一个 9 发给下游。然后事件就结束了

合并聚合

startWith() / startWithArray()

在发送元素之前追加数据或者追加新的被观察者。

Flowable.just(4, 5 , 6)
            .startWith(0)
            .startWith(Flowable.just(1, 2 , 3))
            .startWithArray(7, 8)
            .subscribe(
                ele -> Log.i("hoyouly", "startWith     " + ele),
                throwable -> Log.d("hoyouly", "startWith : onError() "),
                () -> Log.d("hoyouly", "startWith : onComplete() "));
//结果
hoyouly : startWith     7
hoyouly : startWith     8
hoyouly : startWith     1
hoyouly : startWith     2
hoyouly : startWith     3
hoyouly : startWith     0
hoyouly : startWith     4
hoyouly : startWith     5
hoyouly : startWith     6
hoyouly : startWith : onComplete()

startWith() 和 感觉有点像是 栈,先进后出,

startWithArray() 追加多个元素。

这两个都是使用的 concat() 和 concatArray()

concat() / concatArray()

concat 合并多个数组。合并多个字符

concat 操作符可以连接最多 4 个的被观察者,他们的顺序是串行执行的:

如果需要多于 4 个被观察合并在一起,可以使用 concatArray 操作符:

Flowable.concat(Flowable.just(1, 2), Flowable.just(5, 6), Flowable.just(3, 4))
           .subscribe(
               ele -> Log.i("hoyouly", "concat     " + ele),
               throwable -> Log.d("hoyouly", "concat : onError() "),
               () -> Log.d("hoyouly", "concat : onComplete() ")
           );
//结果
hoyouly : concat     1
hoyouly : concat     2
hoyouly : concat     5
hoyouly : concat     6
hoyouly : concat     3
hoyouly : concat     4
hoyouly : concat : onComplete()

merge() / mergeArray()

按照时间线并行执行

Flowable.merge(
            Flowable.intervalRange(0, 5 , 1 , 1 , TimeUnit.SECONDS),
            Flowable.intervalRange(6, 3 , 1 , 1 , TimeUnit.SECONDS))
            .subscribe(ele -> Log.i("hoyouly", "merge     " + ele),
                throwable -> Log.d("hoyouly", "merge : onError() "),
                () -> Log.d("hoyouly", "merge : onComplete() ")
            );
//结果
03-30 23:20:03.139 12508 12552 I hoyouly : merge     0
03-30 23:20:03.139 12508 12553 I hoyouly : merge     6
03-30 23:20:04.140 12508 12552 I hoyouly : merge     1
03-30 23:20:04.140 12508 12552 I hoyouly : merge     7
03-30 23:20:05.139 12508 12553 I hoyouly : merge     8
03-30 23:20:05.139 12508 12553 I hoyouly : merge     2
03-30 23:20:06.139 12508 12552 I hoyouly : merge     3
03-30 23:20:07.138 12508 12552 I hoyouly : merge     4
03-30 23:20:07.138 12508 12552 D hoyouly : merge : onComplete()            
Flowable.concat(
            Flowable.intervalRange(0, 5 , 1 , 1 , TimeUnit.SECONDS),
            Flowable.intervalRange(6, 3 , 1 , 1 , TimeUnit.SECONDS))
            .subscribe(ele -> Log.i("hoyouly", "concat     " + ele),
                throwable -> Log.d("hoyouly", "concat : onError() "),
                () -> Log.d("hoyouly", "concat : onComplete() ")
            );
//结果
03-30 23:21:14.817 12711 12758 I hoyouly : concat     0
03-30 23:21:15.817 12711 12758 I hoyouly : concat     1
03-30 23:21:16.817 12711 12758 I hoyouly : concat     2
03-30 23:21:17.817 12711 12758 I hoyouly : concat     3
03-30 23:21:18.817 12711 12758 I hoyouly : concat     4
03-30 23:21:19.820 12711 12783 I hoyouly : concat     6
03-30 23:21:20.819 12711 12783 I hoyouly : concat     7
03-30 23:21:21.819 12711 12783 I hoyouly : concat     8
03-30 23:21:21.819 12711 12783 D hoyouly : concat : onComplete()

这样就能看出区别了, concat() 是串行执行的,执行完一个执行另外一个, merge() 是并行执行的,两个同时开工

zip()

压缩操作符,多个被观察者压缩成一个,数量不同的,以少的为基准,木桶效应

Flowable.zip(
           Flowable.intervalRange(0, 5 , 1 , 1 , TimeUnit.SECONDS),
           Flowable.intervalRange(6, 3 , 1 , 2 , TimeUnit.SECONDS),(aLong, aLong2) -> {
               Log.d("hoyouly", "aLong : " + aLong + "  aLong2: " + aLong2);
               return aLong + aLong2;
           })
           .subscribe(ele -> Log.i("hoyouly", "zip     " + ele),
               throwable -> Log.d("hoyouly", "zip : onError() "),
               () -> Log.d("hoyouly", "zip : onComplete() ")
           );
//结果
03-30 23:27:09.126 13412 13455 D hoyouly : aLong : 0  aLong2: 6
03-30 23:27:09.126 13412 13455 I hoyouly : zip     6
03-30 23:27:11.125 13412 13455 D hoyouly : aLong : 1  aLong2: 7
03-30 23:27:11.125 13412 13455 I hoyouly : zip     8
03-30 23:27:13.126 13412 13455 D hoyouly : aLong : 2  aLong2: 8
03-30 23:27:13.126 13412 13455 I hoyouly : zip     10
03-30 23:27:13.126 13412 13455 D hoyouly : zip : onComplete()            

虽然第一个被观察者 5 秒钟就发送完数据了,而第二个观察者需要 6 秒钟,但是还是以第二个观察者为准,没间隔 2 秒打印向下发送一条数据,并且只发送三条数据

combineLatest()

在同一个时间线上,合并最后的元素,

Flowable.combineLatest(
            Flowable.intervalRange(0, 5 , 1 , 800 , TimeUnit.MILLISECONDS),
            Flowable.intervalRange(6, 3 , 1 , 2 , TimeUnit.SECONDS),(aLong, aLong2) -> {
                Log.d("hoyouly", "aLong : " + aLong + "  aLong2: " + aLong2);
                return aLong + aLong2;
            })
            .subscribe(ele -> Log.i("hoyouly", "combineLatest     " + ele),
                throwable -> Log.d("hoyouly", "combineLatest : onError() "),
                () -> Log.d("hoyouly", "combineLatest : onComplete() ")
            );
//结果
//刚开始的时候,两个被观察者分别是 1 和 6 所以值是7
03-30 23:32:55.921 13878 13911 D hoyouly : aLong : 1  aLong2: 6
03-30 23:32:55.921 13878 13911 I hoyouly : combineLatest     7
//600ms后,第一个观察者变成 2 ,第二个还是 6 所以值是 8
03-30 23:32:56.521 13878 13910 D hoyouly : aLong : 2  aLong2: 6
03-30 23:32:56.521 13878 13910 I hoyouly : combineLatest     8
//再过 800ms 后,第一个观察者变成 3 ,第二个还是 6 所以值是 9
03-30 23:32:57.322 13878 13910 D hoyouly : aLong : 3  aLong2: 6
03-30 23:32:57.322 13878 13910 I hoyouly : combineLatest     9
//再过 600ms 后,第一个观察者变成 3 ,第二个还是 7 所以值是 10
03-30 23:32:57.922 13878 13911 D hoyouly : aLong : 3  aLong2: 7
03-30 23:32:57.922 13878 13911 I hoyouly : combineLatest     10
//再过 200ms 后,第一个观察者变成 4 ,第二个还是 7 所以值是 11
03-30 23:32:58.121 13878 13910 D hoyouly : aLong : 4  aLong2: 7
03-30 23:32:58.121 13878 13910 I hoyouly : combineLatest     11
//再过 800ms 后,第一个观察者变成 4 ,第二个是 8 所以值是 12
03-30 23:32:59.924 13878 13911 D hoyouly : aLong : 4  aLong2: 8
03-30 23:32:59.924 13878 13911 I hoyouly : combineLatest     12

03-30 23:32:59.924 13878 13911 D hoyouly : combineLatest : onComplete()

其实就是只要有一个被观察者发生改变,就取出来所有被观察者,然后进行合并,有点像是牵一发而动全身的感觉。

reduce()

可以把被观察者所有的元素聚合成一个单一的元素

Flowable.intervalRange(3, 3 , 1 , 2 , TimeUnit.SECONDS)
            .reduce((last, item) -> {
                Log.d("hoyouly", "last :   " + last + "  item   " + item);
                return last * item;
            }).subscribe(ele -> Log.i("hoyouly", "reduce     " + ele),
            throwable -> Log.d("hoyouly", "reduce : onError() "),
            () -> Log.d("hoyouly", "reduce : onComplete() ")
        );
//结果
03-30 23:45:09.743 14486 14532 D hoyouly : last :   3  item   4
03-30 23:45:11.741 14486 14532 D hoyouly : last :   12  item   5
03-30 23:45:11.741 14486 14532 I hoyouly : reduce     60        

3*4*5 的结果就是 60

count()

统计一个被观察者发送多少个元素可

Flowable.intervalRange(3, 8 , 1 , 2 , TimeUnit.SECONDS)
           .count()
           .subscribe(aLong -> Log.d("hoyouly", "count : onNext "+aLong));
//结果
03-30 23:49:46.727 15061 15095 D hoyouly : count : onNext 8

一共发送了 8 次

条件操作符

all

判断所有元素是否满足某个条件

Flowable.just(4, 5 , 6)
            .all(integer -> integer > 3).subscribe(aBoolean -> Log.d("hoyouly", "all : onNext " + aBoolean),
            throwable -> Log.d("hoyouly", "all : onError()"));
//结果  因为 三个元素都大于 3
03-31 21:36:17.538  8603  8603 D hoyouly : all : onNext true

Flowable.just(4, 5 , 6)
           .all(integer -> integer > 5).subscribe(aBoolean -> Log.d("hoyouly", "all : onNext " + aBoolean),
           throwable -> Log.d("hoyouly", "all : onError()"));
//结果 因为并不是所有的元素都大于5
03-31 21:35:32.686  8463  8463 D hoyouly : all : onNext false

all() 最终返回的是一个Single,所以只有 onNext() 和 onError()

contains()

被观察者中是否包含某个元素

Flowable.just(4, 5 , 6)
           .contains(5).subscribe(aBoolean -> Log.d("hoyouly", "contains : onNext " + aBoolean));
//结果
03-31 21:41:20.856  8935  8935 D hoyouly : contains : onNext true           

any()

contains() 内部也是调用的 any() ,判断是否存在某一个元素满足一定的条件

Flowable.just(4, 5 , 6)
            .any(integer -> integer > 5).subscribe(
            aBoolean -> Log.d("hoyouly", "any : onNext " + aBoolean));
//结果
03-31 21:45:36.995  9346  9346 D hoyouly : any : onNext true

Flowable.just(4, 5 , 6)
            .any(integer -> integer > 7).subscribe(
            aBoolean -> Log.d("hoyouly", "any : onNext " + aBoolean));  
//结果
03-31 21:45:36.996  9346  9346 D hoyouly : any : onNext false                      

变换操作符

Map()

可以把一个元素转换成新的元素发射出去

Flowable.just(4, 5 , 6)
            .map(integer -> "map_" + integer)
            .subscribe(
                aBoolean -> Log.d("hoyouly", "map onNext() " + aBoolean),
                throwable -> Log.d("hoyouly", "map : onError() "),
                () -> Log.d("hoyouly", "map : onComplete() "));
//结果是每个元素前面添加了 map_ 前缀
03-31 21:51:11.584  9789  9789 D hoyouly : map onNext() map_4
03-31 21:51:11.585  9789  9789 D hoyouly : map onNext() map_5
03-31 21:51:11.593  9789  9789 D hoyouly : map onNext() map_6
03-31 21:51:11.593  9789  9789 D hoyouly : map : onComplete()                 

flatMap()

把每一个元素转换成一个新的被观察者,每个被观察者发射的数据将会合并成新的观察者,这些元素顺序输出

Flowable.just(4, 5 , 6)
            .flatMap((Function<Integer, Publisher<?>>) i -> Flowable.just(i, i * i))
            .subscribe(
                aBoolean -> Log.d("hoyouly", "flatMap onNext() " + aBoolean),
                throwable -> Log.d("hoyouly", "flatMap : onError() "),
                () -> Log.d("hoyouly", "flatMap : onComplete() "));
//一个元素 转换成一个新的观察者,新的观察者有两个元素,一个是之前的元素,另一个是之前元素的平方。所以结果如下。
03-31 21:54:25.286 10041 10041 D hoyouly : flatMap onNext() 4
03-31 21:54:25.286 10041 10041 D hoyouly : flatMap onNext() 16
03-31 21:54:25.286 10041 10041 D hoyouly : flatMap onNext() 5
03-31 21:54:25.286 10041 10041 D hoyouly : flatMap onNext() 25
03-31 21:54:25.286 10041 10041 D hoyouly : flatMap onNext() 6
03-31 21:54:25.286 10041 10041 D hoyouly : flatMap onNext() 36
03-31 21:54:25.286 10041 10041 D hoyouly : flatMap : onComplete()

内部使用的 merge() 合并元素

concatMap()

功能和 flatMap() 类似,只不过这个内部使用的是 concat() ,严格按照顺序执行的,而 flatMap() 可能出现元素交叉的情况。

Flowable.just(4, 5 , 6)
            .concatMap((Function<Integer, Publisher<?>>) i -> Flowable.just(i, i * i))
            .subscribe(
                aBoolean -> Log.d("hoyouly", "concatMap onNext() " + aBoolean),
                throwable -> Log.d("hoyouly", "concatMap : onError() "),
                () -> Log.d("hoyouly", "concatMap : onComplete() "));
//结果
03-31 21:58:28.187 10309 10309 D hoyouly : concatMap onNext() 4
03-31 21:58:28.187 10309 10309 D hoyouly : concatMap onNext() 16
03-31 21:58:28.187 10309 10309 D hoyouly : concatMap onNext() 5
03-31 21:58:28.187 10309 10309 D hoyouly : concatMap onNext() 25
03-31 21:58:28.187 10309 10309 D hoyouly : concatMap onNext() 6
03-31 21:58:28.187 10309 10309 D hoyouly : concatMap onNext() 36

cast()

强制转换每一个元素类型,

Flowable.just(4, "5", 6)
           .cast(Integer.class)
           .subscribe(
               aBoolean -> Log.d("hoyouly", "cast onNext() " + aBoolean),
               throwable -> Log.d("hoyouly", "cast : onError() "),
               () -> Log.d("hoyouly", "cast : onComplete() "));
//结果
03-31 22:07:12.269 11112 11112 D hoyouly : cast onNext() 4
03-31 22:07:12.269 11112 11112 D hoyouly : cast : onError()

内部调用 map 操作符,字符串 5 不是 Integer 。所以执行到了 onError() ,后面的元素 6 就不在执行了

buffer()

把多个元素打包成一个元素一次发射数据

Flowable.just(4, "5", 6 , "ss", 7)
           .buffer(3)//三个元素打包成一个元素发射
           .subscribe(
               aBoolean -> Log.d("hoyouly", "buffer onNext() " + aBoolean),
               throwable -> Log.d("hoyouly", "buffer : onError() "),
               () -> Log.d("hoyouly", "buffer : onComplete() "));
//结果
03-31 22:11:30.795 11407 11407 D hoyouly : buffer onNext() [4, 5 , 6]
03-31 22:11:30.795 11407 11407 D hoyouly : buffer onNext() [ss, 7]
03-31 22:11:30.795 11407 11407 D hoyouly : buffer : onComplete()

toList()

把所有元素转换成一个 List 一次过发送出去

Flowable.just(4, "5", 6 , "ss", 7)
            .toList()
            .subscribe(
                aBoolean -> Log.d("hoyouly", "toList onNext() " + aBoolean),
                throwable -> Log.d("hoyouly", "toList : onError() "));
//结果
03-31 22:14:34.302 11629 11629 D hoyouly : toList onNext() [4, 5 , 6 , ss , 7]

搬运地址:

关于 RxJava 最友好的文章

关于 RxJava 最友好的文章——背压(Backpressure)

关于 RxJava 最友好的文章—— RxJava 2.0 全新来袭

RxJava 2.x 使用详解系列

给初学者的RxJava 2.0教程系列