RxJava2 源码分析(二)

概述

上一节我们分析了最简单的Rxjava的例子,了解了Rxjava是如何创建事件源,如何发射事件,何时发射事件,也清楚了上游和下游是如何关联起来的。
这一节我们着重来分析下Rxjava强大的线程调度是如何实现的。

简单的例子

private void doSomeWork() {
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                Log.i("lx", " subscribe: " + Thread.currentThread().getName());
                Thread.sleep(2000);
                e.onNext("a");
                e.onComplete();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i("lx", " onSubscribe: " + Thread.currentThread().getName());
            }
            @Override
            public void onNext(String str) {
                Log.i("lx", " onNext: " + Thread.currentThread().getName());
            }
            @Override
            public void onError(Throwable e) {
                Log.i("lx", " onError: " + Thread.currentThread().getName());
            }
            @Override
            public void onComplete() {
                Log.i("lx", " onComplete: " + Thread.currentThread().getName());
            }
        });
    }

运行结果:

com.rxjava2.android.samples I/lx:  onSubscribe: main
com.rxjava2.android.samples I/lx:  subscribe: main
com.rxjava2.android.samples I/lx:  onNext: main
com.rxjava2.android.samples I/lx:  onComplete: main

因为此方法笔者是在main线程中调用的,所以没有进行线程调度的情况下,所有方法都运行在main线程中。但我们知道Android的UI线程是不能做网络操作,也不能做耗时操作,所以一般我们把网络或耗时操作都放在非UI线程中执行。接下来我们就来感受下Rxjava强大的线程调度能力。

private void doSomeWork() {
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                Log.i("lx", " subscribe: " + Thread.currentThread().getName());
                Thread.sleep(2000);
                e.onNext("a");
                e.onComplete();
            }
        }).subscribeOn(Schedulers.io()) //增加了这一句
          .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i("lx", " onSubscribe: " + Thread.currentThread().getName());
            }
            @Override
            public void onNext(String str) {
                Log.i("lx", " onNext: " + Thread.currentThread().getName());
            }
            @Override
            public void onError(Throwable e) {
                Log.i("lx", " onError: " + Thread.currentThread().getName());
            }
            @Override
            public void onComplete() {
                Log.i("lx", " onComplete: " + Thread.currentThread().getName());
            }
        });
    }

运行结果:

com.rxjava2.android.samples I/lx:  onSubscribe: main
com.rxjava2.android.samples I/lx:  subscribe: RxCachedThreadScheduler-1
com.rxjava2.android.samples I/lx:  onNext: RxCachedThreadScheduler-1
com.rxjava2.android.samples I/lx:  onComplete: RxCachedThreadScheduler-1

只增加了subscribeOn这一句代码, 就发生如此神奇的现象,除了onSubscribe方法还运行在main线程(订阅发生的线程)其它方法全部都运行在一个名为RxCachedThreadScheduler-1的线程中。我们来看看rxjava是怎么完成这个线程调度的。

线程调度subscribeOn

首先我们先分析下Schedulers.io()这个东东。

    @NonNull
    public static Scheduler io() {
        return RxJavaPlugins.onIoScheduler(IO); // hook function
        // 等价于
        return IO;
    }

再看看IO是什么, IO是个static变量,初始化的地方是

IO = RxJavaPlugins.initIoScheduler(new IOTask()); // 又是hook function
// 等价于
IO = callRequireNonNull(new IOTask());
// 等价于
IO = new IOTask().call();

继续看看IOTask

static final class IOTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            return IoHolder.DEFAULT;
            // 等价于
            return new IoScheduler();
        }
    }

代码层次很深,为了便于记忆,我们再回顾一下:

Schedulers.io()等价于 new IoScheduler()

    // Schedulers.io()等价于
    @NonNull
    public static Scheduler io() {
        return new IoScheduler();
    }

好了,排除了其他干扰代码,接下来看看IoScheduler()是什么东东了
IoScheduler看名称就知道是个IO线程调度器,根据代码注释得知,它就是一个用来创建和缓存线程的线程池。看到这个豁然开朗了,原来Rxjava就是通过这个调度器来调度线程的,至于具体怎么实现我们接着往下看

    public IoScheduler() {
        this(WORKER_THREAD_FACTORY);
    }
    
    public IoScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        this.pool = new AtomicReference<CachedWorkerPool>(NONE);
        start();
    }

    @Override
    public void start() {
        CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
        if (!pool.compareAndSet(NONE, update)) {
            update.shutdown();
        }
    }
    
    CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
            this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
            this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
            this.allWorkers = new CompositeDisposable();
            this.threadFactory = threadFactory;

            ScheduledExecutorService evictor = null;
            Future<?> task = null;
            if (unit != null) {
                evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
                task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
            }
            evictorService = evictor;
            evictorTask = task;
        }

从上面的代码可以看出,new IoScheduler()后Rxjava会创建CachedWorkerPool的线程池,同时也创建并运行了一个名为RxCachedWorkerPoolEvictor的清除线程,主要作用是清除不再使用的一些线程。

但目前只创建了线程池并没有实际的thread,所以Schedulers.io()相当于只做了线程调度的前期准备。

OK,终于可以开始分析Rxjava是如何实现线程调度的。回到Demo来看subscribeOn()方法的内部实现:

public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

很熟悉的代码RxJavaPlugins.onAssembly,上一篇已经分析过这个方法,就是个hook function, 等价于直接return new ObservableSubscribeOn<T>(this, scheduler);, 现在知道了这里的scheduler其实就是IoScheduler。

跟踪代码进入ObservableSubscribeOn
可以看到这个ObservableSubscribeOn 继承自Observable,并且扩展了一些属性,增加了scheduler。 各位看官,这不就是典型的装饰模式嘛,Rxjava中大量用到了装饰模式,后面还会经常看到这种wrap类。

上篇文章我们已经知道了Observable.subscribe()方法最终都是调用了对应的实现类的subscribeActual方法。我们重点分析下subscribeActual:

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        // 没有任何线程调度,直接调用的,所以下游的onSubscribe方法没有切换线程, 
        //本文demo中下游就是观察者,所以我们明白了为什么只有onSubscribe还运行在main线程
        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

SubscribeOnObserver也是装饰模式的体现, 是对下游observer的一个wrap,只是添加了Disposable的管理。

接下来分析最重要的scheduler.scheduleDirect(new SubscribeTask(parent))

// 这个类很简单,就是一个Runnable,最终运行上游的subscribe方法
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }
    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        // IoSchedular 中的createWorker()
        final Worker w = createWorker();
        // hook decoratedRun=run;
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        // decoratedRun的wrap,增加了Dispose的管理
        DisposeTask task = new DisposeTask(decoratedRun, w);
        // 线程调度
        w.schedule(task, delay, unit);

        return task;
    }

回到IoSchedular

    public Worker createWorker() {
        // 工作线程是在此时创建的
        return new EventLoopWorker(pool.get());
    }
    
    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }
            // action 中就包含上游subscribe的runnable
            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    

最终线程是在这个方法内调度并执行的。

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        // decoratedRun = run, 包含上游subscribe方法的runnable
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        // decoratedRun的wrap,增加了dispose的管理
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        // 最终decoratedRun被调度到之前创建或从线程池中取出的线程,
        // 也就是说在RxCachedThreadScheduler-x运行
        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }

至此我们终于明白了Rxjava是如何调度线程并执行的,通过subscribeOn方法将上游生产事件的方法运行在指定的调度线程中。

com.rxjava2.android.samples I/lx:  onSubscribe: main
com.rxjava2.android.samples I/lx:  subscribe: RxCachedThreadScheduler-1
com.rxjava2.android.samples I/lx:  onNext: RxCachedThreadScheduler-1
com.rxjava2.android.samples I/lx:  onComplete: RxCachedThreadScheduler-1

从上面的运行结果来看,因为上游生产者已被调度到RxCachedThreadScheduler-1线程中,同时发射事件并没有切换线程,所以发射后消费事件的onNext onErro onComplete也在RxCachedThreadScheduler-1线程中。

总结

  1. Schedulers.io()等价于 new IoScheduler()
  2. new IoScheduler() Rxjava创建了线程池,为后续创建线程做准备,同时创建并运行了一个清理线程RxCachedWorkerPoolEvictor,定期执行清理任务。
  3. subscribeOn()返回一个ObservableSubscribeOn对象,它是Observable的一个装饰类,增加了scheduler
  4. 调用subscribe()方法,在这个方法调用后,subscribeActual()被调用,才真正执行了IoSchduler中的createWorker()创建线程并运行,最终将上游Observablesubscribe()方法调度到新创建的线程中运行。

现在我们知道了被观察者(事件上游)执行线程是如何被调度到指定线程中执行的,但很多情况下,我们希望观察者(事件下游)处理事件最好在UI线程执行,比如更新UI操作等。但下游何时调度,如何调度由于篇幅问题,将放到下节继续分析。敬请期待。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,509评论 6 504
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,806评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,875评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,441评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,488评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,365评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,190评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,062评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,500评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,706评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,834评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,559评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,167评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,779评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,912评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,958评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,779评论 2 354

推荐阅读更多精彩内容