4-RxJava源码分析之 --- 操作符

RxJava是一个生产者和消费者模型,有生产者Observable和消费者Observer,对于简单的一个生产者和一个消费者的情况比较简单,但真实业务中生产者可能有多个,且多个生产者合作生产的情况可能有多种,比如多个生产者谁先生产第一个其他的就取消。而RxJava的操作符就是用于组合多个生产者的逻辑实现。

1 - "amb"操作符源码分析

"amb"操作符的作用是,组合多个ObservableSource保证谁先生产出数据,就只接收到ObservableSource的数据,操作符示意图如下:


amb操作符
public abstract class Observable<T> implements ObservableSource<T> {
     public static <T> Observable<T> amb(Iterable<? extends ObservableSource<? extends T>> sources) {
        ObjectHelper.requireNonNull(sources, "sources is null");
        // 该操作把多个ObservableSource转化为一个ObservableAmb对象
        return RxJavaPlugins.onAssembly(new ObservableAmb<T>(null, sources));
    }
}


public final class ObservableAmb<T> extends Observable<T> {
    final ObservableSource<? extends T>[] sources;
    final Iterable<? extends ObservableSource<? extends T>> sourcesIterable;
    
    public ObservableAmb(ObservableSource<? extends T>[] sources, Iterable<? extends ObservableSource<? extends T>> sourcesIterable) {
        this.sources = sources;
        this.sourcesIterable = sourcesIterable;
    }
    
     public void subscribeActual(Observer<? super T> s) {
        ObservableSource<? extends T>[] sources = this.sources;
        int count = 0;
        if (sources == null) {
            // 1 - 这里的逻辑是把sourcesIterable中的对象添加到source数组中
            ... 
        } else {
            count = sources.length;
        }

        if (count == 0) {
            EmptyDisposable.complete(s);
            return;
        } else
        if (count == 1) {
            sources[0].subscribe(s);
            return;
        }
        
        // 2 - 创建一个AmbCoordinator对象
        AmbCoordinator<T> ac = new AmbCoordinator<T>(s, count);
        // 调用各个ObservableSource的subscribe()
        ac.subscribe(sources);
    }
    
    // 3 - 处理多个ObservableSource竞争生产权的逻辑处理类,通过为没一个ObservableSource创建一个Observer,
    // 当对应的Observer收到数据时,来竞争是否赢得生产权,如果赢得生产权就把其他的Observer dispose,
    // 保证只有赢得生产权的ObservableSource来发送数据给原始Observer
     static final class AmbCoordinator<T> implements Disposable {
        final Observer<? super T> actual;
        final AmbInnerObserver<T>[] observers;
        final AtomicInteger winner = new AtomicInteger();

        @SuppressWarnings("unchecked")
        AmbCoordinator(Observer<? super T> actual, int count) {
            this.actual = actual;
            this.observers = new AmbInnerObserver[count];
        }

        // 处理sources中各个ObservableSource的subscribe逻辑
        public void subscribe(ObservableSource<? extends T>[] sources) {
            AmbInnerObserver<T>[] as = observers;
            int len = as.length;
            for (int i = 0; i < len; i++) {
                // 4 - 把Observer转为AmbInnerObserver,AmbInnerObserver发送数据时通知到AmbCoordinator中来处理谁赢得了生产权
                as[i] = new AmbInnerObserver<T>(this, i + 1, actual);
            }
            winner.lazySet(0); // release the contents of 'as'
            // 5 - 调用Observer.onSubscribe()
            actual.onSubscribe(this);

            for (int i = 0; i < len; i++) {
                // 6 - 如果已经有ObservableSource赢取了生产权,不再处理
                if (winner.get() != 0) {
                    return;
                }
                // 7 -如果还没哪个ObservableSource赢得生产权,就调用所有ObservableSource.subscribe()
                sources[i].subscribe(as[i]);
            }
        }

        // 8 - 当AmbInnerObserver收到数据时调用,代表AmbInnerObserver对应的ObservableSource赢得了生产权,
        // 把其他的AmbInnerObserver dispose。保证只有赢得生产权的ObservableSource生产数据。
        public boolean win(int index) {
            int w = winner.get();
            if (w == 0) {
                // 如果还没人赢得生产权,就设置该index,表示该index对应得Observable赢得生产权
                if (winner.compareAndSet(0, index)) {
                    AmbInnerObserver<T>[] a = observers;
                    int n = a.length;
                    // 赢得生产权后把其他得Observer dispose
                    for (int i = 0; i < n; i++) {
                        if (i + 1 != index) {
                            a[i].dispose();
                        }
                    }
                    return true;
                }
                return false;
            }
            return w == index;
        }
        ...   
    }
    
    // 9 - 用于转发生产者发送过来的数据并转发给原始的Observer,同时在收到数据时,调用AmbCoordinator.win(index)来赢得生产权
    static final class AmbInnerObserver<T> extends AtomicReference<Disposable> implements Observer<T> {
        private static final long serialVersionUID = -1185974347409665484L;
        final AmbCoordinator<T> parent;
        final int index;
        final Observer<? super T> actual;

        boolean won;

        AmbInnerObserver(AmbCoordinator<T> parent, int index, Observer<? super T> actual) {
            this.parent = parent;
            this.index = index;
            this.actual = actual;
        }

        @Override
        public void onSubscribe(Disposable s) {
            DisposableHelper.setOnce(this, s);
        }

        @Override
        public void onNext(T t) {
            if (won) {
                // 10 - 已经赢得来生产权,直接转发数据
                actual.onNext(t);
            } else {
                // 11 - 竞争,赢得生产权
                if (parent.win(index)) {
                    // 12 - 赢得生产权
                    won = true;
                    actual.onNext(t);
                } else {
                    // 13 - 没有赢得生产权
                    get().dispose();
                }
            }
        }

        @Override
        public void onError(Throwable t) {
            // 14 - 与onNext()处理一样
            if (won) {
                actual.onError(t);
            } else {
                if (parent.win(index)) {
                    won = true;
                    actual.onError(t);
                } else {
                    RxJavaPlugins.onError(t);
                }
            }
        }

        @Override
        public void onComplete() {
            // 15 - 与onNext()处理一样
            if (won) {
                actual.onComplete();
            } else {
                if (parent.win(index)) {
                    won = true;
                    actual.onComplete();
                }
            }
        }

        public void dispose() {
            DisposableHelper.dispose(this);
        }
    }
}

从上面代码分析可知,"amb"操作符是把多个ObservableSource先转化为ObservableAmb对象,并在ObservableAmb中为每个ObservableSource创建对应的包装的AmbInnerObserver(可把收到的数据转发给原始Observer),并进行订阅操作,当某个AmbInnerObserver先收到数据时,代表ObservableSource赢得生产权,把其他的AmbInnerObserver dispose,保证只接收赢得生产权的ObservableSource生产的数据。

2 - "concatArray"操作符源码分析

"concatArray"操作符的作用是,组合多个ObservableSource按照顺序依次生产发送数据,即多个ObservableSource一个生产完数据之后再下一个,依次下去知道所有的结束,Observer接收所有ObservableSource发送的数据,操作符示意图如下:


concatArray操作符

public abstract class Observable<T> implements ObservableSource<T> {
    public static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources) {
        if (sources.length == 0) {
            return empty();
        } else
        if (sources.length == 1) {
            return wrap((ObservableSource<T>)sources[0]);
        }
        // 1 - fromArray(sources)把多个ObservableSource转为一个发送这些ObservableSource的Observable
        // 并把转化后的Observable转为ObservableConcatMap对象,记住此处arrayObservable=fromArray(sources)发送的是参数的sources
        // 2 - 下一步看看ObservableConcatMap
        return RxJavaPlugins.onAssembly(new ObservableConcatMap(fromArray(sources), Functions.identity(), bufferSize(), ErrorMode.BOUNDARY));
    }
}

public final class ObservableConcatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    public ObservableConcatMap(ObservableSource<T> source, Function<? super T, ? extends ObservableSource<? extends U>> mapper,
            int bufferSize, ErrorMode delayErrors) {
        super(source);
        this.mapper = mapper;
        this.delayErrors = delayErrors;
        this.bufferSize = Math.max(8, bufferSize);
    }
    @Override
    public void subscribeActual(Observer<? super U> s) {

        if (ObservableScalarXMap.tryScalarXMapSubscribe(source, s, mapper)) {
            return;
        }

        if (delayErrors == ErrorMode.IMMEDIATE) {
            SerializedObserver<U> serial = new SerializedObserver<U>(s);
            // 3 - 这里的source是arrayObservable,即那个发送原始sources的Observable
            // 把原始Observer转为SourceObserver,这种是接收到error就结束,下面的情况和这差不多就不看了,
            // 下面看看SourceObserver
            source.subscribe(new SourceObserver<T, U>(serial, mapper, bufferSize));
        } else {
            source.subscribe(new ConcatMapDelayErrorObserver<T, U>(s, mapper, bufferSize, delayErrors == ErrorMode.END));
        }
    }
    
    static final class SourceObserver<T, U> extends AtomicInteger implements Observer<T>, Disposable {
        SourceObserver(Observer<? super U> actual,
                                Function<? super T, ? extends ObservableSource<? extends U>> mapper, int bufferSize) {
            this.actual = actual;
            this.mapper = mapper;
            this.bufferSize = bufferSize;
            // 把原始Observer转为InnerObserver
            this.inner = new InnerObserver<U>(actual, this);
        }
        
        public void onSubscribe(Disposable s) {
            if (DisposableHelper.validate(this.s, s)) {
                this.s = s;
                if (s instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    QueueDisposable<T> qd = (QueueDisposable<T>) s;

                    int m = qd.requestFusion(QueueDisposable.ANY);
                    // 4 - 对于concatArray这里是true,可以看ObservableFromArray中的requestFusion逻辑
                    if (m == QueueDisposable.SYNC) {
                        fusionMode = m;
                        // 5 - 这里把QueueDisposable存为queue,这里的QueueDisposable对应的在ObservableFromArray中,即保存了所有原始sources
                        queue = qd;
                        done = true;

                        actual.onSubscribe(this);
                        
                        // 6 - 处理发送数据逻辑
                        drain();
                        return;
                    }
                    
                    if (m == QueueDisposable.ASYNC) {
                        fusionMode = m;
                        queue = qd;

                        actual.onSubscribe(this);

                        return;
                    }
                }

                queue = new SpscLinkedArrayQueue<T>(bufferSize);

                actual.onSubscribe(this);
            }
        }
        
        void drain() {
            if (getAndIncrement() != 0) {
                return;
            }

            for (;;) {
                if (disposed) {
                    queue.clear();
                    return;
                }
                // 7 - 没有正在处理的ObservableSource
                if (!active) {
                    boolean d = done;

                    T t;

                    try {
                        // 8 - 取出一个ObservableSource
                        t = queue.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        dispose();
                        queue.clear();
                        actual.onError(ex);
                        return;
                    }

                    boolean empty = t == null;

                    if (d && empty) {
                        disposed = true;
                        actual.onComplete();
                        return;
                    }

                    if (!empty) {
                        ObservableSource<? extends U> o;

                        try {
                            o = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
                        } catch (Throwable ex) {
                            Exceptions.throwIfFatal(ex);
                            dispose();
                            queue.clear();
                            actual.onError(ex);
                            return;
                        }

                        // 9 - 标记为true,for循环不在处理ObservableSource
                        active = true;
                        // 10 - 处理订阅原始的ObservableSource
                        o.subscribe(inner);
                    }
                }

                if (decrementAndGet() == 0) {
                    break;
                }
            }
        }
        
        void innerComplete() {
            // 13 - 比较为false,drain()中可以处理下一个ObservableSource
            active = false;
            // 14 - 调drain(),处理下一个ObservableSource
            drain();
        }
    }
    
    static final class InnerObserver<U> extends AtomicReference<Disposable> implements Observer<U> {
            ... 
            
            @Override
            public void onError(Throwable t) {
                // 11 - Error,整个流程结束
                parent.dispose();
                actual.onError(t);
            }
            @Override
            public void onComplete() {
                // 12 - 一个成功结束,调用SourceObserver.innerComplete()
                parent.innerComplete();
            }

            void dispose() {
                DisposableHelper.dispose(this);
            }
        }
}

从上面的代码分析可清除了解到,"concatArray"操作符是把多个ObservableSource转为一个ObservableFromArray类型的Observable(注意:在ObservableFromArray中把sources放进FromArrayDisposable,之后传递给Observer.onSubscribe(FromArrayDisposable)),然后又把ObservableFromArray转为ObservableConcatMap类型Observable,在ObservableConcatMap中把原始Observer转为SourceObserver,在SourceObserver中取出FromArrayDisposable中的sources,逐个ObservableSource进行生产数据,并把原始的Observer转为InnerObserver配合实现逐个ObservableSource生产数据。

总结语

操作符就只分析这两个,因为实在太多了,大家感兴趣可以自己去看看源码。总的来说操作符得逻辑就是把原始得一个或多个Observable转为另一种Observable来处理相关逻辑,把原始Observer转为另一种Observer出来相关逻辑。

RxJava源码分析系列文章主题目录:

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350