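Chained calls
1. Zip
1.1 zip subscribes to several Observables (they run in parallel only if each one is placed on its own scheduler), combines the values emitted at the same position through a function, and passes the single resulting value downstream.
1.2 Once the shortest ObservableSource completes, the remainder of the longer ObservableSource is not executed: the longer source is disposed, so its onComplete is never reached. This can also happen when the sources have the same length. For example, in zip(Arrays.asList(range(1, 5).doOnComplete(action1), range(6, 5).doOnComplete(action2)), a -> a), action2 may never be called, because the first source has already completed while the second has emitted its last value but has not yet signalled completion, so zip disposes it. In the figure above, [3, null] is never emitted.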
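The eager completion described in 1.2 can be reproduced with a small synchronous model. The sketch below is not RxJava code: EagerZipModel, Input and runRange are hypothetical names, and it assumes both sources emit synchronously on the calling thread, the way range does. Source 0 runs to completion first, then source 1. After the last pair is emitted, the model notices that source 0 is done with an empty queue, cancels every input, and source 1 never gets to signal completion.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Toy synchronous model of zip's eager completion (hypothetical, not RxJava code). */
final class EagerZipModel {
    /** One zip input: buffered values plus done/cancelled flags. */
    static final class Input {
        final ArrayDeque<Integer> queue = new ArrayDeque<>();
        boolean done;
        boolean cancelled;
    }

    final Input[] inputs = {new Input(), new Input()};
    final List<String> log = new ArrayList<>();
    private boolean terminated;

    /** Emits count values starting at start into one input, then completes unless cancelled. */
    void runRange(int index, int start, int count) {
        Input in = inputs[index];
        for (int v = start; v < start + count && !in.cancelled; v++) {
            in.queue.add(v);
            drain();
        }
        if (!in.cancelled) {
            in.done = true;
            log.add("onComplete source" + index);
            drain();
        }
    }

    /** Emits full rows; terminates as soon as a finished input has nothing left to offer. */
    private void drain() {
        while (!terminated) {
            for (Input in : inputs) {
                if (in.queue.isEmpty()) {
                    if (in.done) {
                        // This input can never fill a row again: cancel all inputs.
                        terminated = true;
                        for (Input other : inputs) {
                            other.cancelled = true;
                        }
                        log.add("zip onComplete");
                    }
                    return;
                }
            }
            log.add("[" + inputs[0].queue.poll() + ", " + inputs[1].queue.poll() + "]");
        }
    }

    public static void main(String[] args) {
        EagerZipModel zip = new EagerZipModel();
        zip.runRange(0, 1, 5); // plays the role of range(1, 5).doOnComplete(action1)
        zip.runRange(1, 6, 5); // plays the role of range(6, 5).doOnComplete(action2)
        zip.log.forEach(System.out::println);
    }
}
```

In this model the log contains "onComplete source0" but no "onComplete source1", which mirrors action2 not being called.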
1.3 zip source code analysis
public final class ObservableZip<T, R> extends Observable<R> {
public void subscribeActual(Observer<? super R> observer) {
ObservableSource<? extends T>[] sources = this.sources;
int count = 0;
count = sources.length;
ZipCoordinator<T, R> zc = new ZipCoordinator<>(observer, zipper, count, delayError);
zc.subscribe(sources, bufferSize);
}
//ZipCoordinator
class ZipCoordinator{
public void subscribe(ObservableSource<? extends T>[] sources, int bufferSize) {
ZipObserver<T, R>[] s = observers;
int len = s.length;
for (int i = 0; i < len; i++) {
s[i] = new ZipObserver<>(this, bufferSize);
}
this.lazySet(0);
downstream.onSubscribe(this);
for (int i = 0; i < len; i++) {
if (cancelled) {
return;
}
sources[i].subscribe(s[i]);
}
}
public void drain() {
if (getAndIncrement() != 0) {
return;
}
int missing = 1;
final ZipObserver<T, R>[] zs = observers;
final Observer<? super R> a = downstream;
final T[] os = row;
final boolean delayError = this.delayError;
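for (;;) { // outer loop: re-run the drain when more work arrived, instead of using a synchronized block
for (;;) { // inner loop: walk the observers and try to fill one row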
int i = 0;
int emptyCount = 0;
for (ZipObserver<T, R> z : zs) {
if (os[i] == null) { // reuse a value already stored in the row; poll only if the slot is still empty
boolean d = z.done;
T v = z.queue.poll();
boolean empty = v == null;
if (checkTerminated(d, empty, a, delayError, z)) {
return;
}
if (!empty) {
os[i] = v;
} else {
emptyCount++;
}
} else {
if (z.done && !delayError) {
}
}
i++;
}
if (emptyCount != 0) {
break;
}
R v;
try {
v = Objects.requireNonNull(zipper.apply(os.clone()), "The zipper returned a null value");
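} catch (Throwable ex) {
// error handling elided in this excerpt: the real code cancels, reports the error downstream and returns
return;
}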
a.onNext(v);
Arrays.fill(os, null);
}
// avoids a synchronized block: leave only when no drain() call was missed in the meantime
missing = addAndGet(-missing);
if (missing == 0) {
return;
}
}
}
}
static final class ZipObserver<T, R> implements Observer<T> {
final ZipCoordinator<T, R> parent;
final SpscLinkedArrayQueue<T> queue;
@Override
public void onNext(T t) {
queue.offer(t);
parent.drain();
}
}
}
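In the code above, each source is first subscribed with sources[i].subscribe(s[i]). When an upstream source calls onNext, the call lands in ZipObserver.onNext, which stores the value in that observer's queue and then calls drain(). Because the observers array has a fixed order, drain() walks the observers and polls one value from each queue into the row. If any queue is empty, the loop breaks and waits for the next onNext. Once every observer has supplied a value for the same index, the row is passed to the zipper's apply method and the result is sent downstream.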
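The getAndIncrement() / addAndGet(-missing) pair in drain() is what lets several threads call onNext without a synchronized block. The following is a minimal stdlib-only sketch of that pattern, not the RxJava implementation: SerializedDrain, delivered and demo are hypothetical names, and a ConcurrentLinkedQueue stands in for the per-observer queues. Only the thread that moves the counter from 0 to 1 drains; every other caller just bumps the counter, which tells the draining thread to loop once more.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of the work-in-progress counter used by drain(). */
final class SerializedDrain {
    private final AtomicInteger wip = new AtomicInteger();
    private final Queue<Integer> queue = new ConcurrentLinkedQueue<>();
    // Written only by whichever thread currently owns the drain loop.
    final List<Integer> delivered = new ArrayList<>();

    void onNext(int value) {
        queue.offer(value);
        drain();
    }

    private void drain() {
        if (wip.getAndIncrement() != 0) {
            return; // another thread is draining and will observe this increment
        }
        int missed = 1;
        for (;;) {
            Integer v;
            while ((v = queue.poll()) != null) {
                delivered.add(v);
            }
            // Subtract the calls handled so far; a non-zero result means more arrived.
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                return;
            }
        }
    }

    /** Pushes 4 x 1000 values from four threads and returns how many were delivered. */
    static int demo() {
        SerializedDrain d = new SerializedDrain();
        Thread[] producers = new Thread[4];
        for (int t = 0; t < producers.length; t++) {
            final int base = t * 1000;
            producers[t] = new Thread(() -> {
                for (int i = 0; i < 1000; i++) {
                    d.onNext(base + i);
                }
            });
            producers[t].start();
        }
        try {
            for (Thread p : producers) {
                p.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return d.delivered.size();
    }

    public static void main(String[] args) {
        System.out.println("delivered " + demo() + " values");
    }
}
```

The plain ArrayList is safe here only because ownership of the loop is handed over through the atomic counter; no two threads are ever inside the loop at once.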
1.4 Running sources concurrently and returning the results in order
Observable.zip(getSource1(true), getSource2(true), new BiFunction<Integer, Integer, Observable<Integer>>() {
@Override
public Observable<Integer> apply(Integer integer, Integer integer2) throws Throwable {
Log.d(TAG, "apply: done");
return Observable.fromArray(integer, integer2);
}
}).concatMap(new Function<Observable<Integer>, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(Observable<Integer> integerObservable) throws Throwable {
return integerObservable;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
Log.d(TAG, "accept:--------- " + integer);
}
});
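The concatMap above can be shortened to concatMap(Functions.identity()), or to the lambda concatMap(o -> o), which avoids depending on the internal Functions class.
1.5 A simplified zip without the conversion function (mapper)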
public class ZipRxJava {
private static final String TAG = "RxJavaZip";
private Observable[] mObservables;
private MyObserver[] mObservers;
private Object[] row;
public void rxjavaZip(Observable... sources) {
if (sources == null || sources.length == 0) {
return;
}
mObservables = sources;
mObservers = new MyObserver[sources.length];
row = new Object[sources.length];
for (int i = 0; i < sources.length; i++) {
MyObserver observer = new MyObserver(128);
mObservers[i] = observer;
mObservables[i].subscribe(mObservers[i]);
}
}
private synchronized void drain() {
Log.d(TAG, "drain: begin " + Thread.currentThread().getName());
boolean hasAllRunComplete = true;
Object[] objects = row;
for (int i = 0; i < mObservers.length; i++) {
MyObserver observer = mObservers[i];
if (objects[i] == null) {
Object poll = observer.queue.poll();
if (poll == null) {
hasAllRunComplete = false;
break;
} else {
objects[i] = poll;
}
}
Log.d(TAG, "drain: row " + Arrays.toString(objects));
}
Log.d(TAG, "drain: final row " + Arrays.toString(objects));
if (hasAllRunComplete) {
for (int i = 0; i < row.length; i++) {
Log.d(TAG, "drain: " + row[i]);
}
Arrays.fill(objects, null);
}
}
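class MyObserver<T> implements Observer<T> {
final SpscLinkedArrayQueue<T> queue;
public MyObserver(int count) {
queue = new SpscLinkedArrayQueue<>(count);
}
@Override
public void onSubscribe(@NonNull Disposable d) {
// no-op: this simplified version never disposes
}
@Override
public void onNext(@NonNull T t) {
queue.offer(t);
drain();
}
@Override
public void onError(@NonNull Throwable e) {
// no-op: error handling is omitted in this simplified version
}
@Override
public void onComplete() {
// no-op: completion is ignored, so an incomplete row is simply never printed
}
}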
}
Usage
public void testRxjavaZip() {
ZipRxJava rxJavaZip = new ZipRxJava();
rxJavaZip.rxjavaZip(Observable.just(1, 2, 3).subscribeOn(Schedulers.newThread()),
Observable.just(4, 5).delay(1, TimeUnit.SECONDS).subscribeOn(Schedulers.newThread()));
}
2. merge
public static <@NonNull T> Observable<T> merge(@NonNull ObservableSource<? extends T> source1, @NonNull ObservableSource<? extends T> source2) {
return fromArray(source1, source2).flatMap((Function)Functions.identity(), false, 2);
}
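2.1 fromArray, fromIterable
As the code above shows, fromArray is the outermost upstream Observable: each onNext it emits goes straight to MergeObserver.onNext. The onNext calls of the individual sources, in turn, arrive in InnerObserver.onNext(). The emission loop of fromIterable looks like this: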
void run() {
boolean hasNext;
do {
T v;
try {
v = Objects.requireNonNull(it.next(), "The iterator returned a null value");
} catch (Throwable e) {
}
downstream.onNext(v);
try {
hasNext = it.hasNext();
} catch (Throwable e) {
}
} while (hasNext);
}
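2.2 flatMap
mapper: the conversion function; it returns an ObservableSource for each upstream item.
delayErrors: whether errors are delayed. If true, errors are held back until all sources have terminated; if false, the first error terminates the whole sequence immediately.
maxConcurrency: the maximum number of ObservableSources that may be subscribed to at the same time.
bufferSize: the number of items expected to be buffered from each inner ObservableSource (not the number of ObservableSource objects).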
flatMap(@NonNull Function<? super T, ? extends ObservableSource<? extends R>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize)
Now look at the flatMap source code.
2.2.1 The upstream is subscribed with a MergeObserver, so every upstream onNext arrives in MergeObserver.onNext.
public final class ObservableFlatMap extends Observable{
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MergeObserver<>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
}
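2.2.2 In onNext, the mapper function converts the item. The result is an Observable, so its items still have to be emitted. That Observable is therefore subscribed with an internal InnerObserver, whose onNext in turn calls onNext on the real downstream Observer.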
@Override
public void onNext(T t) {
ObservableSource<? extends U> p;
try {
p = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
} catch (Throwable e) {
}
if (maxConcurrency != Integer.MAX_VALUE) {
synchronized (this) {
if (wip == maxConcurrency) {
sources.offer(p);
return;
}
wip++;
}
}
subscribeInner(p);
}
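maxConcurrency is the maximum number of sources subscribed at the same time. Once that limit is reached, further sources are stored in the sources queue; when one Observable completes, the next one is taken from that queue.
The figure above also shows that the merged output is not ordered across sources, while the items of each individual ObservableSource keep their order. When the maximum concurrency is exceeded, a new source waits until an earlier one has finished before it runs.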
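The maxConcurrency rule can be modelled without RxJava. This is a minimal sketch under stated assumptions: ConcurrencyGate, events and innerComplete are hypothetical names, sources are represented by plain strings, and the whole gate is synchronized for brevity, whereas the real operator keeps the subscription outside the lock. A source is subscribed immediately while a slot is free; otherwise it is queued and takes over the slot of the next source that completes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical model of flatMap's maxConcurrency bookkeeping. */
final class ConcurrencyGate {
    private final int maxConcurrency;
    private final Queue<String> pending = new ArrayDeque<>();
    private int wip; // number of sources currently subscribed
    final List<String> events = new ArrayList<>();

    ConcurrencyGate(int maxConcurrency) {
        this.maxConcurrency = maxConcurrency;
    }

    /** Called once per inner source produced by the mapper. */
    synchronized void onNext(String source) {
        if (wip == maxConcurrency) {
            pending.offer(source); // no free slot: park the source
            events.add("queued " + source);
            return;
        }
        wip++;
        subscribeInner(source);
    }

    /** Called when a subscribed source finishes. */
    synchronized void innerComplete(String source) {
        events.add("completed " + source);
        String next = pending.poll();
        if (next != null) {
            subscribeInner(next); // the freed slot is reused, so wip stays the same
        } else {
            wip--;
        }
    }

    private void subscribeInner(String source) {
        events.add("subscribed " + source);
    }

    public static void main(String[] args) {
        ConcurrencyGate gate = new ConcurrencyGate(2);
        gate.onNext("A");
        gate.onNext("B");
        gate.onNext("C"); // exceeds the limit, so it waits
        gate.innerComplete("A"); // frees a slot for C
        gate.events.forEach(System.out::println);
    }
}
```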
class InnerObserver{
public void onNext(U t) {
parent.tryEmit(t, this);
}
}
class MergeObserver {
void tryEmit(U value, InnerObserver<T, U> inner) {
if (get() == 0 && compareAndSet(0, 1)) {
downstream.onNext(value);
if (decrementAndGet() == 0) {
return;
}
} else {
SimpleQueue<U> q = inner.queue;
if (q == null) {
q = new SpscLinkedArrayQueue<>(bufferSize);
inner.queue = q;
}
q.offer(value);
if (getAndIncrement() != 0) {
return;
}
}
drainLoop();
}
void drainLoop() {
final Observer<? super U> child = this.downstream;
int missed = 1;
for (;;) {
boolean d = done;
svq = queue;
InnerObserver<?, ?>[] inner = observers.get();
int n = inner.length;
if (n != 0) {
int j = Math.min(n - 1, lastIndex);
sourceLoop:
for (int i = 0; i < n; i++) {
if (checkTerminate()) {
return;
}
@SuppressWarnings("unchecked")
InnerObserver<T, U> is = (InnerObserver<T, U>)inner[j];
SimpleQueue<U> q = is.queue;
if (q != null) {
for (;;) {
U o;
try {
o = q.poll();
} catch (Throwable ex) {
}
if (o == null) {
break;
}
child.onNext(o);
if (checkTerminate()) {
return;
}
}
}
boolean innerDone = is.done;
SimpleQueue<U> innerQueue = is.queue;
if (innerDone && (innerQueue == null || innerQueue.isEmpty())) {
removeInner(is);
innerCompleted++;
}
j++;
if (j == n) {
j = 0;
}
}
lastIndex = j;
}
if (innerCompleted != 0) {
if (maxConcurrency != Integer.MAX_VALUE) {
subscribeMore(innerCompleted);
innerCompleted = 0;
}
continue;
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
}
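The code above comes down to these points:
1) When a source calls onNext, the call reaches InnerObserver.onNext; the value is either passed straight to downstream.onNext() or stored in the inner queue.
2) The drain loop walks the InnerObservers, polls the values from each one's queue and calls downstream.onNext(); when an observer has no value, it moves on to the next observer.
3) After a source completes, a new source is taken from the sources queue and subscribed with an InnerObserver.
So whichever source calls onNext first has its InnerObserver invoked first, and the value is then passed to downstream.onNext().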
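Point 2) can be illustrated with a single pass of the drain loop over plain queues. The sketch below is a simplified stdlib-only model, not the operator itself: MergeDrainModel, drainOnce and demo are hypothetical names, and termination checks, removal of finished observers and the surrounding missed-counter loop are all left out. It starts at lastIndex, empties each inner queue in turn, and wraps around.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

/** Hypothetical model of one round-robin pass over the inner queues. */
final class MergeDrainModel {
    /** Drains every inner queue once, starting at lastIndex, and returns what went downstream. */
    static List<String> drainOnce(List<Queue<String>> innerQueues, int lastIndex) {
        List<String> downstream = new ArrayList<>();
        int n = innerQueues.size();
        if (n == 0) {
            return downstream;
        }
        int j = Math.min(n - 1, lastIndex);
        for (int i = 0; i < n; i++) {
            Queue<String> q = innerQueues.get(j);
            String o;
            while ((o = q.poll()) != null) {
                downstream.add(o); // stands in for child.onNext(o)
            }
            j++;
            if (j == n) {
                j = 0; // wrap around to the first inner observer
            }
        }
        return downstream;
    }

    /** Three inner queues, drained starting from the second one. */
    static List<String> demo() {
        List<Queue<String>> queues = new ArrayList<>();
        queues.add(new ArrayDeque<>(Arrays.asList("a1", "a2")));
        queues.add(new ArrayDeque<>(Arrays.asList("b1")));
        queues.add(new ArrayDeque<>(Arrays.asList("c1", "c2")));
        return drainOnce(queues, 1);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Within each source the order is kept (a1 before a2, c1 before c2), while the order across sources depends only on where the pass starts and what happens to be queued.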
3. concat
public static <@NonNull T> Observable<T> concat(@NonNull Iterable<@NonNull ? extends ObservableSource<? extends T>> sources) {
return fromIterable(sources).concatMapDelayError((Function)Functions.identity(), false, bufferSize());
}
3.1 fromIterable works the same way here as in 2.1; it is the upstream that emits the sources.
3.2 Subscription
class ObservableConcatMap {
@Override
public void subscribeActual(Observer<? super U> observer) {
SerializedObserver<U> serial = new SerializedObserver<>(observer);
source.subscribe(new SourceObserver<>(serial, mapper, bufferSize));
}
}
3.3 Each Observable is stored in the queue; drain() polls the queue, maps the item to an Observable and subscribes it with the InnerObserver.
static final class SourceObserver<T, U> {
@Override
public void onNext(T t) {
if (fusionMode == QueueDisposable.NONE) {
queue.offer(t);
}
drain();
}
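void drain() {
if (getAndIncrement() != 0) { // another call is already draining
return;
}
for (; ; ) {
if (!active) {
boolean d = done;
T t;
try {
t = queue.poll();
} catch (Throwable ex) {
// error handling elided in this excerpt
return;
}
boolean empty = t == null;
if (d && empty) { // upstream is done and nothing is queued
downstream.onComplete();
return;
}
if (!empty) {
ObservableSource<? extends U> o;
try {
o = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
} catch (Throwable ex) {
// error handling elided in this excerpt
return;
}
active = true;
o.subscribe(inner);
}
}
if (decrementAndGet() == 0) { // no drain() call was missed, so leave the loop
break;
}
}
}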
void innerComplete() {
active = false;
drain();
}
}
static final class InnerObserver<U> {
@Override
public void onNext(U t) {
downstream.onNext(t);
}
@Override
public void onComplete() {
parent.innerComplete();
}
}
As the code above shows, when an InnerObserver completes, active is set to false, and the for loop then takes the next Observable from the queue and subscribes to it.
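The role of the active flag can be traced with a small single-threaded model. This is a sketch under stated assumptions rather than the operator's code: ConcatModel and events are hypothetical names, sources are plain strings, a simple int counter replaces the atomic one, and innerComplete() is called by hand to simulate a source finishing. A queued source is subscribed only once the previous one has completed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical single-threaded model of concatMap's active flag. */
final class ConcatModel {
    private final Queue<String> queue = new ArrayDeque<>();
    private boolean active; // true while an inner source is still running
    private int wip;        // guards against re-entrant drain() calls
    final List<String> events = new ArrayList<>();

    /** Called once per source emitted by the upstream. */
    void onNext(String source) {
        queue.offer(source);
        drain();
    }

    /** Simulates the current inner source signalling onComplete. */
    void innerComplete() {
        events.add("inner completed");
        active = false;
        drain();
    }

    private void drain() {
        if (wip++ != 0) {
            return; // already draining further up the call stack
        }
        for (;;) {
            if (!active) {
                String next = queue.poll();
                if (next != null) {
                    active = true;
                    events.add("subscribed " + next);
                }
            }
            if (--wip == 0) {
                break;
            }
        }
    }

    public static void main(String[] args) {
        ConcatModel concat = new ConcatModel();
        concat.onNext("A"); // subscribed at once
        concat.onNext("B"); // waits in the queue
        concat.onNext("C"); // waits in the queue
        concat.innerComplete(); // A finished, B is subscribed
        concat.innerComplete(); // B finished, C is subscribed
        concat.events.forEach(System.out::println);
    }
}
```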
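4. Summary
merge: after a source calls onNext, downstream.onNext() is called as soon as any InnerObserver has data, so the result is not ordered across sources.
concat: the next source is subscribed only after the previous source has completed, so the result is ordered.
zip: ordered results can be obtained as shown in 1.4, but nothing is emitted until every source has produced its value.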