RxJava源码解析(二)—线程调度器Scheduler

在RxJava中,有个很重要的概念叫做"线程调度器"—Scheduler。它用一种隐式的方法屏蔽掉了我们之前通过回调方式的线程调用。我们看个例子:

Observable<String> ob = Observable.just("str1","str2");
ob.map(new Func1<String, String>() {
@Override
public String call(String t) {
System.out.println("function call " + Thread.currentThread());
return "[" + t + "]";
}})
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(String t) {
System.out.println("onNext call " + Thread.currentThread());
System.out.println("onNext "+t);
}
});

代码中,我们通过一个字符串生成了一个Observable对象,而这个对象我们又通过一个map映射映射成为一个新的Observable对象(这部分的知识请参照第一章RxJava源码解析(一)从一个例子开始)。在这之后,我们有通过调用observeOn方法设置了一个叫做Schedulers.newThread()的调度器。这个函数的目的是为了告诉你的被观察者,当你的数据返回的时候需要往哪个线程上post你的数据消息,换句话说,也就是你所定义的Subscriber对象的onCompleted/onError/onNext的执行线程。这段代码最后输出:

//output:
function call Thread[main,5,main]//map映射发生在默认线程也就是虚拟机主线程中
function call Thread[main,5,main]//map映射发生在默认线程也就是虚拟机主线程中
onNext call Thread[RxNewThreadScheduler-1,5,main] // 消息回调函数处理在一个新的线程中
onNext [str1]
onNext call Thread[RxNewThreadScheduler-1,5,main] // 消息回调函数处理在一个新的线程中
onNext [str2]

本章,我们将重点关注这个调度器,那么我们首先要思考的问题是,这个调度器将会提供什么功能呢?这就要回头看下我们能用这个调度器干什么了?
首先,我们需要调度器去帮助我们生成一个线程,并且在线程中去执行我们所书写的一些逻辑操作
其次,当我们以后得到了结果,我们还要需要调度器往调度器线程中发送一个消息,以便可以执行订阅者的回调函数
好的,基于我们上面的需求,我们将看下,在RxJava的调度器实现中,是如何实现我们所需要的功能的。
我们先来看下Observable对象所提供的observeOn函数,这个函数有多个函数重载,最终都会调用到三个参数的observeOn方法:

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
    }

这里调用到了RxJava中一个很重要的操作符号lift。lift函数的主要作用是直接对被观察者对象Observable进行数据转换,而这里引入了一个叫做Operator的新类型,在上述的例子中这个类型的实现类是一个叫做OperatorObserveOn的策略指令。我们看下这个lift函数定义:

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
    }

我们所传入的Operator对象被包装成为一个OnSubscribeLift对象,OnSubscribeLift对象是我们非常熟悉的OnSubscribe类型的子类。第一章我们说到OnSubscribe提供一种处理订阅者注册订阅后的策略。但这里,的lift函数跟我们之前所看到的map函数有什么不同么?
这里我们可以看到OnSubscribeLift构造的时候并不传入上一次构建的Observable对象,而是直接传入上一个对象的Observable的onSubscribe成员。按照我们上面的例子,我们调用过map函数后调用observeOn函数,此时传入的onSubscribe对应的就是map产生的OnSubscribeMap对象。而参数operator对应observeOn函数中的OperatorObserveOn对象。我们先来看下Operator类的定义:

public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
        // cover for generics insanity
    }

Operator也是一种映射关系函数,转换类型是通过Subscriber<T>->Subscriber<R>。也就是说,Operator是一个直接转化新的Subscriber的映射函数。这样就可以在订阅前拦截订阅操作。比如:

Observable<String> ob = Observable.just("str1","str2");
        ob.map(new Func1<String, String>() {
            @Override
            public String call(String t) {
                System.out.println("function call " + Thread.currentThread());
                return "[" + t + "]";
            }})
        .lift(new Operator<String,String>(){

            @Override
            public Subscriber<String> call(Subscriber<? super String> st) {
                return new Subscriber<String>() {
                    @Override
                    public void onNext(String t) {
                        long startTime = System.currentTimeMillis();
                        System.out.println("onNext begin");
                        st.onNext(t);//用于监控订阅者的执行时间
                        System.out.println("onNext execute on next time = "+(System.currentTimeMillis() - startTime)+"ms");
                    }
                };
        }})
        .subscribe(new Subscriber<String>() {
            @Override
            public void onNext(String t) {
                IO.waitTime(5000);
                System.out.println("call onNext "+t);
            }
        });

我们为了监控订阅者订阅的时候有多少的时间消耗,我们通过lift函数在我们的订阅者外包装了一层Subscriber,这样我们就可以依赖于包装的Subscriber对象进行函数监控:

//output:
function call Thread[main,5,main]
onNext begin//开启监控
call onNext [str1]
onNext execute on next time = 5005ms//监控结束

也就是说,上述的例子中我们的流程图应该是:

lift后流程图

好的,有了上面的概念,我们可以来看下OperatorObserveOn的代码,我们看下它给我们生成了一个什么样的订阅者:

@Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
     ....
            ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
            parent.init();
            return parent;
  .....
    }
lift函数流程图

Lift函数执行完后,会将我们所注册的Subscriber装饰成为一个ObserveOnSubscriber对象。"lift后流程图"的红色框框部分以后注明了这个对象的功能。我们先来看下ObserveOnSubscriber对象的onCompleted/onError三个方法:

 @Override
        public void onCompleted() {
            if (isUnsubscribed() || finished) {
                return;
            }
            finished = true;
            schedule();
        }

        @Override
        public void onError(final Throwable e) {
            if (isUnsubscribed() || finished) {
                RxJavaHooks.onError(e);
                return;
            }
            error = e;
            finished = true;
            schedule();
        }

由于onCompleted和onError是互斥的,且只会被调用一次,因此会用一个finished的boolean变量来进行拦截,然后调用schedule()函数来处理剩下逻辑:

final AtomicLong counter = new AtomicLong();
 protected void schedule() {
            if (counter.getAndIncrement() == 0) {
                recursiveScheduler.schedule(this);
            }
 }

由于counter在调用getAndIncrement()后就大于0,因此recursiveScheduler.schedule(this)只会被调用一次,recursiveScheduler的定义在ObserveOnSubscriber的构造器中:

 public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
            this.child = child;
            this.recursiveScheduler = scheduler.createWorker();
            ...
        }

scheduler就是我们传入的Schedulers.newThread()对象,实际上是一个NewThreadScheduler对象:

@Override
    public Worker createWorker() {
        return new NewThreadWorker(threadFactory);//recursiveScheduler的类型是一个NewThreadWorker
    }

可以看出,recursiveScheduler最终会被置为NewThreadWorker类型

public NewThreadWorker(ThreadFactory threadFactory) {
        ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
     ....
        executor = exec;
    }

NewThreadWorker构造器中,定义了一个核心线程为1的ScheduledThreadPool线程。ScheduledThreadPool是一个很特殊的线程池,这个线程池的主要是为了支持延迟任务,或者定时任务。recursiveScheduler.schedule(this)实际上就是调用NewThreadWorkerschedule方法。

 @Override
    public Subscription schedule(final Action0 action) {
        return schedule(action, 0, null);
    }

    @Override
    public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
        if (isUnsubscribed) {
            return Subscriptions.unsubscribed();
        }
        return scheduleActual(action, delayTime, unit);
    }

public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
       ....
        ScheduledAction run = new ScheduledAction(decoratedAction);
        Future<?> f;
        if (delayTime <= 0) {
            f = executor.submit(run);
        } else {
            f = executor.schedule(run, delayTime, unit);
        }
       ...
    }

schedule方法最终会调用到scheduleActual方法,action对象会被包装成为一个ScheduledActionRunable对象提交给线程池executor。而线程池会调用ScheduledActionrun()方法,在run()方法中,又会调用Action0的call方法:

@Override
    public void run() {
        try {
            .....
            action.call();
        } catch (OnErrorNotImplementedException e) {
              ....
        }
    }

如果刚才的代码已经把你给绕懵了,不要紧,我们再来回顾一下流程:

1. 我们通过lift函数注册了一个叫做OperatorObserveOnOperator对象

2. lift函数会构造一个叫做OnSubscribeLift的对象用于构造一个Observable对象

3. 当订阅者Subscriber对象订阅Observable的时候,根据调用链,会优先使用OnSubscribeLift对象作为优先处理对象。

4 OnSubscribeLift调用call(Subscriber)方法,在该类的call方法中,会通过内部的Operator对象(也就是OperatorObserveOn对象)的Subscriber call(Subscriber)方法,生成一个新的订阅者ObserveOnSubscriber

5. 新的订阅者对象ObserveOnSubscriber被OnSubscribeLift对象传递给上层的OnSubscribe对象处理,也就是走如RxJava源码解析(一)从一个例子开始)中的流程,最后会走到OnSubscribeFromArray对象中,然后遍历里面的数组生产者

6. OnSubscribeFromArray遍历数组中的成员,然后调用订阅者的onNextonCompleted。而最终要调用到的订阅者就是ObserveOnSubscriber对象。

7. ObserveOnSubscriber对象的onNext()onCompleted()方法会触发执行schedule()方法,schedule()方法会调用Scheduler.Worker.schedule(Action0)方法,而这个Action0对象就是ObserveOnSubscriber类型

8. 当我们选择 Schedulers.newThread()调度器的时候,Scheduler.Worker对象实际类型为NewThreadWorker对象,而NewThreadWorker.schedule(Action0)中会将Action0对象包装成为ScheduledAction对象,ScheduledAction本质是一个Runnable类型,因此它可以被提交到线程池中,调用ScheduledAction.run()方法,而ScheduledAction.run()方法中,又会调用Action0.call()

9. 步骤8中Action0实现的类型为ObserveOnSubscriber类型,此时调用ObserveOnSubscriber.call()方法会从queue队列中读取onNext参数值并检测是否已经结束,注意,由于当前函数是由我们调度器生成的Worker对象中的线程池调用的,因此当前的全部回调操作都发生在Worker所构建的线程中。

总结

实际上,我们从上面可以看出,我们通过lift函数所构造出来的ObserveOnSubscriber对象,实际上是生成了一个OnSubscriber的装饰对象。而这个对象的具体操作,都被封装到了call()方法中去,换句话说,我们的调度器实际上就是提供一个容器,给我们的call()方法提供上下文。基于我们上述的结论,我们实际上就可以写出我们自己的调度器:

private static class SchedulerImpl extends Scheduler {
        @Override
        public Worker createWorker() {
            // TODO Auto-generated method stub
            return new WorkerImpl();
        }
    }

private static class WorkerImpl extends Scheduler.Worker {
        @Override
        public void unsubscribe() {}
        @Override
        public boolean isUnsubscribed() {
                        return false;
        }

        @Override
        public Subscription schedule(Action0 action) {
            return schedule(action,0,null);
        }
        @Override
        public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
            Thread thread = new Thread() {
                public void run() {
                    action.call();
                };
            };
            thread.setName("test");
            thread.start();
            return null;
        }
    }

这个调度器的写法非常的简单:
1.我们先构建一个Scheduler用于管理我们的Worker
2.observeOn会给我们提供一个ObserveOnSubscriber类型的Action0对象,作为参数调用Worker.schedule(Action0 action)方法
3.我们生成了一个独立的线程"test",并在线程中调用Action0.call ()方法,这样就可以将事件发送到我们所订阅的真正的Subscriber上了

最后输出日志:

output:
call onNext [str1]
call onNext [str2]
call onCompleted Thread[test,5,main]

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 198,082评论 5 464
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,231评论 2 375
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 145,047评论 0 327
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,977评论 1 268
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,893评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 47,014评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,976评论 3 388
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,605评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,888评论 1 293
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,906评论 2 314
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,732评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,513评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,980评论 3 301
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,132评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,447评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,027评论 2 343
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,232评论 2 339

推荐阅读更多精彩内容