RxJava (源码待续)学习之组合操作符

上一篇:RxJava 源码学习之过滤操作符

今天我们继续学习RxJava的组合操作符。

StartWith

  • 作用分析

StartWith操作符可以在发射一个Observable的数据之前先发射一个指定的数据序列。
操作符

Paste_Image.png

  • 示例代码

可接受一个Iterable或者多个Observable作为函数的参数。
Javadoc: startWith(Iterable)
Javadoc: startWith(T) (最多接受九个参数)

//测试代码
Integer[] nums = {1,2,3,4};
Observable.from(nums)
        .startWith(9,8)
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer item) {System.out.println("Next: " + item);            }
            @Override
            public void onError(Throwable error) {  System.err.println("Error: " + error.getMessage()); }
            @Override
            public void onCompleted() { System.out.println("Sequence complete."); }
});
//###########################################
输出结果:
Next: 9
Next: 8
Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.

Merge

  • 作用分析

merge可以将多个Observables的输出合并,就好像它们是一个单个的Observable一样。merge可能会让合并的Observables发射的数据交错(有一个类似的操作符 Concat不会让数据交错,它会按顺序一个接着一个发射多个Observables的发射物)。
注意:如果传递给merge的任何一个的Observable发射了onError通知终止了,merge操作符生成的Observable也会立即以onError通知终止。如果你想让它继续发射数据,在最后才报告错误,可以使用mergeDelayError

  • 示例代码

Observable<Integer> odds = Observable.just(1, 3, 5).subscribeOn(someScheduler); // subscribeOn 切换线程
Observable<Integer> evens = Observable.just(2, 4, 6);
Observable.merge(odds, evens)
          .subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });
//###########################################
输出结果:
Next: 1
Next: 3
Next: 5
Next: 2
Next: 4
Next: 6
Sequence complete.

ZIP

  • 作用分析

Zip操作符返回一个Obversable,它使用特定函数按顺序结合两个或多个Observables发射的数据项,然后它发射这个函数返回的结果。它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据。
下图:把 shape(形状)、size(尺寸) 和 color(颜色) 合并后,发射出来。

Paste_Image.png

  • 示例代码

Observable.zip(
        Observable.just("a1","a2","a3","a4","a5"),
        Observable.just(1,2,3,4),
        Observable.just("b1","b2","b3","b4","b5","b6"),
        new Func3<String,Integer,String,String>(){
            @Override
            public String call(String s, Integer integer, String s2) {
                return s+"_"+integer+"_"+s2;
            }
        }).subscribe(new Subscriber<String>() {
    @Override
    public void onCompleted() {        System.out.println("onCompleted.");    }
    @Override
    public void onError(Throwable e) {        System.out.println("onError: " + e.getMessage());    }
    @Override
    public void onNext(String s) {        System.out.println("onNext: " + s);    }});
//############################################
输出结果:
onNext: a1_1_b1
onNext: a2_2_b2
onNext: a3_3_b3
onNext: a4_4_b4
onCompleted.

Join

  • 作用分析

任何时候,只要在另一个Observable发射的数据定义的时间窗口内,这个Observable发射了一条数据,就结合两个Observable发射的数据。
比如: ObservableA 在 5s内发射一条数据 dataA1, ObservableB 这时刚好也在发射数据dataB1,就把ObservableA 的数据dataA1和 ObservableB的数据dataB1合并一起发射;5s还没结束,ObservableB又发射数据dataB2,就把ObservableA 的数据dataA1和 ObservableB的数据dataB2合并一起发射。

Paste_Image.png
  • 示例代码

Javadoc: Join(Observable,Func1,Func1,Func2)

  • 第二个Observable和源Observable结合。
  • Func1参数:在指定的由时间窗口定义时间间隔内,源Observable发射的数据和从第二个Observable发射的数据相互配合返回的Observable。
  • Func1参数:在指定的由时间窗口定义时间间隔内,第二个Observable发射的数据和从源Observable发射的数据相互配合返回的Observable。
  • Func2参数:定义已发射的数据如何与新发射的数据项相结合。
Observable<Integer> create1 = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        for (int i = 0; i < 6; i++) {
            subscriber.onNext(i);
            try {
                Thread.sleep(600);
            } catch (InterruptedException e) {
                subscriber.onError(e);
            }
        }
    }}).subscribeOn(Schedulers.newThread());
Observable<String> create2 = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        for (int i = 0; i < 4; i++) {
            subscriber.onNext("hello_"+ i);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                subscriber.onError(e);
            }
        }
    }}).subscribeOn(Schedulers.newThread());
create1.join(create2,
        new Func1<Integer, Observable<Long>>() {
            @Override
            public Observable<Long> call(Integer integer) {
                return Observable.timer(1000, TimeUnit.MILLISECONDS);
            }
        },
        new Func1<String, Observable<Long>>() {
            @Override
            public Observable<Long> call(String s) {
                return Observable.timer(1000, TimeUnit.MILLISECONDS);
            }
        },
        new Func2<Integer, String, String>() {
            @Override
            public String call(Integer integer1, String s) {
                return integer1 + "-" + s;
            }
        }).subscribe(new Subscriber<String>() {
    @Override
    public void onCompleted() {        System.out.println("onCompleted.");    }
    @Override
    public void onError(Throwable e) {        System.out.println("onError: " + e.getMessage());    }
    @Override
    public void onNext(String s) {        System.out.println("onNext: " + s);    }});
//##############################################
输出结果:
onNext: 0-hello_0
onNext: 1-hello_0
onNext: 1-hello_1
onNext: 2-hello_1
onNext: 3-hello_1
onNext: 3-hello_2
onNext: 2-hello_2
onNext: 4-hello_2
onNext: 4-hello_3
onNext: 5-hello_3

Switch

有这样一个复杂的场景就是在一个subscribe-unsubscribe的序列里我们能够从一个Observable自动取消订阅来订阅一个新的Observable。

RxJava的switch(),正如定义的,将一个发射多个Observables的Observable转换成另一个单独的Observable,后者发射那些Observables最近发射的数据项。

给出一个发射多个Observables序列的源Observable,switch()订阅到源Observable然后开始发射由第一个发射的Observable发射的一样的数据。当源Observable发射一个新的Observable时,switch()立即取消订阅前一个发射数据的Observable(因此打断了从它那里发射的数据流)然后订阅一个新的Observable,并开始发射它的数据。

Paste_Image.png

结束语

ok,RxJava之组合操作符已经学习完啦,当然这里都是分析一些常用的,想了解更多的操作符就去看RxJava官方文档吧。

下一篇:RxJava 源码学习之调度器Scheduler。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 210,914评论 6 490
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 89,935评论 2 383
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,531评论 0 345
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,309评论 1 282
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,381评论 5 384
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,730评论 1 289
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,882评论 3 404
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,643评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,095评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,448评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,566评论 1 339
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,253评论 4 328
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,829评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,715评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,945评论 1 264
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,248评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,440评论 2 348

推荐阅读更多精彩内容

  • 创建unfaseCreate(create)创建一个Observable(被观察者),当被观察者(Observer...
    chuwe1阅读 6,998评论 3 8
  • 注:只包含标准包中的操作符,用于个人学习及备忘参考博客:http://blog.csdn.net/maplejaw...
    小白要超神阅读 2,186评论 2 8
  • 版权声明:本文为小斑马伟原创文章,转载请注明出处! 上篇简单的阐述了响应式编程的基本理论。这篇主要对响应编程进行详...
    ZebraWei阅读 2,246评论 0 2
  • 作者: maplejaw本篇只解析标准包中的操作符。对于扩展包,由于使用率较低,如有需求,请读者自行查阅文档。 创...
    maplejaw_阅读 45,621评论 8 93
  • 下方展示了几种创建Observable的方法 just() ---将一个或者多个对象转换成发射这个或这些对象的一个...
    菜鸟_一枚阅读 260评论 0 0