Rxjava系列(六) RxJava2.0操作符详解

Rxjava2.0概述

通过前面的文章介绍,读者对RxJava2.0应该有了初步的认识。RxJava2.0相对1.0没有增加新的功能,最大的变化是把原来与背压有关的操作符抽取出来放在Flowable中处理;另外2.0的接口定义更符合ReactiveStream的规范。操作符的功能和1.0相比没有太大的变化,不过Flowable是背压相关的功能抽离出来的,本篇我们来详细分析下各种操作符的作用和用法。每种类型的操作符会选择几个重点介绍,提醒读者在阅读的过程中注意操作符背压的处理。另外操作符都是支持泛型的,对泛型不了解的读者,需要先熟悉一下泛型相关的知识。

创建操作符

create操作符

这里首先介绍create操作符,它是使用最广泛的创建操作符。

create操作符的使用场景

create是最基本的操作符,用来创建一个Flowable,事件流的生产和下发由用户自己定义,是一个完全可定制的Flowable。

create操作符的基本使用方法

  Flowable.create(new FlowableOnSubscribe<Object>() {
             @Override
             public void subscribe(@NonNull FlowableEmitter<Object> e) throws Exception {
                 e.onNext("1");
                 e.onNext("2")
            }
            }, BackpressureStrategy.BUFFER)
        .subscribe(new FlowableSubscriber<Object>() {
            Subscription  st=null;
            @Override
            public void onSubscribe(@NonNull Subscription s) {
                s.request(1);
                st =s;
            }

            @Override
            public void onNext(Object o) {
                System.out.print(s);
                st.request(1);
                
            }

            @Override
            public void onComplete() {

            }

            @Override
            public void onError(Throwable throwable) {

            }
        });

以上输出的结果为:1 2
create方法的

  • 第一个参数是FlowableOnSubscribe,它只有一个方法subscribe,subscribe是事件生产的地方,subscribe的参数FlowableEmitter是内部类,其中一个属性是FlowableSubscriber,也就是我们定义的观察者。通过调用 e.onNext("1"),FlowableEmitter会调用观察者onNext的方法即是FlowableSubscriber的onNext方法,从而完成事件下发给观察者。
  • 第二个参数是背压策略,RxJava2.0定义了五种背压策略,后续文章会重点讲述背压策略的详细区别。背压策略会影响观察者能否正确接收到事件。

create操作符事件的处理流程

以上面的代码为例,当subscribe(new FlowableSubscriber<Object>()执行时,会首先执行onSubscribe(@NonNull Subscription s)方法,它执行在调用者的线程,对于背压来说,这里需要告知生产者事件的下流观察者的事件处理能力;然后是subscribe(@NonNull FlowableEmitter<Object> e)执行事件的的生产逻辑,这里会判断观察者的处理数据的能力决定是否执行观察者的onNext方法。这里判断的依据是s.request()的参数是否大于零。

create操作符需要注意的地方

前面也提到过,观察者能否接收到事件取决于s.request()的参数是否大于零或者s.request()是否被调用。如果FlowableSubscriber的方法onSubscribe没有调用过s.request(n)或者n<=0 ,FlowableSubscriber不会接收到事件。上面的例子中观察者只能就收到事件"1", 而不能接收"2",因为只调用对s.request(1),如果希望接收到两次事件可以在onSubscribe调用s.request(2),或者是onSubscribe调用s.request(1)并且onNext中调用s.request(1)。

fromArray操作符

fromArray操作符的使用场景

fromArray用于快速创建一个Flowable,直接发送数组的数据。

fromArray操作符的基本使用方法

  Flowable.fromArray(new String[]{"1","2","3"})
          .subscribe(new FlowableSubscriber<String>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(3);
                    }
                    @Override
                    public void onNext(String s) {
                        System.out.print(s);
                    }
                    @Override
                    public void onError(Throwable throwable) {

                    }
                    @Override
                    public void onComplete() {
                    }
                });

以上输出的结果为:1 2 3
fromArray的参数是一个数组,数组的元素会被依次发送出去。

fromArray操作符的事件 执行流程

1 从subscribe(new FlowableSubscriber<String>())方法开始执行;
2 通过内部处理onSubscribe方法会被执行,这里需要背压处理,如果没有调用s.request()或者s.request(n)的参数小于等于0,流程结束;
3 如果s.request(n)的参数大于零,会执行onNext;
4 重复步骤2,3,每执行一次onNext,参数n就会减1。

fromArray操作符需要注意的地方

1 数组不能是list类型,list会被当做一个事件发送出去。
2 背压处理类似create操作符,具体参考上文。
3 fromArray,FlowableSubscriber支持泛型,但是数组的元素类型和观察者的类型要一致。

转换操作符

该类型的操作符都作用于一个可观测序列,然后通过function函数把它变换其他的值,最后用一种新的形式返回它们。

map操作符

map操作符的基本用法

Flowable.fromArray(new String[]{"1","2","3"})
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        return s+ "map";
                    }
                })
                .subscribe(new FlowableSubscriber<String>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(3);
                    }
                    @Override
                    public void onNext(String s) {
                        System.out.print(s);
                    }
                    @Override
                    public void onError(Throwable throwable) {

                    }
                    @Override
                    public void onComplete() {
                    }
                });  

以上输出的结果为:1map 2map 3map

这里map操作符的参数Function有两个泛型,第一个泛型是接收到的事件的类型,第二个泛型是发送事件的类型,因此,第一个泛型需要和fromArray的泛型是同一个类型,而观察者的泛型和第二个泛型是同一个类型。

map操作符的执行流程

1 fromArray操作符首先创建一个Flowable1
2 Flowable1调用操作符map,产生一个新的Flowable2;
3 Flowable2 订阅观察者
4 Flowable1 调用观察者的onSubscribe()方法,需要处理背压;
5 观察者在onSubscribe()方法中通知上游观察者的能力是3;
6 Flowable1 开始发射事件"1";
7 Flowable2 的Function对事件“1”执行apply()变换,转化成"1map";
8 观察者接收到事件"1map";
9 重复步骤7,8,直到事件发送完毕或者被取消或者背压为0。

flatMap操作符

flatMap操作符的功能

flatMap将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable。
这个方法是很有用的,例如,当你有一个这样的Observable:它发射一个数据序列,这些数据本身包含Observable成员或者可以变换为Observable,因此你可以创建一个新的Observable发射这些次级Observable发射的数据的完整集合。

flatMap操作符的基本使用

Student student1 = new Student();
        Student student2 = new Student();
        Student student3 = new Student();
        Student[] array={student1,student2,student3};
        for(int i=0;i<2;i++){
            array[i].cursors = new Cursor[2];
            Cursor cursor1 = new Cursor();
            cursor1.name="n1";
            cursor1.score=10*i;
            Cursor cursor2 = new Cursor();
            cursor2.name="n2";
            cursor2.score=10*i+1;
            array[i].cursors[0]=cursor1;
            array[i].cursors[1] = cursor2;
        }
        Flowable.fromArray(array)
                .flatMap(new Function<Student, Flowable<Cursor>>() {
                    @Override
                    public Flowable<Cursor> apply(Student student) throws Exception {
                        return  Flowable.fromArray(student.cursors);
                    }
                }).subscribe(new FlowableSubscriber<Cursor>() {
            @Override
            public void onSubscribe(Subscription s) {
                s.request(3);
            }
            @Override
            public void onNext(Cursor cursor) {
                System.out.print(cursor.name+","+cursor.score);
            }
            @Override
            public void onError(Throwable throwable) {
            }
            @Override
            public void onComplete() {
            }
        });

以上面的例子为例来理解flatMap操作符:需求是打印所有学生的各科成绩。
所有的学生为一个数组,通过fromArray操作符依次发送事件,每个学生作为一个事件发送出去,flatMap操作符把每个学生变换为对应的Flowable,这个Flowable会把学生的成绩依次发送出去,最后所有的成绩汇总,下发给观察者。
可以看出,不需要循环操作,通过flatMap操作符就完成了所有学生的各科成绩的打印。

flatMap操作符需要注意的地方

经过flatMap操作变换后,最后输出的序列有可能是交错的,因为flatMap最后合并结果采用的是merge操作符,后面会详细介绍merge操作符。如果要想经过变换后,最终输出的序列和原序列一致,那就会用到另外一个操作符,concatMap。

过滤操作符

顾名思义,这类操作符按照过滤条件对可观测的事件序列筛选,返回满足我们条件的事件。过滤类操作符主要包含: Filter, Take, TakeLast, TakeUntilSkip, SkipLast, ElementAt, Debounce, Distinct, DistinctUntilChanged, First, Last等等。

fliter操作符

fliter操作符的基本用法

Flowable.fromArray(array)
                .filter(new Predicate<Student>() {
                    @Override
                    public boolean test(Student student) throws Exception {
                        return student.cursors[1].score<20;
                    }
                })
                .subscribe(new FlowableSubscriber<Student>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(3);
                    }
                    @Override
                    public void onNext(Student student) {
                        System.out.print(student.name);
                    }
                    @Override
                    public void onError(Throwable throwable) {
                    }
                    @Override
                    public void onComplete() {
                    }
                });

以上面flatMap的场景,如果需求变为:要求打印第二门课程的成绩<20的学生名单。同样的fromArray会依次下发所有的学生,fliter操作符判断成绩是否小于20,将小于20的学生下发给观察者。

filter操作符的流程

1 fromArray创建Flowable1;
2 filter创建操作符Flowable2;
3 Flowable2 订阅观察者;
4 执行观察者的onSubscribe()方法,需要处理背压;
5 Flowable1 依次发送student事件;
6 Flowable2调用test()判断student事件是否符合筛选条件,符合返回true,否则返回false;
7 Flowable2 根据test()的结果下发事件,返回值=true 事件下发给观察者,返回值=false事件丢弃;
8 重复步骤5,6,7 直到事件发送完毕或者被取消或者背压为0。

distinct操作符

distinct操作符过滤规则是只允许还没有发射过的数据通过,所有重复的数据项都只会发射一次。在数据去重场景中非常有用。

distinct操作符的基本用法

Flowable.fromArray(array)
                .distinct()
                .subscribe(new FlowableSubscriber<Student>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(3);
                    }
                    @Override
                    public void onNext(Student student) {
                        System.out.print(student.name);
                    }
                    @Override
                    public void onError(Throwable throwable) {
                    }
                    @Override
                    public void onComplete() {
                    }
                });

distinct操作符的执行流程

1 fromArray创建Flowable1;
2 distinct创建操作符Flowable2;
3 Flowable2 订阅观察者;
4 执行观察者的onSubscribe()方法,此处应该调用request()方法,否则后面的流程中事件无法下发给观察者
5 Flowable1 依次发送student事件给Flowable2;
6 Flowable2判断事件是否已经下发过,对事件去重;
7 如果已经下发过该事件则调用request(1),纠正背压;若果该事件没有下发过且被压不为零则下发给观察者;
8 重复步骤5,6,7 直到事件结束或者取消或者背压为零。

组合操作符

merge操作符

Merge其实只是将多个Flowable的输出序列变为一个,方便订阅者统一处理,对于订阅者来说就仿佛只订阅了一个观察者一样

merge操作符的基本使用方法

Flowable f1 = Flowable.just(1, 2, 3);
Flowable f2 = Flowable.just(7, 8, 9, 10);
Flowable.merge(f1,f2)
                .subscribe(new FlowableSubscriber() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(7);
                    }
                    @Override
                    public void onNext(Object o) {
                        System.out.print("merge:"+o.toString());
                    }
                    @Override
                    public void onError(Throwable throwable) {
                    }
                    @Override
                    public void onComplete() {
                    }
                });

merge操作符的执行流程

1 创建Flowable1;
2 创建Flowable2
3 merge创建操作符Flowable3;
4 Flowable3 订阅观察者;
5 Flowable3 内部创建1个队列,用来Flowable1和Flowable2的事件
6 执行观察者的onSubscribe()方法,此处应该调用request()方法,否则后面的流程中事件无法下发给观察者
7 Flowable1 发送事件,Flowable3将其放进队列中;
8 Flowable3 判断队列是否有事件,背压是否不为零
9 如果8的条件都满足,将队列下发给观察者;
10 重复步骤7,8,9 直到事件结束或者取消或者背压为零。

zip操作符

zip通过一个函数将多个Flowable发送的事件结合到一起,然后发送这些组合到一起的事件. 它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Flowable一样多的数据

zip操作符的基本使用方法

 Flowable f1 = Flowable.just(1, 2, 3);
 Flowable f2 = Flowable.just(7, 8, 9, 10);
Flowable.zip(f1, f2, new BiFunction() {
                             @Override
                            public Object apply(Object o, Object o2) throws Exception {
                            return "f1: "+o.toString()+" and f2: "+o2.toString();
                     }})
                .subscribe(new FlowableSubscriber() {
                    @Override
                    public void onSubscribe(Subscription s) {
                      s.request(4);  
                    }
                    @Override
                    public void onNext(Object o) {
                        System.out.print("zip:"+o.toString());
                    }
                    @Override
                    public void onError(Throwable throwable) {
                    }
                    @Override
                    public void onComplete() {
                    }
                });

zip操作符的执行流程

1 创建Flowable1;
2 创建Flowable2
3 zip创建操作符Flowable3;
4 Flowable3 订阅观察者;
5 Flowable3 内部创建两个队列,分别用来暂存Flowable1和Flowable2的事件
6 执行观察者的onSubscribe()方法,此处应该调用request()方法,否则后面的流程中事件无法下发给观察者
7 Flowable1 发送事件,Flowable3将其放进队列1中;
8 Flowable3 判断队列1和队列2是否有事件,背压是否不为零
9 如果8的条件都满足,将队列1和队列2的事件执行BiFunction的apply()方法;
10 如果8的条件不满足 Flowable2 发送事件,Flowa3将其放进队列2中;
11 Flowable3 判断队列1和队列2是否有事件,背压是否不为零;
12 如果11的条件都满足,将队列1和队列2的事件执行BiFunction的apply()方法;
13 重复步骤7,8,9,10,11,12 直到事件结束或者取消或者背压为零。

zip操作符需要注意的地方

1 如果多个Flowable的事件长度不一样,观察者接收到的是事件长度和最短的Flowable事件长度一样。
2 如果多个Flowable的事件长度不一样,最短的是d,每个Flowable只取前面d个事件。

切换线程操作符

subscribeOn

subscribeOn操作符的基本用法

 Flowable.fromArray(array)
                .subscribeOn(Schedulers.newThread())
                .subscribe(new FlowableSubscriber<Student>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(3);
                        System.out.print(Thread.currentThread().getName());
                    }
                    @Override
                    public void onNext(Student student) {
                        System.out.print(student.name);
                        System.out.print(Thread.currentThread().getName());
                    }
                    @Override
                    public void onError(Throwable t) {
                    }
                    @Override
                    public void onComplete() {
                    }
                });

subscribeOn操作符的流程

1 fromArray操作符创建Flowable1;
2 subscribeOn操作符创建Flowable2;
3 Flowable2订阅观察者;
4 Flowable2 内部调用观察者的onSubscribe()方法;
5 Flowable2 切换到线程Thread2;
6 Flowable1 在线程Thread2中生产并下发事件;
7 观察者在线程Thread2中接收事件;
8 重复步骤6,7,直到事件下发完毕或者被取消。

subscribeOn操作符需要注意的地方

1 在一个场景中可以多次使用subscribeOn操作符;
2 多次使用时,Flowable的事件都是在第一个subscribeOn操作符的线程种执行;
3 后面的 subscribeOn 只会改变前面的 subscribeOn 调度操作所在的线程,并不能改变最终被调度的事件执行的线程,但对于中途的代码执行的线程,还是会影响到的。

observerOn

observerOn操作符的基本用法

Flowable.fromArray(array)
                .observeOn(Schedulers.newThread())
                .subscribe(new FlowableSubscriber<Student>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(3);
                        System.out.print(Thread.currentThread().getName());
                    }
                    @Override
                    public void onNext(Student student) {
                        System.out.print(student.name);
                        System.out.print(Thread.currentThread().getName());
                    }
                    @Override
                    public void onError(Throwable t) {
                    }
                    @Override
                    public void onComplete() {
                    }
                });

observerOn操作符的流程

1 fromArray操作符创建Flowable1;
2 observerOn操作符创建Flowable2;
3 Flowable2订阅观察者;
4 Flowable2 内部调用观察者的onSubscribe()方法;
5 Flowable1 生产并下发事件;
6 Flowable2 接收Flowable1的事件并切换到线程Thread2;
7 观察者在线程Thread2中接收事件;
8 重复步骤5,6,7直到事件下发完毕或者被取消。

observerOn操作符需要注意的地方

1 在一个场景中可以多次使用observerOn操作符;
2 每级的Observer的onXXX方法都在不同的线程中被调用。所以observeOn的调用会多次生效;
3 observeOn 影响它下面的调用执行时所在的线程,每次调用都会改变数据向下传递时所在的线程。

参考

给初学者的RxJava2.0教程
探索专为 Android 而设计的 RxJava 2
RxJava系列6(从微观角度解读RxJava源码)
浅谈 RxJava 与 2.0 的新特性
RxJava 2.x 使用详解

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,039评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,223评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,916评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,009评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,030评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,011评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,934评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,754评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,202评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,433评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,590评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,321评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,917评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,568评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,738评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,583评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,482评论 2 352

推荐阅读更多精彩内容