当前位置: 代码迷 >> 综合 >> RxJava3源码实现-4-flatMap()
  详细解决方案

RxJava3源码实现-4-flatMap()

热度:55   发布时间:2023-10-09 04:46:05.0

目录

1、介绍

1.1、适用场景

1.2、代码实现

2、手写实现

2.1、flatMap()调用

2.2、flatMap()触发

3、代码


1、介绍

FlatMap将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable。

注意1:FlatMap对这些Observables发射的数据做的是合并(merge)操作,因此它们可能是交错的。

注意2:如果任何一个通过这个flatMap操作产生的单独的Observable调用onError异常终止了,这个Observable自身会立即调用onError并终止。

RxJava3源码实现-4-flatMap()

1.1、适用场景

  1. 对数据流中的每个元素都要执行长时间运行的异步操作,这时候你可以适用flatMap为每个元素单独创建一个Observable,并利用suscribeOn()放入到单独的线程之中。
  2. 需要一对多进行转换,一个事件扩展为多个子事件。例如,客户组成的流要被转换为每个客户的订单组成的流,每个客户的订单可能是任意数量的。

1.2、代码实现

这里需要提一个疑问:既然我们flatMap()中返回的类型是Observable<String>类型的,那么我们最后接受到的类型应该是:        Observable<Observable<String>>,但我们红色框中的类型却不是,这就是flatMap操作符的重点之处。带着这个疑问,我们可以可以去看源码,看它如何合并每个Observable的。

RxJava3源码实现-4-flatMap()

输出结果:

如果在异步情况下, 我们接受数据的顺序和发送数据的顺序是不一致的。

如果你想确保输出和输入顺序一致的话,可以使用concatMap()操作符。

RxJava3源码实现-4-flatMap()

2、手写实现

代码目录如下:

create包下面是我们RxJava3源码实现-1-create()+subscribe()实现的内容,这里就不做介绍了。

我们的flatMap()操作符只在create的基础上添加了一个类:YoObservableFlatMap.

如果你想要看所有源码,可以看我的github地址。

RxJava3源码实现-4-flatMap()

2.1、flatMap()调用

看下面的代码:

其实当你单独调用flatMap()操作符的时候,并没做实际的逻辑操作,只是将我们new Function()函数对象传递给了YoObservableFlatMap类中。

RxJava3源码实现-4-flatMap()

2.2、flatMap()触发

看下图代码:

1、当我们在订阅的时候会触发YoObservableFlatMap类中的subscribe()函数,这个函数会触发上游create()中的subscribe()函数,然后触发onNext()事件。

2、这时候会触发我们MergeObserver中的onNext()函数,这里边会将我们上游的每个String类型的值,转换成一个新的Observable对象,下图绿色框中的代码。

3、就是将我们新创建出来的每个Observable合并,并将Observable类型转换成输出的String类型值,很显然这部分代码发生在下图红色框中的suscribeInner()中。继续往下看----

RxJava3源码实现-4-flatMap()

下图是我们subscribeInner()的实现:

4、他们在里边为每个Observable又添加了一层订阅,这样就可以通过订阅的形式,将返回值从Observable<String>转换成了String类型。

RxJava3源码实现-4-flatMap()

3、代码

YoObservable:


public abstract class YoObservable<T> implements YoObservableSource<T> {@Overridepublic void subscribe(YoObserver<? super T> yoObserver) {subscribeActual(yoObserver);}public static YoObservable create(YoObservableOnSubscribe source) {return new YoObservableCreate(source);}public final <R> YoObservable<R> map(YoFunction<? super T, ? extends R> mapper) {return new YoObservableMap<>(this, mapper);}public final YoObservable<T> subscribeOn(Executor scheduler) {return new YoObservableSubscribeOn<>(this, scheduler);}public final YoObservable<T> observeOn(Executor scheduler) {return new YoObservableObserveOn<>(this, scheduler);}public final <R> YoObservable<R> flatMap(YoFunction<? super T, ? extends YoObservableSource<? extends R>> mapper) {return new YoObservableFlatMap<>(this, mapper);}protected abstract void subscribeActual(YoObserver<? super T> observer);
}

YoObservableFlatMap:


public class YoObservableFlatMap<T, U> extends YoObservable<U> {private final YoFunction<? super T, ? extends YoObservableSource<? extends U>> mapper;private final YoObservableSource<T> source;public YoObservableFlatMap(YoObservableSource<T> source,YoFunction<? super T, ? extends YoObservableSource<? extends U>> mapper) {this.mapper = mapper;this.source = source;}@Overrideprotected void subscribeActual(YoObserver<? super U> observer) {source.subscribe(new MergeObserver<>(observer, mapper));}static final class MergeObserver<T, U> implements YoObserver<T> {final YoObserver<? super U> downstream;final YoFunction<? super T, ? extends YoObservableSource<? extends U>> mapper;LinkedQueue<U> queue = new LinkedQueue<>();volatile boolean done;volatile boolean disposed;int wip;long uniqueId;MergeObserver(YoObserver<? super U> actual,YoFunction<? super T, ? extends YoObservableSource<? extends U>> mapper) {this.downstream = actual;this.mapper = mapper;}@Overridepublic void onSubscribe() {downstream.onSubscribe();}@Overridepublic void onNext(T t) {// safeguard against misbehaving sourcesif (done) {return;}YoObservableSource<? extends U> p;try {p = (YoObservableSource<? extends U>) mapper.apply(t);} catch (Throwable e) {onError(e);return;}subscribeInner(p);}void subscribeInner(YoObservableSource<? extends U> p) {InnerObserver<T, U> inner = new InnerObserver<>(this, uniqueId++);p.subscribe(inner);}@Overridepublic void onError(Throwable t) {if (done) {return;}done = true;downstream.onError(t);}@Overridepublic void onComplete() {if (done) {return;}done = true;downstream.onComplete();}void drain(U t) {downstream.onNext(t);}}static final class InnerObserver<T, U> implements YoObserver<U> {final long id;final MergeObserver<T, U> parent;volatile boolean done;InnerObserver(MergeObserver<T, U> parent, long id) {this.id = id;this.parent = parent;}@Overridepublic void onSubscribe() {}@Overridepublic void onNext(U t) {parent.drain(t);}@Overridepublic void onError(Throwable t) {done = true;parent.onError(t);}@Overridepublic void onComplete() {done = true;parent.onComplete();}}}