RxJava2.x 从源码分析原理

RxJava 相信各位已经使用了很久,大部分人在刚学习 RxJava 感叹切换线程的方便,调用逻辑清晰的同时,并不知道其中的原理,主要是靠记住运行的顺序。
随着我们设计出的 RxJava流 越来越复杂,一些复杂的问题并不能靠着记住的运行顺序就能解决。
下面,就通过最常用的操作符的源码来看看所谓的是什么运行的。

首先我们用Single举例,设计一个最基本的 RxJava 流,只有一个 Observable(ColdObservable)Obsever

Disposable disposable = Single.just("wtf")
                        .subscribe(it -> Log.i("subscribe", it));

上游发送一个"wtf" ,下游接受时将其打印出来。上游发送端使用 Single.just 作为创建方法,
看一下 just() 方法里做了什么。

    public static <T> Single<T> just(final T item) {
        ObjectHelper.requireNonNull(item, "value is null");
        return RxJavaPlugins.onAssembly(new SingleJust<T>(item));
    }
    
    public static <T> Single<T> onAssembly(@NonNull Single<T> source) {
    Function<? super Single, ? extends Single> f = onSingleAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}

其中 ObjectHelper.requireNonNull 只是空检查。
RxJavaPlugins.onAssembly 方法,这个方法其实就是通过一个全局的变量 onSingleAssembly 来对方法进行 Hook ,这一系列xxxAssembly全局变量默认为空,所以实际上当我们没有设置的时候其实 just 方法是直接返回了一个 新实例化的SingleJust对象。

再看看SingleJust内部:

public final class SingleJust<T> extends Single<T> {

    final T value;
    public SingleJust(T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(SingleObserver<? super T> observer) {
        observer.onSubscribe(Disposables.disposed());
        observer.onSuccess(value);
    }

}

实例化的时候只是将值保存了下来,没有其它操作。
下一步调用subscribe()来启动这个流(ColdObservable),然后看看subscribe中做了什么:

    public final void subscribe(SingleObserver<? super T> subscriber) {
        ObjectHelper.requireNonNull(subscriber, "subscriber is null");
        subscriber = RxJavaPlugins.onSubscribe(this, subscriber);
        ObjectHelper.requireNonNull(subscriber, "subscriber returned by the RxJavaPlugins hook is null");

        try {
             //核心逻辑
            subscribeActual(subscriber);
        } catch (NullPointerException ex) {
            throw ex;
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            NullPointerException npe = new NullPointerException("subscribeActual failed");
            npe.initCause(ex);
            throw npe;
        }
    }

同样 RxJavaPlugins.onSubscribe 默认没有作用,实际的核心逻辑是调用了subscribeActual(SingleObserver)
对于我们上面设计的流,则是调用了 SingleJust 中的 subscribeActual(SingleObserver)

回顾上面 SingleJustsubscribeActual(SingleObserver) 的实现:

        observer.onSubscribe(Disposables.disposed());
        observer.onSuccess(value);

得到两个信息

  • 首先调用下游观察者 SingleObserverOnSubscribe 方法并传递用于取消操作的 Disposable
  • 调用OnSuccess 方法并传递之前保存下来的 value

Map 操作符

现在我们加入一个常用且重要的Map操作到流中

Disposable disposable = Single.just("wtf")
                 .map(it-> 0)
                 .subscribe(it -> Log.i("subscribe", String.of(it)));

上面这个流包括了三种典型的操作 创建Creation 操作符Transformation和 订阅Subscribe

依然先检查map() 方法,可以看到其中实例化了一个SingleMap

    public final <R> Single<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new SingleMap<T, R>(this, mapper));
    }

再看看 SingleMap

public final class SingleMap<T, R> extends Single<R> {
    final SingleSource<? extends T> source;
    final Function<? super T, ? extends R> mapper;

    public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super R> t) {
        source.subscribe(new MapSingleObserver<T, R>(t, mapper));
    }

    static final class MapSingleObserver<T, R> implements SingleObserver<T> {

        final SingleObserver<? super R> t;
        final Function<? super T, ? extends R> mapper;

        MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
            this.t = t;
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe(Disposable d) {
            t.onSubscribe(d);
        }

        @Override
        public void onSuccess(T value) {
            R v;
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                onError(e);
                return;
            }

            t.onSuccess(v);
        }

        @Override
        public void onError(Throwable e) {
            t.onError(e);
        }
    }
}

类中信息稍微复杂一些:

  1. 首先我们关注在SingleMap实例化的时候也是只做了保存数据的操作,而没有实际逻辑:将流的上游保存为 source 将数据转换的方法保存为 mapper
  2. 第二步我们知道下游观察者 SingleObserver 会调用核心逻辑 subscribeActual方法来启动流
  3. 在这里的subscribeActual方法中可以看到几个重要的信息
    • MapSingleObserver是一个观察者
    • MapSingleObserver 保存了下游的观察者 SingleObserver 以及 mapper
    • 上游 sourceMapSingleObserver 订阅

由此可以看出在SingleMap被下游观察者订阅了之后,实例化了一个新的观察者MapSingleObserver并保存下游观察者SingleObserver的信息,再去订阅上游SingleJust
这种模式创建了一个装饰类,用来包装原有的类,并在保持类方法签名完整性的前提下,提供了额外的功能的设计模式称为装饰者模式

总结上面的执行顺序:

  1. Rx流的最后一步调用 subscribe启动流(ColdObservable)
  2. 首先执行SingleMap中的subscribeActual方法,其中包括生成新的MapSingleObserver并订阅 SingleJust
  3. 执行SingleJust中的subscribeActual:调用下游MapSingleObserveronSubscribe onSuccess方法
  4. MapSingleObserver中的onSubsribe onSuccess方法也很简单,分别调用下游 ObserveronSubsribe``onSuccess(异常时 onError)方法

observeOn 操作符

Rxjava首先被大家津津乐道之处是可以方便的切换线程,避免Callback Hell,现在来看看线程切换操作符。
我们加入线程切换操作符 observeOn

Disposable disposable = Single.just("wtf")
                 .map(it-> 0)
                 .observeOn(Schedulers.io())
                 .subscribe(it -> Log.i("subscribe", String.of(it)));

同样的,在 observeOn方法中实例化了一个SingleObserveOn

    public final Single<T> observeOn(final Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new SingleObserveOn<T>(this, scheduler));
    }

继续看SingleObserveOn类中信息

public final class SingleObserveOn<T> extends Single<T> {

    final SingleSource<T> source;
    final Scheduler scheduler;

    public SingleObserveOn(SingleSource<T> source, Scheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super T> s) {
        source.subscribe(new ObserveOnSingleObserver<T>(s, scheduler));
    }

    static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable {
        private static final long serialVersionUID = 3528003840217436037L;

        final SingleObserver<? super T> actual;
        final Scheduler scheduler;

        T value;
        Throwable error;

        ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
            this.actual = actual;
            this.scheduler = scheduler;
        }

        @Override
        public void onSubscribe(Disposable d) {
            if (DisposableHelper.setOnce(this, d)) {
                actual.onSubscribe(this);
            }
        }

        @Override
        public void onSuccess(T value) {
            this.value = value;
            Disposable d = scheduler.scheduleDirect(this);
            DisposableHelper.replace(this, d);
        }

        @Override
        public void onError(Throwable e) {
            this.error = e;
            Disposable d = scheduler.scheduleDirect(this);
            DisposableHelper.replace(this, d);
        }

        @Override
        public void run() {
            Throwable ex = error;
            if (ex != null) {
                actual.onError(ex);
            } else {
                actual.onSuccess(value);
            }
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    }
}


类似的

  • 构造函数中保存了上游和线程切换的信息
  • subscribeActual 实例化了一个新的观察者ObserveOnSingleObserver

不同的

  • ObserveOnSingleObserver 还继承了AtomicReference<Disposable>、实现了Disposable``Runnable接口
  • onSuccess``onError中都没有直接调用下游的onSuccess onError方法,而是调用了Disposable d = scheduler.scheduleDirect(this);来执行run方法中的逻辑,而run方法中的逻辑则是调用下游的onSuccess onError方法

查看schedulerDirect内部信息

    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        DisposeTask task = new DisposeTask(decoratedRun, w);
        w.schedule(task, delay, unit);
        return task;
    }

创建了一个对应线程的Worker和一个可用于取消的DisposeTask并执行,对于IoScheduler则是创建了EventLoopWorker,再看看EventLoopWorker中的信息。

    @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }
    static final class EventLoopWorker extends Scheduler.Worker {
        private final CompositeDisposable tasks;
        private final CachedWorkerPool pool;
        private final ThreadWorker threadWorker;

        final AtomicBoolean once = new AtomicBoolean();

        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            this.threadWorker = pool.get();
        }

        @Override
        public void dispose() {
            if (once.compareAndSet(false, true)) {
                tasks.dispose();

                // releasing the pool should be the last action
                pool.release(threadWorker);
            }
        }

        @Override
        public boolean isDisposed() {
            return once.get();
        }

        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }

            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }

EventLoopWorker中则是维护了一套包含相应的线程池、可取消的CompositeDisposable、以及用于运行RunableThreadWorker。总的来说就是一套可以在相应线程运行且可取消的类和逻辑。

  • 上面则解释了为什么observeOn可以切换下游的线程(onSuccess onError)
  • 同样解释了为什么不会改变onSubsribe的调用线程,因为可以看到onSubscribe方法中直接调用了下游的onSucsribe,并没有受到线程切换的影响。

SubscribeOn

实际上,subsribeOn 是 RxJava2.x 中比较复杂也是相较于 RxJava1.x 改动比较大的一个操作符,它甚至会影响流的执行顺序。(可以参见唐雪茂写的 Rxjava流的设计 中的1 2两个流)

我们现在设计两个Rx流

Disposable disposable = Single.just("wtf")
                 .doOnSubsribe(it-> Log.i("doOnSubsribe", 0)
                 .doOnSubsribe(it-> Log.i("doOnSubsribe", 1)
                 .doOnSubsribe(it-> Log.i("doOnSubsribe", 2)
                 .doOnSubsribe(it-> Log.i("doOnSubsribe", 3)
                 .subscribe(it -> Log.i("subscribe", 4);
Disposable disposable2 = Single.just("wtf")
                 .doOnSubsribe(it-> Log.i("doOnSubsribe", 0)
                 .doOnSubsribe(it-> Log.i("doOnSubsribe", 1)
                 .subscribeOn(Schedulers.io())
                 .doOnSubsribe(it-> Log.i("doOnSubsribe", 2)
                 .doOnSubsribe(it-> Log.i("doOnSubsribe", 3)
                 .subscribe(it -> Log.i("subscribe", 4);

你可能已经知道并记住了两个流的打印的顺序分别是 01234 23014,但是为什么doOnSubsribe方法和RxJava1中调用顺序完全不一样,为什么通过subscribeOn切换线程会影响执行顺序?

先找到 SingleSubscribeOn

public final class SingleSubscribeOn<T> extends Single<T> {
    final SingleSource<? extends T> source;
    final Scheduler scheduler;
    
    public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s, source);
        //直接调用下游 onSubscribe
        s.onSubscribe(parent);
        //再执行订阅上游的方法
        Disposable f = scheduler.scheduleDirect(parent);
        parent.task.replace(f);
    }

    static final class SubscribeOnObserver<T>
    extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable {

        private static final long serialVersionUID = 7000911171163930287L;
        final SingleObserver<? super T> actual;
        final SequentialDisposable task;
        final SingleSource<? extends T> source;
        
        SubscribeOnObserver(SingleObserver<? super T> actual, SingleSource<? extends T> source) {
            this.actual = actual;
            this.source = source;
            this.task = new SequentialDisposable();
        }

        @Override
        public void onSubscribe(Disposable d) {
              //没有继续调用下游的 onSubscribe 方法
            DisposableHelper.setOnce(this, d);
        }

        @Override
        public void onSuccess(T value) {
            actual.onSuccess(value);
        }

        @Override
        public void onError(Throwable e) {
            actual.onError(e);
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
            task.dispose();
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        @Override
        public void run() {
            source.subscribe(this);
        }
    }

}

同样的直接看subscribeActual方法及onSubscribe方法,发现事情并没有那么简单,和之前的操作符的逻辑区别很大:

  • SubscribeOnObserver同样还继承了AtomicReference<Disposable>,实现了Disposable``Runnable接口
  • 并没有直接调用subscribe订阅上游,而是执行了其它操作符在 onSubscribe中订阅下游的操作
  • 然后再结合Disposable f = scheduler.scheduleDirect(parent);run方法可以知道在新的线程中执行了订阅上游的操作 source.subscribe(this);
  • onSubsribe中并没有再继续调用下游的 onSubsribe

综合起来可以知道,本来应该在整个流从下至上订阅完成后按照从上至下的顺序执行 onSubscribe的流,在使用subsribeOn操作符的后,在订阅的时(执行subscribeActual),就开始执行下游的onSubscribe且在当前线程!然后才在指定的io线程执行之下而上的操作,这也是为什么subsribeOn影响的是上游的线程。

小结:

我认为实际上 Rx 使用了很多优秀的设计将我们各种常用的操作进行了封装,让我们自由组合使用,其本身并没有用什么黑科技。例如切换线程本质上则是帮我们启用了一个新的线程并把接下来的代码放进去执行。
当然,其中还有很多更深入的内容需要我们继续发现和学习。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,126评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,254评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,445评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,185评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,178评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,970评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,276评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,927评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,400评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,883评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,997评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,646评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,213评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,204评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,423评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,423评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,722评论 2 345