此篇内容均是来自书籍《RxJava响应式编程》李衍顺 著
3.9 与Connectable Observable相关的操作符
我们前面所学的Observable,他们又一个共同的特性,就是只有当订阅者来订阅时才会开始发送数据,否则什么也不发生,这就是懒加载。那什么是Connectable Observable呢, 它是一种特殊的Observable,并不是在订阅者订阅时才发送数据,而是只要对其应用connect操作符就开始发送数据。
3.9.1 publish 和connect
publish操作符就是用来将一个普通的Observable转化为一个Connectable Observable的。需要注意的是,如果发送数据已经开始了再进行订阅的话,就只能接收以后发送的数据。
connect操作符就是用来触发Connectable Observable发送数据的。应用connect操作符后会返回一个Subscription对象, 通过这个Subscription对象,我们可以调用其unsubscribe方法来终止数据的发送。另外,如果还没有订阅者订阅就应用connect操作符,也是可以使其开始发送数据的。
/**
* publish && connect
*/
private void publishConnectTest(){
Observable<Long> obser = Observable.interval(1, TimeUnit.SECONDS, Schedulers.trampoline());
final ConnectableObservable<Long> observable = obser.publish();
final Action1 action2 = new Action1() {
@Override
public void call(Object o) {
log("action2: " + o);
}
};
Action1 action1 = new Action1() {
@Override
public void call(Object o) {
log("action1: " + o);
if((long)o==3)
observable.take(6).subscribe(action2);
}
};
observable.take(6).subscribe(action1);
observable.connect();
}
结果:
action1: 0
action1: 1
action1: 2
action1: 3
action1: 4
action2: 4
action1: 5
action2: 5
action2: 6
action2: 7
action2: 8
action2: 9
3.9.2 refCount
refCount 操作符能够将一个Connectable Observable对象再重新转化为一个普通的Observable对象,这时候如果有订阅者进行订阅将会触发数据的发送。
/**
* refCount
*/
private void refCountTest(){
Observable<Long> obser = Observable.interval(1, TimeUnit.SECONDS, Schedulers.trampoline());
ConnectableObservable<Long> observable = obser.publish();
observable.refCount().take(5).subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
log("refCount: "+ aLong);
}
});
}
订阅后会让Observable立刻开始生产并发送数据
refCount: 0
refCount: 1
refCount: 2
refCount: 3
refCount: 4
3.9.3 replay
replay操作符返回一个Connectable Observable对象并且可以缓存其发送过的数据,这样即使有订阅者在其发送数据之后进行订阅,也能收到其之前发送过的数据。不过使用replay操作符最好还是限定缓存大小, 否则如果缓存的数据太多的话,可会占用很多内存。对缓存的控制可以从空间和时间两个维度来实现。
/**
* replay
*/
private ConnectableObservable<Long> replayCountObserver(){
Observable<Long> obser = Observable.interval(1, TimeUnit.SECONDS, Schedulers.trampoline());
return obser.replay(2);
}
private ConnectableObservable<Long> replayTimeObserver(){
Observable<Long> obser = Observable.interval(1, TimeUnit.SECONDS, Schedulers.trampoline());
return obser.replay(3, TimeUnit.SECONDS);
}
private void replayTest(){
final ConnectableObservable<Long> observer = replayCountObserver();
final Action1 action2 = new Action1() {
@Override
public void call(Object o) {
log("action2: " + o);
}
};
Action1 action1 = new Action1() {
@Override
public void call(Object o) {
log("action1: " + o);
if((long)o==3)
observer.take(6).subscribe(action2);
}
};
observer.take(10).subscribe(action1);
log("relaycount");
observer.connect();
}
这时将会得到如下的结果。Action1在接收到3后吧Action2也订阅上了,由于缓存的空间是2,所以Action2可以接收到之前的两个数据2和3,之后Action1和Action2会共同接收后面的数据
relaycount
action1: 0
action1: 1
action1: 2
action1: 3
action2: 2
action2: 3
action1: 4
action2: 4
action1: 5
action2: 5
action1: 6
action2: 6
action1: 7
action2: 7
action1: 8
action1: 9
下面我们使用时间缓存的Observable来订阅,使用connect操作符后我们得到如下结果。Action1在接收到数据3之后把Action2也订阅上了,Action2收到了之前3秒缓存的所有数据。之后共同接收后面的数据。
timecount
action1: 0
action1: 1
action1: 2
action1: 3
action2: 0
action2: 1
action2: 2
action2: 3
action1: 4
action2: 4
action1: 5
action2: 5
action1: 6
action1: 7
action1: 8
action1: 9