RxJava操作符Zip、Merge、Concat

链式调用

链式调用.png

1、Zip

Zip

1.1、zip将多个observable并行执行,通过function,转成一个value给下游。
1.2、当最短的ObservableSource执行完成后,最长的ObservableSource剩余部分不再执行,也就是说,较长的Source的onComplete调用不到。当长度相等时,也会发生这种情况,比如zip(new ObservableSource[]{range(1, 5).doOnComplete(action1), range(6, 5).doOnComplete(action2)} action2可能调用不到,比如action1已经完成,而action2将要完成,还没有完成。上述图片[3,null]不会执行。
1.3、zip源码分析

public final class ObservableZip extends <T, R> extends Observable<R>{

  public void subscribeActual(Observer<? super R> observer) {
        ObservableSource<? extends T>[] sources = this.sources;
        int count = 0;
        count = sources.length;
        ZipCoordinator<T, R> zc = new ZipCoordinator<>(observer, zipper, count, delayError);
        zc.subscribe(sources, bufferSize);
    }

//ZipCoordinator
class ZipCoordinator{

    public void subscribe(ObservableSource<? extends T>[] sources, int bufferSize) {
            ZipObserver<T, R>[] s = observers;
            int len = s.length;
            for (int i = 0; i < len; i++) {
                s[i] = new ZipObserver<>(this, bufferSize);
            }
       
            this.lazySet(0);
            downstream.onSubscribe(this);
            for (int i = 0; i < len; i++) {
                if (cancelled) {
                    return;
                }
                sources[i].subscribe(s[I]);
            }
     }

     public void drain() {

            if (getAndIncrement() != 0) {
                return;
            }

            int missing = 1;

            final ZipObserver<T, R>[] zs = observers;
            final Observer<? super R> a = downstream;
            final T[] os = row;
            final boolean delayError = this.delayError;

            for (;;) {  //为了能够执行多次,避免使用同步代码块

                for (;;) {  //遍历observer
                    int i = 0;
                    int emptyCount = 0;
                    for (ZipObserver<T, R> z : zs) {
                        if (os[i] == null) {  //先用历史的判断
                            boolean d = z.done;
                            T v = z.queue.poll();
                            boolean empty = v == null;

                            if (checkTerminated(d, empty, a, delayError, z)) {
                                return;
                            }
                            if (!empty) {
                                os[i] = v;
                            } else {
                                emptyCount++;
                            }
                        } else {
                            if (z.done && !delayError) {
                   
                            }
                        }
                        I++;
                    }
                    if (emptyCount != 0) {
                        break;
                    }

                    R v;
                    try {
                        v = Objects.requireNonNull(zipper.apply(os.clone()), "The zipper returned a null value");
                    } catch (Throwable ex) {
                     
                    }
                    a.onNext(v);
                    Arrays.fill(os, null);
                }
               //避免同步代码块
                missing = addAndGet(-missing);
                if (missing == 0) {
                    return;
                }
            }
        }
   }

 static final class ZipObserver<T, R> implements Observer<T> {

        final ZipCoordinator<T, R> parent;
        final SpscLinkedArrayQueue<T> queue;

        @Override
        public void onNext(T t) {
            queue.offer(t);
            parent.drain();
        }
   }
}

上述代码,首先 sources[i].subscribe(s[I]),订阅后,上游执行onNext,就会调用到ZipObserver中的onNext,此时把执行的结果在onNext中,保存在queue中。因为Observer是有序的,遍历Observer,拿到每一个observer对应的queue中的值。如果为null,则跳出循环,每一个observer都取出相同index的值,则向下执行apply方法。
1.4、实现一个并发执行,顺序返回结果的功能

 Observable.zip(getSource1(true), getSource2(true), new BiFunction<Integer, Integer, Observable<Integer>>() {
            @Override
            public Observable<Integer> apply(Integer integer, Integer integer2) throws Throwable {
                Log.d(TAG, "apply:  完成====+++++");
                return Observable.fromArray(integer, integer2);
            }
        }).concatMap(new Function<Observable<Integer>, ObservableSource<Integer>>() {
            @Override
            public ObservableSource<Integer> apply(Observable<Integer> integerObservable) throws Throwable {
                return integerObservable;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.d(TAG, "accept:--------- " + integer);
            }
        });

上述concatMap还可以简写成concatMap(Functions.identity())。
1.5、简写Zip功能,去掉转换函数mapper。

public class ZipRxJava {

    private static final String TAG = "RxJavaZip";
    private Observable[] mObservables;
    private MyObserver[] mObservers;
    private Object[] row;

    public void rxjavaZip(Observable... sources) {
        if (sources == null || sources.length == 0) {
            return;
        }
        mObservables = sources;
        mObservers = new MyObserver[sources.length];
        row = new Object[sources.length];
        for (int i = 0; i < sources.length; i++) {
            MyObserver observer = new MyObserver(128);
            mObservers[i] = observer;
            mObservables[i].subscribe(mObservers[i]);
        }
    }

    private synchronized void drain() {
        Log.d(TAG, "drain:  开始begin====== " + Thread.currentThread().getName());
        boolean hasAllRunComplete = true;
        Object[] objects = row;
        for (int i = 0; i < mObservers.length; i++) {
            MyObserver observer = mObservers[i];
            if (objects[i] == null) {
                Object poll = observer.queue.poll();
                if (poll == null) {
                    hasAllRunComplete = false;
                    break;
                } else {
                    objects[i] = poll;
                }
            }
            Log.d(TAG, "drain: 数组   " + Arrays.toString(objects));
        }
        Log.d(TAG, "drain: 数组f " + Arrays.toString(objects));
        if (hasAllRunComplete) {
            for (int i = 0; i < row.length; i++) {
                Log.d(TAG, "drain: " + row[i]);
            }
            Arrays.fill(objects, null);
        }
    }

    class MyObserver<T> implements Observer<T> {

        final SpscLinkedArrayQueue<T> queue;

        public MyObserver(int count) {
            queue = new SpscLinkedArrayQueue<>(count);
        }

        @Override
        public void onNext(@NonNull T t) {
            queue.offer(t);
            drain();
        }
    }

}

使用

public void testRxjavaZip() {
        ZipRxJava rxJavaZip = new ZipRxJava();
        rxJavaZip.rxjavaZip(Observable.just(1, 2, 3).subscribeOn(Schedulers.newThread()),
                Observable.just(4, 5).delay(1, TimeUnit.SECONDS).subscribeOn(Schedulers.newThread()));
    }
2、merge
merge.png
  public static <@NonNull T> Observable<T> merge(@NonNull ObservableSource<? extends T> source1, @NonNull ObservableSource<? extends T> source2) {
        return fromArray(source1, source2).flatMap((Function)Functions.identity(), false, 2);
    }

2.1、fromArray、fromIterable
从上面看出fromArray是最上游的Observable,调用OnNext 会直接调用MergeObserver的onNext。source中的onNext会调用到InnerObserver中的onNext()

  void run() {
            boolean hasNext;
            do {
                T v;
                try {
                    v = Objects.requireNonNull(it.next(), "The iterator returned a null value");
                } catch (Throwable e) {
                }
                downstream.onNext(v);
                try {
                    hasNext = it.hasNext();
                } catch (Throwable e) {
                }
            } while (hasNext);
        }

2.2、flatMap
mapper : 转换函数,返回一个ObservableSource对象。
delayErrors : 延迟错误。
maxConcurrency :最大并发数,同时可执行多少个ObservableSource
bufferSize :缓存多少个ObservableSource对象

flatMap(@NonNull Function<? super T, ? extends ObservableSource<? extends R>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize)

看下FlatMap源码
2.2.1、上游订阅当前的MergeObserver,传递OnNext会传递到MegeObser的onNext中。

public final class ObservableFlatMap  extends Observable{

     @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MergeObserver<>(t, mapper, delayErrors, maxConcurrency, bufferSize));
    }
}

2.2.2、在onNext中使用mapper函数转化,转化后的是Observable对象,因此必须给emit出去。所以将改Observable订阅内部的InnerObserver,执行内部的onNext,然后调用下游真正的Observer的OnNext()

  @Override
        public void onNext(T t) {
            ObservableSource<? extends U> p;
            try {
                p = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
            } catch (Throwable e) {
            }
            if (maxConcurrency != Integer.MAX_VALUE) {
                synchronized (this) {
                    if (wip == maxConcurrency) {
                        sources.offer(p);
                        return;
                    }
                    wip++;
                }
            }
            subscribeInner(p);
        }

maxConcurrency 最大并发数,如果超过了最大并发数,就存在sources队列中,当执行完一个Observable之后,再从sources队列中取。


merge.png

从上图也可以看出,merge结果是无序的,但是每一个ObservableSource的结果是有序的。 当超过超过最大并发数,就会等待前面的source执行完,再执行。

class InnerObserver{
        public void onNext(U t) {
            parent.tryEmit(t, this);
       }
}

class MergeObserver {

     void tryEmit(U value, InnerObserver<T, U> inner) {
            if (get() == 0 && compareAndSet(0, 1)) {
                downstream.onNext(value);
                if (decrementAndGet() == 0) {
                    return;
                }
            } else {
                SimpleQueue<U> q = inner.queue;
                if (q == null) {
                    q = new SpscLinkedArrayQueue<>(bufferSize);
                    inner.queue = q;
                }
                q.offer(value);
                if (getAndIncrement() != 0) {
                    return;
                }
            }
            drainLoop();
        }


    void drainLoop() {
            final Observer<? super U> child = this.downstream;
            int missed = 1;
            for (;;) {

                boolean d = done;
                svq = queue;
                InnerObserver<?, ?>[] inner = observers.get();
                int n = inner.length;

                if (n != 0) {
                    int j = Math.min(n - 1, lastIndex);

                    sourceLoop:
                    for (int i = 0; i < n; i++) {
                        if (checkTerminate()) {
                            return;
                        }

                        @SuppressWarnings("unchecked")
                        InnerObserver<T, U> is = (InnerObserver<T, U>)inner[j];
                        SimpleQueue<U> q = is.queue;
                        if (q != null) {
                            for (;;) {
                                U o;
                                try {
                                    o = q.poll();
                                } catch (Throwable ex) {
                          
                                }
                                if (o == null) {
                                    break;
                                }

                                child.onNext(o);

                                if (checkTerminate()) {
                                    return;
                                }
                            }
                        }

                        boolean innerDone = is.done;
                        SimpleQueue<U> innerQueue = is.queue;
                        if (innerDone && (innerQueue == null || innerQueue.isEmpty())) {
                            removeInner(is);
                            innerCompleted++;
                        }

                        j++;
                        if (j == n) {
                            j = 0;
                        }
                    }
                    lastIndex = j;
                }

                if (innerCompleted != 0) {
                    if (maxConcurrency != Integer.MAX_VALUE) {
                        subscribeMore(innerCompleted);
                        innerCompleted = 0;
                    }
                    continue;
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
}

上面代码有这几点
1)、 外部调用onNext 直接调用到了InnerObserver的onNext,将next值直接调用downstream.onNext(),或者存入到queue中。
2)、遍历InnerObserver 从其中的queue中取出里面的值,调用downStream.onNext(),observer没有值就遍历下一个observer。
3)、是执行完成一个source后,从sources队列中取出新source,订阅InnerObserver。
可以看出,外部的source谁先调用OnNext,就先调用谁的Observer, 然后执行下游downStream.onNext()

3、concat

concat.png
 public static <@NonNull T> Observable<T> concat(@NonNull Iterable<@NonNull ? extends ObservableSource<? extends T>> sources) {
   fromIterable(sources).concatMapDelayError((Function)Functions.identity(), false, bufferSize());
    }

3.1 fromIterable这里和2.1是一样的,也是上游发送数据的地方。
3.2 订阅

class ObservableConcatMap {

 @Override
    public void subscribeActual(Observer<? super U> observer) {
            SerializedObserver<U> serial = new SerializedObserver<>(observer);
            source.subscribe(new SourceObserver<>(serial, mapper, bufferSize));
    }
}

3.3 将Observerable存到queue中,遍历queue,转化observable对象,订阅到
InnerObserver。

static final class SourceObserver<T, U> {

        @Override
        public void onNext(T t) {
            if (fusionMode == QueueDisposable.NONE) {
                queue.offer(t);
            }
            drain();
        }

        void drain() {

            for (; ; ) {
                if (!active) {
                    boolean d = done;
                    T t;
                    try {
                        t = queue.poll();
                    } catch (Throwable ex) {
                    }
                    boolean empty = t == null;
                    if (!empty) {
                        ObservableSource<? extends U> o;
                        try {
                            o = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
                        } catch (Throwable ex) {
                        }

                        active = true;
                        o.subscribe(inner);
                    }
                }
            }
        }
     
      void innerComplete() {
            active = false;
            drain();
        }

}

 static final class InnerObserver<U>  {
        @Override
        public void onNext(U t) {
            downstream.onNext(t);
        }

            @Override
            public void onComplete() {
                parent.innerComplete();
            }

  }

从上面代码看出,当一个InnerObserver执行完成之后,将active 设置为false,然后for循环中再取下一个Observable对象订阅。

4、总结

merge是上游调用OnNext之后,只要任何一个InnerObserver中有数据,就调用downstream.onNext(),结果是无序的。
concat 只有一个source执行完成之后,才会执行下一个source,结果是有序的。
zip通过1.4可以实现有序,但是必须得等待source都执行完成才能执行。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,544评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,430评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,764评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,193评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,216评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,182评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,063评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,917评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,329评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,543评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,722评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,425评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,019评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,671评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,825评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,729评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,614评论 2 353

推荐阅读更多精彩内容