RxJava分析之订阅过程

最近在用RxJava,虽然正常使用没有问题,但过程中产生了很多疑问,比如Observable和Subscriber是怎么联系到一起的?OnSubscribe又是什么时候起作用的,起什么作用?Subscriber和Subscription有什么关系?unsubscribe之后Observable还在运行吗?等等这些疑问,所以顺着这些问题找找答案,梳理一下源码,给自己以后做个记录就不会忘记啦若有不当欢迎指正,一起进步

根据我的理解说一下以上几个概念

  • Observable - 可被观察者(就是一个可观测的数据源)
  • Subscriber - 订阅者(接收可观测数据源并可以进行处理的观察者)
  • OnSubscribe - 订阅时(订阅者订阅可被观察者时发生的一系列动作)
  • Subscription - 订阅(名词性质,包含了订阅状态和解除订阅的动作)
  • Observe - 观测 (对订阅者的处理流程的规范包含三个动作(onError onNext onComplete))
  • SubscriptionList(维护一个订阅列表)

有了上边的基本了解再来看看源码分析

通常在Android上一个简单的订阅是这样的

/*其中onSubscribeTimerPeriodically是Observable.OnSubscribe类型,就是一个订阅时,这个订阅时做了那些事?简单一点就做一件事,创建一个Subscription类型的工作线程,拿到订阅者并把工作线程添加进订阅者内部维护的SubscriptionList中去,然后异步向订阅者发送数据(后边详细分析)*/
Observable.create(onSubscribeTimerPeriodically)
.subscribe(mySubscriber)
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread());

mySubscriber = new Subscriber<Long>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Long val) {

            }
        };
  • 上边的代码意思就是创建了一个Observable ,这个数据源被mySubscriber这个订阅者订阅,当subscribe时mySubscriber被当作参数传递给onSubscribeTimerPeriodically的call方法,当然工作线程中产生数据,在主线程中接收数据

下边看看onSubscribeTimerPeriodically这个“订阅时”

/*这是一个带有初始延时的周期的长整数发生器,关于scheduler和底层的线程调度机制后边会再专门写一篇分析文章,现在主要分析订阅的过程*/

public final class OnSubscribeTimerPeriodically implements OnSubscribe<Long> {
    final long initialDelay;
    final long period;
    final TimeUnit unit;
    final Scheduler scheduler;

    public OnSubscribeTimerPeriodically(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
        this.initialDelay = initialDelay;
        this.period = period;
        this.unit = unit;
        this.scheduler = scheduler;
    }

/*child参数就是上边的mySubscriber*/

    @Override
    public void call(final Subscriber<? super Long> child) {

    /*还记得上边说过Subscriber内部维护了一个SubscriptionList,这个worker被add到其中,这样在外部Subscriber就可以控制订阅的状态,比如调用unsubscribe方法解除订阅*/

        final Worker worker = scheduler.createWorker();
        child.add(worker);
        worker.schedulePeriodically(new Action0() {
            long counter;
            @Override
            public void call() {
                try {
                    child.onNext(counter++);
                } catch (Throwable e) {
                    try {
                        worker.unsubscribe();
                    } finally {
                        Exceptions.throwOrReport(e, child);
                    }
                }
            }
        }, initialDelay, period, unit);
    }
}
  • 通过上边的代码可以发现这就是订阅时的作用,同时大概能感觉到,Observable就是一个”架子“,确实是这样的,为了阅读的连续性这个后边展开说,接下来先看看之前我们有一个疑问:unsubscribe之后Observable还在运行(这么说其实是不对的,但是符合刚开始接触RxJava的直觉,先这么写,后边解释)吗?
child.add(worker);

这句告诉我们mySubscriber执行了add()方法添加到内部的订阅列表,实际上是调用了SubscriptionList的add()方法,说明Worker一定实现了Subscription接口,并且要找的答案就在scheduler.createWorker();
其中scheduler是抽象类Scheduler的一个子类的实例,通常有三种实现EventLoopsScheduler CachedThreadScheduler NewThreadScheduler,前两个是线程池实现,后一个是直接创建新的线程,就以EventLoopsScheduler分析其createWorker():

@Override
public Worker createWorker() {
    return new EventLoopWorker(pool.get().getEventLoop());
}

参数pool.get().getEventLoop()是

static final class PoolWorker extends NewThreadWorker {
    PoolWorker(ThreadFactory threadFactory) {
        super(threadFactory);
    }
}

因为调用Subscriber.unsubscribe(),会迭代每一个SubscriptionList中的订阅调用其unsubscribe()方法,而上边也看到里面实际是继承自NewThreadWorker 的对象,其解除订阅方法如下:

@Override
public void unsubscribe() {
    isUnsubscribed = true;
    executor.shutdownNow();//马上停掉线程池里所有的线程
    deregisterExecutor(executor);
}

executor是ThreadPoolExecutor类型,很明显疑问解决了并且也可以来说说为什么Obervable是一个“架子”了

  • Observable本身并不提供工作线程,也不管理调度

  • Observable依靠传入的OnSubscribe决定订阅时发生的动作

  • OnSubscribe把工作线程add给Subscriber维护的subscriptions

  • Observable提供了一系列的Operator对Observable进行各种变换过滤等操作

  • 如果现有操作不能满足需要还可以通过lift自己实现特定操作

  • 画了一张图,有一些不准确的地方,但是把订阅过程的逻辑都表达出来了:

image.png

最后说一点,以前老是觉得Observable.subscribe(Subscriber)这种方式怪怪的,被观察者订阅了订阅者,你说怪不怪?正好张孝祥老师的一个视频点醒了我,面向对象的本质就是谁拥有数据谁就有权进行操作,按照这个想法,订阅的方法当然得在被订阅者的身上,这么说来也可以理解了,而且这样更好封装,就像android中draw一个view,这个draw()方法不是外部一个什么对象去调用,而正是view自己调用类似view.draw()

迁移自CSDN
2016年05月23日 20:14:17
http://blog.csdn.net/u013262051/article/details/51481650

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,639评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,277评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,221评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,474评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,570评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,816评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,957评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,718评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,176评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,511评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,646评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,322评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,934评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,755评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,987评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,358评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,514评论 2 348

推荐阅读更多精彩内容