RxJava源码分析(一)基本的数据流分析(无背压)

引言

关于RxJava2的用法网上的资料很多,这里我们只学习它的实现原理。本文专题目的:
1.知道源头(Observable)是如何将数据发送出去的。
2.知道终点(Observer)是如何接收到数据的。
3.何时将源头和终点关联起来的
今天我们先从最简单的无背压(Observable)的create操作符说起,来解决前三个问题。

样例

       //1.创建被观察者,生产事件
        final Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            //2.订阅的时候发送事件
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);

//                emitter.onError(new Throwable("haha"));
                emitter.onComplete();//onComplete事件发送后,后面的所有事件无效,且后面不能发送错误事件
                emitter.onNext(3);

            }
        });
        //3.定义观察者
        Observer<Integer> observer = new Observer<Integer>() {
            
            @Override
            public void onSubscribe(Disposable d) {
                Log.e(TAG, "开始采用subscribe连接");
                mDisposable = d;
            }

            @Override
            public void onNext(Integer value) {
                Log.e(TAG, "对Next事件" + value + "作出响应");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "对Complete事件作出响应");
            }
        };
        //4建立联系
        observable.subscribe(observer);

我们看到出现了一下几个角色:

  1. Observable:被观察者,是数据的源头,通过subscribe订阅被观察者;
  2. ObservableOnSubscribe:从代码结构上看,Observable的构造方法需要它,且持有subscribe方法,这里暂时理解为观察者和被观察者的中间件,具体作用后面再看;
  3. ObservableEmitter:顾名思义,是数据发射器,被观察者通过它发送事件;
  4. Observer:被观察者,数据接受者,持有onNext、onError、onComplete、onSubscribe方法。

Observable

public abstract class Observable<T> implements ObservableSource<T> {
...
}

实现了ObservableSource接口:

public interface ObservableSource<T> {
    /**
     * Subscribes the given Observer to this ObservableSource instance.
     * @param observer the Observer, not null
     * @throws NullPointerException if {@code observer} is null
     */
    void subscribe(Observer<? super T> observer);
}

接口很简单,提供了订阅观察者的功能,Observable中该方法的实现后面再看。我们先看看create操作符干了些啥:

public abstract class Observable<T> implements ObservableSource<T> {
    ...
   @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        //判空
        ObjectHelper.requireNonNull(source, "source is null");
       //构造ObservableCreate对象
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
    ...
}

最后通过用户构造的ObservableOnSubscribe对象,返回了ObservableCreate对象。我们先看看ObservableOnSubscribe。

数据发射封装-ObservableOnSubscribe

public interface ObservableOnSubscribe<T> {
    /**
     * Called for each Observer that subscribes.
     * @param e the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(ObservableEmitter<T> e) throws Exception;
}

是个接口,用户发射数据就是在这个接口实现中完成,具体见样例代码,这里流一个疑问:入参ObservableEmitter是怎么来的,别急后面马上会讲到!

create操作符的产物-ObservableCreate

然后再看Observable的子类ObservableCreate,根据名字我们可以猜到它是由create操作符创建的被观察者:

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;//用户实现具体的数据发射操作

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            //调用中间件ObservableOnSubscribe的订阅方法,开始调用发射数据的代码了!
             //说明1
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
...
}

这里的source由用户构造,实现发射数据操作,subscribeActual方法是核心,当订阅观察者是,最终会执行subscribeActual方法,后面会具体说明,不过看方法名也应该能猜到。
前面我们讲ObservableOnSubscribe的subscribe方法时,关于入参的来源留下了一个疑问,这里看说明1的代码:入参parent类型为CreateEmitter,很明显它必然是ObservableEmitter的子类或者子接口。

事件发射器--ObservableEmitter

public interface ObservableEmitter<T> extends Emitter<T> {
  ...
}

public interface Emitter<T> {
    /**
     * Signal a normal value.
     * @param value the value to signal, not null
     */
    void onNext(T value);

    /**
     * Signal a Throwable exception.
     * @param error the Throwable to signal, not null
     */
    void onError(Throwable error);

    /**
     * Signal a completion.
     */
    void onComplete();
}

可见发射器接口提供了发射数据的功能,在回过头来看看CreateEmitter,它是ObservableCreate的内部类.

public final class ObservableCreate<T> extends Observable<T> {
...
...
static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        private static final long serialVersionUID = -3434801548987643227L;
        //持有观察者,发射器的发送数据的方法其实是调用观察者对应的方法
        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                //调用观察者的接收数据的方法
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
            } else {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    //发送完成事件后,断开连接,不接受后序事件
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }
        ...
    }
}

饶了半天,我们终于找到被观察者被调用的地方了,用户调用发射器的发送数据的方法最终会通过ObservableCreate中的CreateEmitter实现调用,而CreateEmitter最终又会调用观察者的接收数据方法,到此为止,下游接收数据的流程如下:
1.create操作符通过ObservableOnSubscribe对象构造ObservableCreate对象;

  1. ObservableCreate在执行订阅方法subscribeActual时,通过Observer对象构造发射器CreateEmitter;
  2. CreateEmitter发射数据最终会调用Observer对应的接收数据方法。

Observable的订阅方法

上面我们理解了接收数据的流程,下面我们瞅瞅Observable和Observer建立联系的订阅方法:

 @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
            //subscribeActual核心代码!!!
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

lei了lei了!看到subscribeActual方法就像遇到亲人了,之前我们了解到Create操作法创建的是ObservableCreate对象,这里用户执行订阅方法时会调用subscribeActual,我们再回头看看ObservableCreate的subscribeActual实现:

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //根据Observer构造发射器
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            //这里由用户实现发射数据操作
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

读到这里,发现整个数据的产生和接收终于打通,订阅方法通过 source.subscribe(parent)由用户发射数据,在本样例中就是:

final Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            //2.订阅的时候发送事件
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);

//                emitter.onError(new Throwable("haha"));
                emitter.onComplete();//onComplete事件发送后,后面的所有事件无效,且后面不能发送错误事件
                emitter.onNext(3);
            }
        });

emitter参数就是CreateEmitter类型发射器parent。相信看到这里,整个数据的流程应该比较清晰了。数据流向如下:
Observable订阅Observer--> Observable执行subscribeActual方法--> ObservableOnSubscribe执行subscribe方法--> ObservableEmitter执行发射数据方法--> Observer执行接收数据方法。
我们可以看到,RxJava规定了数据从Observable到Observer的统一流程,至于用户发送什么数据、按什么顺序发都通过中间件ObservableOnSubscribe和ObservableEmitter实现。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,039评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,426评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,417评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,868评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,892评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,692评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,416评论 3 419
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,326评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,782评论 1 316
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,957评论 3 337
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,102评论 1 350
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,790评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,442评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,996评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,113评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,332评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,044评论 2 355

推荐阅读更多精彩内容