FlatMap

简单来说就是把 被观察者的每次发射出来的事件,转换成一个子被观察者,然后通过合并(Merge)所有子被观察者的事件成总的一系列事件发送给观察者。 相信很多人理解 flatmap 都是 根据 扔物线大神文章中的学生和课程的例子,我也是以这个例子分析源码的。

List<Student> students = new ArrayList<Student>();
Observable.fromIterable(students) //    此时  得到的对象  ObservableFromIterable
          .flatMap(new Function<Student, ObservableSource<Course>>() {
              @Override
              public ObservableSource<Course> apply(Student student) throws Exception {
                  return Observable.fromIterable(student.coursesList);
              }
          }) //  此时  得到的对象 ObservableFlatMap 也就是说相当于 调用了 ObservableFlatMap 的 subscribe() ,最终会执行到 subscribeActual
          .subscribe(new Consumer<Course>() {//Consumer 会被组装成一个  LambdaObserver
              @Override
              public void accept(Course course) throws Exception {

              }
          });

闲话少说,一行行看代码咯。

  1. Observable.fromIterable(students)
    public static <T> Observable<T> fromIterable(Iterable<? extends T> source) {
         return RxJavaPlugins.onAssembly(new ObservableFromIterable<T>(source));
     }
    

    source 是集合 students ,一个 ArrayList , 这也就是创建了一个 ObservableFromIterable

  2. .flatMap(new Function<Student, ObservableSource>() {} 同理 flatMap() 会返回一个 ObservableFlatMap 对象, this 就是 之前 ObservableFromIterable 对象, mapper 就是创建的 Function 对象。
    RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper , delayErrors , maxConcurrency , bufferSize))
    
  3. subscribe(new Consumer() {} 虽然这是 Observable 中的方法,但是我们需要记住,是 ObservableFlatMap 对象调用的,这很关键。
    public final Disposable subscribe(Consumer<? super T> onNext) {
     return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
     Action onComplete , Consumer<? super Disposable> onSubscribe) {
     LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError , onComplete , onSubscribe);
     subscribe(ls);
     return ls;
    }
    

在 subscribe() 中做了两件事

  1. 把 Consumer 会被封装成一个 LambdaObserver
  2. 执行到 subscribeActual(observer); 因为 subscribeActual() 是抽象方法,所以需要看它的实现类,而 ObservableFlatMap extends Observable , 很明显,看着就像,这和中国起名有关,老子姓啥,儿子一般也姓啥 ,老子是 Observable ,具有 flatmap 功能的儿子就叫 ObservableFlatmap ,所以就执行到了 ObservableFlatMap 中的subscribeActual()
@Override
public void subscribeActual(Observer<? super U> t) {      
      source.subscribe(new MergeObserver<T, U>(t, mapper , delayErrors , maxConcurrency , bufferSize));
}

主要做了两件事

  1. 创建一个 MergeObserver 对象, t 是封装好的 LambdaObserver 对象, mapper 是我们 flatMap 中 new 的 Function 对象, MergeObserver 中保留了下游的 LambdaObserver 和 上游的 Function 对象。
    MergeObserver(Observer<? super U> actual, Function<? super T, ? extends ObservableSource<? extends U>> mapper,
     boolean delayErrors , int maxConcurrency , int bufferSize) {
     this.downstream = actual;
     this.mapper = mapper;
     this.delayErrors = delayErrors;
     this.maxConcurrency = maxConcurrency;
     this.bufferSize = bufferSize;
     if (maxConcurrency != Integer.MAX_VALUE) {
         sources = new ArrayDeque<ObservableSource<? extends U>>(maxConcurrency);
     }
     // 创建一个原子性的内部观察者对象数组
     this.observers = new AtomicReference<InnerObserver<?, ?>[]>(EMPTY);
    }
    
  2. 执行source.subscribe()方法, source 就是上游对象,即 ObservableFromIterable , ObservableFromIterable extends Observable ,最终也是调用 ObservableFromIterable 的 subscribeActual() ,所以直接看 ObservableFromIterable 中的 subscribeActual() ,而传递过来的 Observer 则是下游的 MergeObserver 对象
    public void subscribeActual(Observer<? super T> observer) { //observer 是 下游的 MergeObserver 对象
     Iterator<? extends T> it;
     try {
        //source  就是我们创建 List 集合 Students
         it = source.iterator();
     } catch (Throwable e) {
         Exceptions.throwIfFatal(e);
         EmptyDisposable.error(e, observer);
         return;
     }
     boolean hasNext;
     try {
       // 集合是否为null
         hasNext = it.hasNext();
     } catch (Throwable e) {
         Exceptions.throwIfFatal(e);
         EmptyDisposable.error(e, observer);
         return;
     }
     if (!hasNext) {
         EmptyDisposable.complete(observer);
         return;
     }
     //observer 是 下游的 MergeObserver 对象
     FromIterableDisposable<T> d = new FromIterableDisposable<T>(observer, it);
     // 执行 MergeObserver 中的 onSubscribe ,如果创建的是一个 Observer 对象,最终会执行到 创建的 Observer 中的 onSubscrbe() 中
     observer.onSubscribe(d);
    
     if (!d.fusionMode) {
         d.run();
     }
    }
    

    observer 就是下游的 MergeObserver 对象,其中代理了 LambdaObserver , LambdaObserver 中代理 Consumer 对象,有点像是俄罗斯套娃
    创建 FromIterableDisposable 的时候会把该 observer 传递过去,也就是 downstream

    FromIterableDisposable(Observer<? super T> actual, Iterator<? extends T> it) {
     this.downstream = actual;
     this.it = it;
    }
    

主要看 run() 方法

void run() {
    boolean hasNext;
    do {
        // 如果切断了被观察者就接收不到后续的事件了
        if (isDisposed()) {
            return;
        }
        T v;
        try {
            v = ObjectHelper.requireNonNull(it.next(), "The iterator returned a null value");  
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            downstream.onError(e);
            return;
        }
        //v  就是一个 Student 对象, downstream 就是 MergeObserver 对象
        downstream.onNext(v);

        if (isDisposed()) {
            return;
        }
        try {
            hasNext = it.hasNext();
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            downstream.onError(e);
            return;
        }
    } while (hasNext);

    if (!isDisposed()) {
        downstream.onComplete();
    }
}

发现两点

  1. do while 循环,这样就能遍历整个 Iterator 对象
  2. 循环中执行了 downstream.onNext(v), 这个 downstream 就是 MergeObserver 对象,然后就又回到了 MergeObserver 中
  3. 循环结束后,执行了 downstream.onComplete()。

接下来主要看 MergeObserver 中的onNext()

@Override
public void onNext(T t) {  //t 就是一个 Student 对象
    if (done) {
        return;
    }
    ObservableSource<? extends U> p;
    try {
      // mapper.apply(t) 就会执行 我们创建 FlatMap 的时候的那个 Function ,然后返回一个 ObservableSource
        p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        upstream.dispose();
        onError(e);
        return;
    }

    if (maxConcurrency != Integer.MAX_VALUE) {
        synchronized (this) {
            if (wip == maxConcurrency) {
                sources.offer(p);
                return;
            }
            wip++;
        }
    }
    subscribeInner(p);
}

因为 apply() 返回的是 Observable.fromIterable(student.coursesList),所以 p 就是一个 ObservableFromIterable 对象 注意,此时 T 类型是 Student ,但是 <? extends U> 其实就是 Course 虽然代码很多,其实主要的就是根据 apply() 得到了一个 Observable ,然后执行了 subscribeInner() 方法。

void subscribeInner(ObservableSource<? extends U> p) {
    for (;;) {
      //p 就是一个 ObservableFromIterable 对象 而 ObservableFromIterable extends  Observable  implements ObservableSource ,
        if (p instanceof Callable) {
            if (tryEmitScalar(((Callable<? extends U>)p)) && maxConcurrency != Integer.MAX_VALUE) {
                boolean empty = false;
                synchronized (this) {
                    p = sources.poll();
                    if (p == null) {
                        wip--;
                        empty = true;
                    }
                }
                if (empty) {
                    drain();
                    break;
                }
            } else {
                break;
            }
        } else {
          // 最终执行到了 这里, this 也就是 MergeObserver 对象
            InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
            if (addInner(inner)) {
                p.subscribe(inner);//p 是一个 ObservableFromIterable 对象
            }
            break;
        }
    }
}

InnerObserver 又封装了 MergeObserver ,先看看addInner()

// 这里主要做内部观察者对象数组的增加
// 通过创建 size 为原数组长度+1的新数组并作为新的内部观察者对象数组来实现
boolean addInner(InnerObserver<T, U> inner) {
    for (;;) {
        // 获取之前 MergeObserver 创建的内部观察者对象数组
        InnerObserver<?, ?>[] a = observers.get();
        if (a == CANCELLED) {
            inner.dispose();
            return false;
        }
        int n = a.length; // 0
        InnerObserver<?, ?>[] b = new InnerObserver[n + 1];
        System.arraycopy(a, 0 , b , 0 , n);
        b[n] = inner;
        if (observers.compareAndSet(a, b)) {
            return true;
        }
    }
}

然后就是 p.subscribe(inner);p 是 ObservableFromIterable ,所以最后又执行到 ObservableFromIterable 中的 subscribeActual() 中了 ,只不过这个时候, Observer 变成了 InnerObserver 。 source 变成了 student.coursesList ,而不是 student , subscribeActual() 上面已经讲过了,最终会执行 InnerObserver 中的 onNext() 方法

@Override
  public void onNext(U t) {
     if (fusionMode == QueueDisposable.NONE) {
         parent.tryEmit(t, this);
     } else {
         parent.drain();
     }
  }

感觉会执行到 parent.drain() ,而 parent 就是 代理的 MergeObserver 对象,

void drain() {
    if (getAndIncrement() == 0) {
        drainLoop();
    }
}

void drainLoop() {
    ...
    final Observer<? super U> child = this.downstream;
    child.onNext(o);
    ...
}

child 就是 LambdaObserver 对象,最终也就执行到了 Consumer 中的 accept() 中了, 因为 FromIterableDisposable 中有个循环,所以通过 flatmap 得到的 Observable 对象,就会再次遍历,直到把所有的结束为止。 这样就把一个 Observable 转换成多个 Observable ,然后将他们发射的数据合并后放入一个单独的 Observable 中, flatmap 操作符将使用一个指定的函数 apple() 对原始的 Observable 发射出来的每一项进行变化, apply() 返回一个本身也是发射数据的 Observable ,然后合并这些 Observable 发射的数据,最后将合并的结果当做它自己的序列发射出去。

flatMap 和 ConcatMap 区别

flatMap 事件有可能事件无序, Concatmap 可以做到事件有序
如果 flatmap 放到一个线程里面执行,也是可以做到事件有序的


搬运地址:

给 Android 开发者的 RxJava 详解

RxJava 2.x 源码分析(二) 之 FlatMap

详解 RxJava2 的线程切换原理