RxJava2 实战系列文章
RxJava2 实战知识梳理(1) - 后台执行耗时操作,实时通知 UI 更新
RxJava2 实战知识梳理(2) - 计算一段时间内数据的平均值
RxJava2 实战知识梳理(3) - 优化搜索联想功能
RxJava2 实战知识梳理(4) - 结合 Retrofit 请求新闻资讯
RxJava2 实战知识梳理(5) - 简单及进阶的轮询操作
RxJava2 实战知识梳理(6) - 基于错误类型的重试请求
RxJava2 实战知识梳理(7) - 基于 combineLatest 实现的输入表单验证
RxJava2 实战知识梳理(8) - 使用 publish + merge 优化先加载缓存,再读取网络数据的请求过程
RxJava2 实战知识梳理(9) - 使用 timer/interval/delay 实现任务调度
RxJava2 实战知识梳理(10) - 屏幕旋转导致 Activity 重建时恢复任务
RxJava2 实战知识梳理(11) - 检测网络状态并自动重试请求
RxJava2 实战知识梳理(12) - 实战讲解 publish & replay & share & refCount & autoConnect
RxJava2 实战知识梳理(13) - 如何使得错误发生时不自动停止订阅关系
RxJava2 实战知识梳理(14) - 在 token 过期时,刷新过期 token 并重新发起请求
RxJava2 实战知识梳理(15) - 实现一个简单的 MVP + RxJava + Retrofit 应用
一、前言
今天,我们来整理以下几个大家容易弄混的概念,并用实际例子来演示,可以从 RxSample 的第十二章中获取:
publish
reply
ConnectableObservable
connect
share
refCount
autoConnect
对于以上这些概念,可以用一幅图来概括:
从图中可以看出,这里面可以供使用者订阅的
Observable
可以分为四类,下面我们将逐一介绍这几种Observable
的特点:
- 第一类:
Cold Observable
,就是我们通过Observable.create
、Observable.interval
等创建型操作符生成的Observable
。 - 第二类:由
Cold Observable
经过publish()
或者replay(int N)
操作符转换成的ConnectableObservable
。 - 第三类:由
ConnectableObservable
经过refCount()
,或者由Cold Observable
经过share()
转换成的Observable
。 - 第四类:由
ConnectableObservable
经过autoConnect(int N)
转换成的Observable
。
二、Cold Observable
Cold Observable
就是我们通过Observable.create
、Observable.interval
等创建型操作符生成的Observable
,它具有以下几个特点:
- 当一个订阅者订阅
Cold Observable
时,Cold Observable
会重新开始发射数据给该订阅者。 - 当多个订阅者订阅到同一个
Cold Observable
,它们收到的数据是相互独立的。 - 当一个订阅者取消订阅
Cold Observable
后,Cold Observable
会停止发射数据给该订阅者,但不会停止发射数据给其它订阅者。
下面,我们演示一个例子,首先我们创建一个Cold Observable
:
//直接订阅Cold Observable。
private void createColdSource() {
mConvertObservable = getSource();
}
private Observable<Integer> getSource() {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
try {
int i = 0;
while (true) {
Log.d(TAG, "源被订阅者发射数据=" + i + ",发送线程ID=" + Thread.currentThread().getId());
mSourceOut.add(i);
observableEmitter.onNext(i++);
updateMessage();
Thread.sleep(1000);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}).subscribeOn(Schedulers.io());
}
在创建两个订阅者,它们可以随时订阅到Cold Observable
或者取消对它的订阅:
private void startSubscribe1() {
if (mConvertObservable != null && mDisposable1 == null) {
mDisposable1 = mConvertObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "订阅者1收到数据=" + integer + ",接收线程ID=" + Thread.currentThread().getId());
mSubscribe1In.add(integer);
updateMessage();
}
});
}
}
private void disposeSubscribe1() {
if (mDisposable1 != null) {
mDisposable1.dispose();
mDisposable1 = null;
mSubscribe1In.clear();
updateMessage();
}
}
private void startSubscribe2() {
if (mConvertObservable != null && mDisposable2 == null) {
mDisposable2 = mConvertObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "订阅者2收到数据=" + integer + ",接收线程ID=" + Thread.currentThread().getId());
mSubscribe2In.add(integer);
updateMessage();
}
});
}
}
private void disposeSubscribe2() {
if (mDisposable2 != null) {
mDisposable2.dispose();
mDisposable2 = null;
mSubscribe2In.clear();
updateMessage();
}
}
为了验证之前说到的几个特点,进入程序之后,我们会先创建该Cold Observable
,之后进行一系列的操作,效果如下:
在上面的图中,我们做了一下几步操作:
- 第一步:启动应用,创建
Cold Observable
,这时候Cold Observable
没有发送任何数据。 - 第二步:
Observer1
订阅Observable
,此时Cold Observable
开始发送数据,Observer1
也可以收到数据,即 一个订阅者订阅 Cold Observable 时, Cold Observable 会开始发射数据给该订阅者 - 第三步:
Observer2
订阅Observable
,此时Observable2
也可以收到数据,但是它和Observable1
收到的数据是相互独立的,即 当多个订阅者订阅到同一个 Cold Observable ,它们收到的数据是相互独立的。 - 第四步:
Observer1
取消对Observable
的订阅,这时候Observer1
收不到数据,并且Observable
也不会发射数据给它,但是仍然会发射数据给Observer2
,即 当一个订阅者取消订阅 Cold Observable 后,Cold Observable 会停止发射数据给该订阅者,但不会停止发射数据给其它订阅者。 - 第五步:
Observer1
重新订阅Observable
,这时候Observable
从0
开始发射数据给Observer1
,即 一个订阅者订阅 Cold Observable 时, Cold Observable 会重新开始发射数据给该订阅者。
三、由 Cold Observable 转换的 ConnectableObservable
在了解完Cold Observable
之后,我们再来看第二类的Observable
,它的类型为ConnectableObservable
,它是通过Cold Observable
经过下面两种方式生成的:
.publish()
.reply(int N)
如果使用.publish()
创建,那么订阅者只能收到在订阅之后Cold Observable
发出的数据,而如果使用reply(int N)
创建,那么订阅者在订阅后可以收到Cold Observable
在订阅之前发送的N
个数据。
我们先以publish()
为例,介绍ConnectableObservable
的几个特点:
- 无论
ConnectableObservable
有没有订阅者,只要调用了ConnectableObservable
的connect
方法,Cold Observable
就开始发送数据。 -
connect
会返回一个Disposable
对象,调用了该对象的dispose
方法,Cold Observable
将会停止发送数据,所有ConnectableObservable
的订阅者也无法收到数据。 - 在调用
connect
返回的Disposable
对象后,如果重新调用了connect
方法,那么Cold Observable
会重新发送数据。 - 当一个订阅者订阅到
ConnectableObservable
后,该订阅者会收到在订阅之后,Cold Observable
发送给ConnectableObservable
的数据。 - 当多个订阅者订阅到同一个
ConnectableObservable
时,它们收到的数据是相同的。 - 当一个订阅者取消对
ConnectableObservable
,不会影响其他订阅者收到消息。
下面,我们创建一个ConnectableObservable
,两个订阅者之后会订阅到它,而不是Cold Observable
:
//.publish()将源Observable转换成为HotObservable,当调用它的connect方法后,无论此时有没有订阅者,源Observable都开始发送数据,订阅者订阅后将可以收到数据,并且订阅者解除订阅不会影响源Observable数据的发射。
public void createPublishSource() {
mColdObservable = getSource();
mConvertObservable = mColdObservable.publish();
mConvertDisposable = ((ConnectableObservable<Integer>) mConvertObservable).connect();
}
和上面一样,还是用一个例子来演示,该例子的效果为:
- 第一步:启动应用,通过
Cold Observable
的publish
方法创建ConnectableObservable
,并调用ConnectableObservable
的connect
方法,可以看到,此时虽然ConnectableObservable
没有任何订阅者,但是Cold Observable
也已经开始发送数据。 - 第二步:
Observer1
订阅到ConnectableObservable
,此时它只能收到订阅之后Cold Observable
发射的数据。 - 第三步:
Observer2
订阅到ConnectableObservable
,Cold Observable
只会发射一份数据,并且Observer1
和Observer2
收到的数据是相同的。 - 第三步:
Observer1
取消对ConnectableObservable
的订阅,Cold Observable
仍然会发射数据,Observer2
仍然可以收到Cold Observable
发射的数据。 - 第四步:
Observer1
重新订阅ConnectableObservable
,和第三步相同,它仍然只会收到订阅之后Cold Observable
发射的数据。 - 第五步:通过
connect
返回的Disposable
对象,调用dispose
方法,此时Cold Observable
停止发射数据,并且Observer1
和Observer2
都收不到数据。
上面这些现象发生的根本原因在于:现在Observer
和Observer2
都是订阅到ConnectableObservable
,真正产生数据的Cold Observable
并不知道他们的存在,和它交互的是ConnectableObservable
,ConnectableObservable
相当于一个中介,它完成下面两项任务:
- 对于上游:通过
connect
和dispose
方法决定是否要订阅到Cold Observer
,也就是决定了Cold Observable
是否发送数据。 - 对于下游:将
Cold Observable
发送的数据转交给它的订阅者。
四、由 ConnectableObservable 转换成 Observable
由ConnectableObservable
转换成Observable
有两种方法,我们分为两节介绍下当订阅到转换后的Observable
时的现象:
.refCount()
.autoConnect(int N)
4.1 ConnectableObservable 由 refCount 转换成 Observable
经过refCount
方法,ConnectableObservable
可以转换成正常的Observable
,我们称为refObservable
,这里我们假设ConnectableObservable
是由Cold Observable
通过publish()
方法转换的,对于它的订阅者,有以下几个特点:
- 第一个订阅者订阅到
refObservable
后,Cold Observable
开始发送数据。 - 之后的订阅者订阅到
refObservable
后,只能收到在订阅之后Cold Observable
发送的数据。 - 如果一个订阅者取消订阅到
refObservable
后,假如它是当前refObservable
的唯一一个订阅者,那么Cold Observable
会停止发送数据;否则,Cold Observable
仍然会继续发送数据,其它的订阅者仍然可以收到Cold Observable
发送的数据。
接着上例子,我们创建一个refObservable
:
//.share()相当于.publish().refCount(),当有订阅者订阅时,源订阅者会开始发送数据,如果所有的订阅者都取消订阅,源Observable就会停止发送数据。
private void createShareSource() {
mColdObservable = getSource();
mConvertObservable = mColdObservable.publish().refCount();
}
示例如下:
操作分为以下几步:
- 第一步:通过
.publish().refCount()
创建由ConnectableObservable
转换后的refObservable
,此时Cold Observable
没有发送任何消息。 - 第二步:
Observer1
订阅到refObservable
,Cold Observable
开始发送数据,Observer1
接收数据。 - 第三步:
Observer2
订阅到refObservable
,它只能收到在订阅之后Cold Observable
发送的数据。 - 第四步:
Observer1
取消订阅,Cold Observable
继续发送数据,Observer2
仍然能收到数据。 - 第五步:
Observer2
取消订阅,Cold Observable
停止发送数据。 - 第六步:
Observer1
重新订阅,Cold Observable
重新开始发送数据。
最后说明一点:订阅到Cold Observable
的.publish().refCount()
和Cold Observable
的share()
所返回的Observable
是等价的。
4.2 ConnectableObservable 由 autoConnect(int N) 转换成 Observable
autoConnect(int N)
和refCount
很类似,都是将ConnectableObservable
转换成普通的Observable
,我们称为autoObservable
,同样我们先假设ConnectableObservable
是由Cold Observable
通过publish()
方法生成的,它有以下几个特点:
- 当有
N
个订阅者订阅到refObservable
后,Cold Observable
开始发送数据。 - 之后的订阅者订阅到
refObservable
后,只能收到在订阅之后Cold Observable
发送的数据。 - 只要
Cold Observable
开始发送数据,即使所有的autoObservable
的订阅和都取消了订阅,Cold Observable
也不会停止发送数据,如果想要Cold Observable
停止发送数据,那么可以使用autoConnect(int numberOfSubscribers, Consumer connection)
中Consumer
返回的Disposable
,它的作用和ConnectableObservable
的connect
方法返回的Disposable
相同。
其创建方法如下所示:
//.autoConnect在有指定个订阅者时开始让源Observable发送消息,但是订阅者是否取消订阅不会影响到源Observable的发射。
private void createAutoConnectSource() {
mColdObservable = getSource();
mConvertObservable = mColdObservable.publish().autoConnect(1, new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
mConvertDisposable = disposable;
}
});
}
示例效果如下:
我们进行了如下几步操作:
- 第一步:启动应用,创建
autoConnect
转换后的autoObservable
。 - 第二步:
Observer1
订阅到autoObservable
,此时满足条件,Cold Observable
开始发送数据。 - 第三步:
Observer2
订阅到autoObservable
,它只能收到订阅后发生的数据。 - 第四步:
Observer1
取消订阅,Cold Observable
继续发送数据,Observer2
仍然可以收到数据。 - 第五步:
Observer2
取消订阅,Cold Observable
仍然继续发送数据。 - 第六步:
Observer2
订阅到autoObservable
,它只能收到订阅后发送的消息了。 - 第七步:调用
mConvertDisposable
的dispose
,Cold Observable
停止发送数据。
五、publish 和 reply(int N) 的区别
在上面的例子当中,所有总结的特点都是建立在ConnectableObservable
是由publish()
生成,只所以这么做,是为了方便大家理解,无论是订阅到ConnectableObservable
,还是由ConnectableObservable
转换的refObservable
和autoObservable
,使用这两种方式创建的唯一区别就是,订阅者在订阅后,如果是通过publish()
创建的,那么订阅者之后收到订阅后Cold Observable
发送的数据;而如果是reply(int N)
创建的,那么订阅者还能额外收到N
个之前Cold Observable
发送的数据,我们用下面一个小例子来演示,订阅者订阅到的Observable
如下:
//.reply会让缓存源Observable的N个数据项,当有新的订阅者订阅时,它会发送这N个数据项给它。
private void createReplySource() {
mColdObservable = getSource();
mConvertObservable = mColdObservable.replay(3);
mConvertDisposable = ((ConnectableObservable<Integer>) mConvertObservable).connect();
}
示例演示效果:
操作步骤:
- 第一步:启动应用,通过
Cold Observable
的replay(3)
方法创建ConnectableObservable
,可以看到,此时虽然ConnectableObservable
没有任何订阅者,但是Cold Observable
也已经开始发送数据。 - 第二步:
Observer1
订阅到ConnectableObservable
,此时它会先收到之前发射的3
个数据,之后收到订阅之后Cold Observable
发射的数据。
最后再提一下,更详细的代码大家可以从 RxSample 的第十二章中获取。
更多文章,欢迎访问我的 Android 知识梳理系列:
- Android 知识梳理目录://www.greatytc.com/p/fd82d18994ce
- 个人主页:http://lizejun.cn
- 个人知识总结目录:http://lizejun.cn/categories/