RxJava 版本:2.2.5
map操作符的实现
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return String.valueOf(integer);
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe: ");
}
@Override
public void onNext(String string) {
Log.e(TAG, "onNext: " + string);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete: ");
}
});
先上个图
Observable的create()方法简化版
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
return new ObservableCreate<T>(source);
}
create()方法返回的Observable是一个ObservableCreate对象。
Observable的map()方法简化版
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
return new ObservableMap<T, R>(this, mapper);
}
Observable的subscribe()方法简化版
public final void subscribe(Observer<? super T> observer) {
subscribeActual(observer);
}
我们进入到ObservableMap类去看
public void subscribeActual(Observer<? super U> t) {
//1.
source.subscribe(new MapObserver<T, U>(t, function));
}
这里的source就是create()方法返回的ObservableCreate对象。然后使用传入的下游的观察者和function构建了一个MapObserver对象。
然后source调用subscribe()方法。内部也是调用subscribeActual()方法
protected void subscribeActual(Observer<? super T> observer) {
//创建CreateEmitter
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
//注释1处
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
注释1处,这里的source,就是创建ObservableCreate对象的时候传入的ObservableOnSubscribe对象。
Observable.create(new ObservableOnSubscribe<Integer>() {
//发射数据
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
})
这个时候CreateEmitter开始发射数据,调用3次onNext方法,然后调用
onComplete方法。在CreateEmitter对象内部,会真正调用传入的observer(MapObserver类型)对应的onNext(T t)方法和onComplete()方法。
MapObserver类简化版
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
//调用super方法为downstream赋值
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
U v;
//应用mapper以后,返回结果
v = mapper.apply(t)
downstream.onNext(v);
}
}
MapObserver,在onNext() 方法中,获取上游发射的数据t,应用传入的mapper的apply方法,转换成期望类型的对象v,调用下游观察者(在这个例子中就是我们手写的Observer)的onNext()方法。
总结:map操作符就是将上游发射的每个数据,应用一个mapper,转化成期望的数据类型,然后再发射出去。
flatMap操作符的实现
Observable.create(new ObservableOnSubscribe<List<Integer>>() {
@Override
public void subscribe(ObservableEmitter<List<Integer>> emitter) throws Exception {
List<Integer> list1 = new ArrayList<>();
list1.add(1);
list1.add(2);
list1.add(3);
List<Integer> list2 = new ArrayList<>();
list2.add(4);
list2.add(5);
list2.add(6);
emitter.onNext(list1);
emitter.onNext(list2);
emitter.onComplete();
}
}).flatMap(new Function<List<Integer>, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(List<Integer> integers) throws Exception {
//注释1处,返回的是一个ObservableFromIterable对象
return Observable.fromIterable(integers);
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete: ");
}
});
上张图
create()方法返回的Observable是一个ObservableCreate对象。
然后我们在调用flatMap方法的时候,传入了一个mapper对象。该对象的apply方法返回的是一个ObservableFromIterable对象。
Observable的fromIterable方法精简版
public static <T> Observable<T> fromIterable(Iterable<? extends T> source) {
return new ObservableFromIterable<T>(source);
}
Observable的flatMap()方法
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
return flatMap(mapper, false);
}
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors) {
//注意,传入的maxConcurrency参数是Integer.MAX_VALUE
return flatMap(mapper, delayErrors, Integer.MAX_VALUE);
}
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency) {
return flatMap(mapper, delayErrors, maxConcurrency, bufferSize());
}
方法重载也是狠,最终是调用了这个方法,返回一个ObservableFlatMap对象。
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
//...
return new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize);
}
当调用subscribe方法把观察者和被观察者关联起来的时候会调用ObservableFlatMap的subscribeActual()方法。
subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete: ");
}
});
ObservableFlatMap的subscribeActual()方法简化版
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
首先创建了一个MergeObserver对象,MergeObserverObservableFlatMap类的静态内部类。我们看一下MergeObserver的构造函数。
MergeObserver(Observer<? super U> actual, Function<? super T, ? extends ObservableSource<? extends U>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
this.downstream = actual;//我们手写的observer
this.mapper = mapper;
this.delayErrors = delayErrors;
this.maxConcurrency = maxConcurrency;//我们传入的是Integer.MAX_VALUE
this.bufferSize = bufferSize;
if (maxConcurrency != Integer.MAX_VALUE) {
sources = new ArrayDeque<ObservableSource<? extends U>>(maxConcurrency);
}
//初始化observers
this.observers = new AtomicReference<InnerObserver<?, ?>[]>(EMPTY);
}
然后source(ObservableCreate对象)调用subscribe()方法。最终会进入到ObservableCreate的subscribeActual()方法。
ObservableCreate的subscribeActual()方法简化版
protected void subscribeActual(Observer<? super T> observer) {
//1. 创建CreateEmitter对象,传入的observer参数是MergeObserver对象
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//2. 调用观察者的onSubscribe()方法
observer.onSubscribe(parent);
//3. 发射数据
source.subscribe(parent);
}
在注释1处,首先创建了一个CreateEmitter对象。CreateEmitter类是ObservableCreate的静态内部类。
在注释2处,调用了观察者MergeObserver的onSubscribe方法
MergeObserver的onSubscribe()方法
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
//为upstream赋值,并调用下游的onSubscribe()方法
this.upstream = d;
//调用下游,就是我们手写的observer的onSubscribe方法
downstream.onSubscribe(this);
}
}
在注释3处,发射数据,source就Observable.create()方法传入的匿名的ObservableOnSubscribe的对象。
Observable.create(new ObservableOnSubscribe<List<Integer>>() {
@Override
public void subscribe(ObservableEmitter<List<Integer>> emitter) throws Exception {
List<Integer> list1 = new ArrayList<>();
list1.add(1);
list1.add(2);
list1.add(3);
List<Integer> list2 = new ArrayList<>();
list2.add(4);
list2.add(5);
list2.add(6);
//CreateEmitter调用onNext
emitter.onNext(list1);
//CreateEmitter调用onNext
emitter.onNext(list2);
//CreateEmitter调用onNext
emitter.onComplete();
}
})
在发射数据过程中,CreateEmitter会调用两次会onNext()和一次onComplete()。
CreateEmitter的onNext方法和onComplete方法
@Override
public void onNext(T t) {
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
CreateEmitter的onNext方法和onComplete方法内部会调用observer(MergeObserver类型对象)对应的onNext方法和onComplete方法。
现在我们要进入MergeObserver进行分析了,进入正题。
MergeObserver的onNext方法简化版
MergeObserver第1次调用onNext方法。注意注意,这是MergeObserver第1次调用onNext方法。
public void onNext(T t) {
if (done) {
return;
}
ObservableSource<? extends U> p;
//应用传入的mapper,在这个例子中,mapper.apply返回的是一个ObservableFromIterable对象
p = mapper.apply(t)
subscribeInner(p);
}
MergeObserver的subscribeInner方法简化版
void subscribeInner(ObservableSource<? extends U> p) {
for (;;) {
//注释1处,创建一个InnerObserver对象
InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
//注释2处,添加inner
if (addInner(inner)) {
//注释3处,如果添加成功,p就订阅inner对象。
p.subscribe(inner);
}
//跳出循环
break;
}
}
在注释1处,创建一个InnerObserver对象,InnerObserver类是ObservableFlatMap的一个静态内部类,注意我们构建的第一个InnerObserver对象传入的uniqueId是0。
InnerObserver的构造函数
InnerObserver(MergeObserver<T, U> parent, long id) {
this.id = id;
this.parent = parent;
}
在注释2处,添加inner,内部操作就是把创建好的InnerObserver对象添加到observers中去。
final AtomicReference<InnerObserver<?, ?>[]> observers;
boolean addInner(InnerObserver<T, U> inner) {
for (;;) {
InnerObserver<?, ?>[] a = observers.get();
if (a == CANCELLED) {
inner.dispose();
return false;
}
int n = a.length;
InnerObserver<?, ?>[] b = new InnerObserver[n + 1];
System.arraycopy(a, 0, b, 0, n);
b[n] = inner;
if (observers.compareAndSet(a, b)) {
return true;
}
}
}
在注释3处,如果添加成功,p就订阅inner对象。正常情况下添加是成功的,这里的p就是我们应用flatMap的mapper后返回的对象。在上面的例子中就是Observable.fromIterable()
返回的对象。
Observable的fromIterable()方法简化版
public static <T> Observable<T> fromIterable(Iterable<? extends T> source) {
return new ObservableFromIterable<T>(source);
}
返回一个ObservableFromIterable对象。
ObservableFromIterable的subscribe方法内部会调用subscribeActual方法。
subscribeActual方法精简版
public void subscribeActual(Observer<? super T> observer) {
Iterator<? extends T> it;
//注释1处,这里的source就是List<Integer> 对象
it = source.iterator();
boolean hasNext= it.hasNext();
if (!hasNext) {
EmptyDisposable.complete(observer);
return;
}
//注释2处,构建一个FromIterableDisposable对象
FromIterableDisposable<T> d = new FromIterableDisposable<T>(observer, it);
//注释3处,调用observer的onSubscribe()方法
observer.onSubscribe(d);
//注释4处,这个地方fusionMode会是true
if (!d.fusionMode) {
d.run();
}
}
在注释1处,获取了source的迭代器在这个例子中就是List<Integer>的迭代器。
在注释2处构建一个FromIterableDisposable对象,并传入了InnerObserver和List<Integer>的迭代器。FromIterableDisposable是ObservableFromIterable的静态内部类。
FromIterableDisposable(Observer<? super T> actual, Iterator<? extends T> it) {
this.downstream = actual;
this.it = it;
}
然后在注释3处,调用observer的onSubscribe()方法,并传入构建的FromIterableDisposable对象。
InnerObserver的onSubscribe()方法简化版
volatile SimpleQueue<U> queue;
public void onSubscribe(Disposable d) {
//调用onSubscribe多次不起作用,只有第一次起作用
if (DisposableHelper.setOnce(this, d)) {
if (d instanceof QueueDisposable) {
QueueDisposable<U> qd = (QueueDisposable<U>) d;
//1. m的取值是SYNC
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
fusionMode = m;
//2. 为queue赋值
queue = qd;
done = true;
//3 . 调用parent.drain()
parent.drain();
return;
}
if (m == QueueDisposable.ASYNC) {
fusionMode = m;
queue = qd;
}
}
}
}
注释1处,m的取值是SYNCFromIterableDisposable的requestFusion()方法
public int requestFusion(int mode) {
if ((mode & SYNC) != 0) {
fusionMode = true;
return SYNC;
}
return NONE;
}
返回结果是SYNC
注释2处,为queue赋值为传入的FromIterableDisposable对象。
注释3处,调用parent.drain()方法,然后return。这里的parent就是MergeObserver,我们回到MergeObserver类。
MergeObserver的drain方法
void drain() {
if (getAndIncrement() == 0) {
drainLoop();
}
}
方法内部判断getAndIncrement为0的时候调用drainLoop方法,内部逻辑是怎么执行的,我是一步一步debug的。
MergeObserver的drainLoop方法
void drainLoop() {
//我们手写的observer
final Observer<? super U> child = this.downstream;
int missed = 1;
for (; ; ) {
if (checkTerminate()) {
return;
}
SimplePlainQueue<U> svq = queue;
//此条件不满足
if (svq != null) {
for (; ; ) {
if (checkTerminate()) {
return;
}
U o = svq.poll();
if (o == null) {
break;
}
child.onNext(o);
}
}
boolean d = done;
svq = queue;
ObservableFlatMap.InnerObserver<?, ?>[] inner = observers.get();
int n = inner.length;
int nSources = 0;
//此条件不满足
if (maxConcurrency != Integer.MAX_VALUE) {
synchronized (this) {
nSources = sources.size();
}
}
//此条件不满足
if (d && (svq == null || svq.isEmpty()) && n == 0 && nSources == 0) {
Throwable ex = errors.terminate();
if (ex != ExceptionHelper.TERMINATED) {
if (ex == null) {
child.onComplete();
} else {
child.onError(ex);
}
}
return;
}
int innerCompleted = 0;
if (n != 0) {
long startId = lastId;
int index = lastIndex;
//此条件不满足
if (n <= index || inner[index].id != startId) {
if (n <= index) {
index = 0;
}
int j = index;
for (int i = 0; i < n; i++) {
if (inner[j].id == startId) {
break;
}
j++;
if (j == n) {
j = 0;
}
}
index = j;
lastIndex = j;
lastId = inner[j].id;
}
int j = index;
//continue标签
sourceLoop:
for (int i = 0; i < n; i++) {
if (checkTerminate()) {
return;
}
ObservableFlatMap.InnerObserver<T, U> is = (ObservableFlatMap.InnerObserver<T, U>) inner[j];
//注释1处,取出InnerObserver的quque对象,就是我们传入的FromIterableDisposable对象
SimpleQueue<U> q = is.queue;
if (q != null) {
for (; ; ) {
U o;
try {
//从迭代器中取出数据,没有数据返回null,并将InnerObserver的done置为true
o = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
is.dispose();
errors.addThrowable(ex);
if (checkTerminate()) {
return;
}
removeInner(is);
innerCompleted++;
j++;
if (j == n) {
j = 0;
}
continue sourceLoop;
}
if (o == null) {
break;
}
//调用我们手写的observer的onNext方法
child.onNext(o);
if (checkTerminate()) {
return;
}
}
}
boolean innerDone = is.done;
SimpleQueue<U> innerQueue = is.queue;
//发射完数据就移除掉InnerObserver
if (innerDone && (innerQueue == null || innerQueue.isEmpty())) {
removeInner(is);
if (checkTerminate()) {
return;
}
innerCompleted++;
}
j++;
if (j == n) {
j = 0;
}
}
lastIndex = j;
lastId = inner[j].id;
}
if (innerCompleted != 0) {
if (maxConcurrency != Integer.MAX_VALUE) {
while (innerCompleted-- != 0) {
ObservableSource<? extends U> p;
synchronized (this) {
p = sources.poll();
if (p == null) {
wip--;
continue;
}
}
subscribeInner(p);
}
}
continue;
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
我们第一次调用的时候会发射出第一个List中的3个数据1,2,3。
MergeObserver第2次调用onNext方法。注意注意,这是MergeObserver第2次调用onNext方法。最终会发射出第2个List中的3个数据 4,5,6。
然后MergeObserver最终会调用onComplete方法。
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
drain();
}
MergeObserver的onComplete方法内部调用链 drain->drainLoop->手写observer的onComplete方法。结束战斗。
MergeObserver的drainLoop方法内部逻辑不太好叙述,建议自己debug。
存在的疑问:都说flatMap内部无法保证发射出的数据的顺序,但是我怎么感觉顺序是保证的呢?
参考