最近在用RxJava,虽然正常使用没有问题,但过程中产生了很多疑问,比如Observable和Subscriber是怎么联系到一起的?OnSubscribe又是什么时候起作用的,起什么作用?Subscriber和Subscription有什么关系?unsubscribe之后Observable还在运行吗?等等这些疑问,所以顺着这些问题找找答案,梳理一下源码,给自己以后做个记录就不会忘记啦若有不当欢迎指正,一起进步
根据我的理解说一下以上几个概念
- Observable - 可被观察者(就是一个可观测的数据源)
- Subscriber - 订阅者(接收可观测数据源并可以进行处理的观察者)
- OnSubscribe - 订阅时(订阅者订阅可被观察者时发生的一系列动作)
- Subscription - 订阅(名词性质,包含了订阅状态和解除订阅的动作)
- Observe - 观测 (对订阅者的处理流程的规范包含三个动作(onError onNext onComplete))
- SubscriptionList(维护一个订阅列表)
有了上边的基本了解再来看看源码分析
通常在Android上一个简单的订阅是这样的
/*其中onSubscribeTimerPeriodically是Observable.OnSubscribe类型,就是一个订阅时,这个订阅时做了那些事?简单一点就做一件事,创建一个Subscription类型的工作线程,拿到订阅者并把工作线程添加进订阅者内部维护的SubscriptionList中去,然后异步向订阅者发送数据(后边详细分析)*/
Observable.create(onSubscribeTimerPeriodically)
.subscribe(mySubscriber)
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread());
mySubscriber = new Subscriber<Long>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Long val) {
}
};
- 上边的代码意思就是创建了一个Observable ,这个数据源被mySubscriber这个订阅者订阅,当subscribe时mySubscriber被当作参数传递给onSubscribeTimerPeriodically的call方法,当然工作线程中产生数据,在主线程中接收数据
下边看看onSubscribeTimerPeriodically这个“订阅时”
/*这是一个带有初始延时的周期的长整数发生器,关于scheduler和底层的线程调度机制后边会再专门写一篇分析文章,现在主要分析订阅的过程*/
public final class OnSubscribeTimerPeriodically implements OnSubscribe<Long> {
final long initialDelay;
final long period;
final TimeUnit unit;
final Scheduler scheduler;
public OnSubscribeTimerPeriodically(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
this.initialDelay = initialDelay;
this.period = period;
this.unit = unit;
this.scheduler = scheduler;
}
/*child参数就是上边的mySubscriber*/
@Override
public void call(final Subscriber<? super Long> child) {
/*还记得上边说过Subscriber内部维护了一个SubscriptionList,这个worker被add到其中,这样在外部Subscriber就可以控制订阅的状态,比如调用unsubscribe方法解除订阅*/
final Worker worker = scheduler.createWorker();
child.add(worker);
worker.schedulePeriodically(new Action0() {
long counter;
@Override
public void call() {
try {
child.onNext(counter++);
} catch (Throwable e) {
try {
worker.unsubscribe();
} finally {
Exceptions.throwOrReport(e, child);
}
}
}
}, initialDelay, period, unit);
}
}
- 通过上边的代码可以发现这就是订阅时的作用,同时大概能感觉到,Observable就是一个”架子“,确实是这样的,为了阅读的连续性这个后边展开说,接下来先看看之前我们有一个疑问:unsubscribe之后Observable还在运行(这么说其实是不对的,但是符合刚开始接触RxJava的直觉,先这么写,后边解释)吗?
child.add(worker);
这句告诉我们mySubscriber执行了add()方法添加到内部的订阅列表,实际上是调用了SubscriptionList的add()方法,说明Worker一定实现了Subscription接口,并且要找的答案就在scheduler.createWorker();
其中scheduler是抽象类Scheduler的一个子类的实例,通常有三种实现EventLoopsScheduler CachedThreadScheduler NewThreadScheduler,前两个是线程池实现,后一个是直接创建新的线程,就以EventLoopsScheduler分析其createWorker():
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get().getEventLoop());
}
参数pool.get().getEventLoop()是
static final class PoolWorker extends NewThreadWorker {
PoolWorker(ThreadFactory threadFactory) {
super(threadFactory);
}
}
因为调用Subscriber.unsubscribe(),会迭代每一个SubscriptionList中的订阅调用其unsubscribe()方法,而上边也看到里面实际是继承自NewThreadWorker 的对象,其解除订阅方法如下:
@Override
public void unsubscribe() {
isUnsubscribed = true;
executor.shutdownNow();//马上停掉线程池里所有的线程
deregisterExecutor(executor);
}
executor是ThreadPoolExecutor类型,很明显疑问解决了并且也可以来说说为什么Obervable是一个“架子”了
Observable本身并不提供工作线程,也不管理调度
Observable依靠传入的OnSubscribe决定订阅时发生的动作
OnSubscribe把工作线程add给Subscriber维护的subscriptions
Observable提供了一系列的Operator对Observable进行各种变换过滤等操作
如果现有操作不能满足需要还可以通过lift自己实现特定操作
画了一张图,有一些不准确的地方,但是把订阅过程的逻辑都表达出来了:
最后说一点,以前老是觉得Observable.subscribe(Subscriber)这种方式怪怪的,被观察者订阅了订阅者,你说怪不怪?正好张孝祥老师的一个视频点醒了我,面向对象的本质就是谁拥有数据谁就有权进行操作,按照这个想法,订阅的方法当然得在被订阅者的身上,这么说来也可以理解了,而且这样更好封装,就像android中draw一个view,这个draw()方法不是外部一个什么对象去调用,而正是view自己调用类似view.draw()
迁移自CSDN
2016年05月23日 20:14:17
http://blog.csdn.net/u013262051/article/details/51481650