RxJava基础九-与Connectable相关操作符

此篇内容均是来自书籍《RxJava响应式编程》李衍顺 著

3.9 与Connectable Observable相关的操作符

我们前面所学的Observable,他们又一个共同的特性,就是只有当订阅者来订阅时才会开始发送数据,否则什么也不发生,这就是懒加载。那什么是Connectable Observable呢, 它是一种特殊的Observable,并不是在订阅者订阅时才发送数据,而是只要对其应用connect操作符就开始发送数据。

3.9.1 publish 和connect

publish操作符就是用来将一个普通的Observable转化为一个Connectable Observable的。需要注意的是,如果发送数据已经开始了再进行订阅的话,就只能接收以后发送的数据。

connect操作符就是用来触发Connectable Observable发送数据的。应用connect操作符后会返回一个Subscription对象, 通过这个Subscription对象,我们可以调用其unsubscribe方法来终止数据的发送。另外,如果还没有订阅者订阅就应用connect操作符,也是可以使其开始发送数据的。

/**
     * publish && connect
     */
    private void publishConnectTest(){
        Observable<Long> obser = Observable.interval(1, TimeUnit.SECONDS, Schedulers.trampoline());
        final ConnectableObservable<Long> observable = obser.publish();
        final Action1 action2 = new Action1() {
            @Override
            public void call(Object o) {
                log("action2: " + o);
            }
        };
        Action1 action1 = new Action1() {
            @Override
            public void call(Object o) {
                log("action1: " + o);
                if((long)o==3)
                    observable.take(6).subscribe(action2);
            }
        };
        observable.take(6).subscribe(action1);
        observable.connect();
    }
结果:
action1: 0
action1: 1
action1: 2
action1: 3
action1: 4
action2: 4
action1: 5
action2: 5
action2: 6
action2: 7
action2: 8
action2: 9

3.9.2 refCount

refCount 操作符能够将一个Connectable Observable对象再重新转化为一个普通的Observable对象,这时候如果有订阅者进行订阅将会触发数据的发送。

/**
     * refCount
     */
    private void refCountTest(){
        Observable<Long> obser = Observable.interval(1, TimeUnit.SECONDS, Schedulers.trampoline());
        ConnectableObservable<Long> observable = obser.publish();
        observable.refCount().take(5).subscribe(new Action1<Long>() {
            @Override
            public void call(Long aLong) {
                log("refCount: "+ aLong);
            }
        });
    }
订阅后会让Observable立刻开始生产并发送数据
refCount: 0
refCount: 1
refCount: 2
refCount: 3
refCount: 4

3.9.3 replay

replay操作符返回一个Connectable Observable对象并且可以缓存其发送过的数据,这样即使有订阅者在其发送数据之后进行订阅,也能收到其之前发送过的数据。不过使用replay操作符最好还是限定缓存大小, 否则如果缓存的数据太多的话,可会占用很多内存。对缓存的控制可以从空间和时间两个维度来实现。

/**
     * replay
     */
    private ConnectableObservable<Long> replayCountObserver(){
        Observable<Long> obser = Observable.interval(1, TimeUnit.SECONDS, Schedulers.trampoline());
        return obser.replay(2);
    }
    private ConnectableObservable<Long> replayTimeObserver(){
        Observable<Long> obser = Observable.interval(1, TimeUnit.SECONDS, Schedulers.trampoline());
        return obser.replay(3, TimeUnit.SECONDS);
    }
    private void replayTest(){
        final ConnectableObservable<Long> observer = replayCountObserver();
        final Action1 action2 = new Action1() {
            @Override
            public void call(Object o) {
                log("action2: " + o);
            }
        };
        Action1 action1 = new Action1() {
            @Override
            public void call(Object o) {
                log("action1: " + o);
                if((long)o==3)
                    observer.take(6).subscribe(action2);
            }
        };
        observer.take(10).subscribe(action1);
        log("relaycount");
        observer.connect();
    }

这时将会得到如下的结果。Action1在接收到3后吧Action2也订阅上了,由于缓存的空间是2,所以Action2可以接收到之前的两个数据2和3,之后Action1和Action2会共同接收后面的数据

relaycount
action1: 0
action1: 1
action1: 2
action1: 3
action2: 2
action2: 3
action1: 4
action2: 4
action1: 5
action2: 5
action1: 6
action2: 6
action1: 7
action2: 7
action1: 8
action1: 9

下面我们使用时间缓存的Observable来订阅,使用connect操作符后我们得到如下结果。Action1在接收到数据3之后把Action2也订阅上了,Action2收到了之前3秒缓存的所有数据。之后共同接收后面的数据。

timecount
action1: 0
action1: 1
action1: 2
action1: 3
action2: 0
action2: 1
action2: 2
action2: 3
action1: 4
action2: 4
action1: 5
action2: 5
action1: 6
action1: 7
action1: 8
action1: 9
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 224,983评论 6 522
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 96,354评论 3 403
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 172,216评论 0 367
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 61,061评论 1 300
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 70,073评论 6 400
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 53,541评论 1 314
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 41,906评论 3 428
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 40,881评论 0 279
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 47,428评论 1 324
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 39,460评论 3 346
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 41,578评论 1 355
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 37,176评论 5 351
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 42,913评论 3 339
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 33,348评论 0 25
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 34,490评论 1 277
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 50,142评论 3 381
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 46,650评论 2 366

推荐阅读更多精彩内容