RxJava3.x学习记录

RxJava3.x基本用法

1. 概述

1.1 ReactiveX与RxJava

RxJava是ReactiveX的一种Java实现,Rx是一个函数库,开发者可利用可观察序列和LINQ风格查询操作符来编写异步和基于事件的程序。开发者可利用Observables表示异步数据流,用LINQ查询异步数据流,用Schedulers参数化异步数据流的并发处理,Rx可以这样定义:Rx = Observables + LINQ + Schedulers

1.2 为何用RxJava

异步操作可以使用AsyncTask和Handler,但随着请求越来越多逻辑会非常复杂,但RxJava仍旧可以保持清晰的逻辑,RxJava的原理就是创建一个Observable对象来处理逻辑,然后使用各种操作符建立起来的链式操作,如同流水线一样处理数据一步步加工后发射给Subscriber处理

1.3 背压

背压是指一部场景中,被观察者发送事件的速度远快于观察者处理事件速度的情况下,一种告知上游的被观察者降低发送速度的策略

1.4 观察者和被观察者

RxJava的异步操作是通过扩展的观察者模式来实现的,RxJava中Observable代表被观察者,Observer代表观察者,RxJava3.x中有以下几个被观察者
Observable:发送0个或N个数据,不支持背压。原本是支持的,RxJava2.x后由Flowable支持,因此改成不支持背压

Flowable:发送0个或N个数据,支持背压。是RxJava2.x后的新类型

Single:只处理onSuccess和onError事件,只能发送单个数据或者发送一个错误

Completable:创建后不会发射任何数据,只处理onComplete和onError事件

Maybe:能够发射0个或1个数据,是RxJava2.x后的新类型


2. 基本实现

使用前配置gradle

implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
implementation 'io.reactivex.rxjava3:rxjava:3.0.0'

其中RxAndroid是RxJava在Android平台的扩展,包含了一些简化Android开发的工具,比如特殊的调度器等

RxJava的基本用法分为三个步骤,如下

2.1 创建Observer(观察者)

它决定事件触发的时候将有怎样的行为,代码如下

Observer<String> observer = new Observer<String>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
            Log.d("MainActivity","onSubscribe");
        }

        @Override
        public void onNext(@NonNull String s) {
            Log.d("MainActivity","onNext" + s);
        }

        @Override
        public void onError(@NonNull Throwable e) {
            Log.d("MainActivity","onError");
        }

        @Override
        public void onComplete() {
            Log.d("MainActivity","onComplete");
        }
    };

各方法含义如下
onComplete:事件队列完结,RxJava不仅把每个事件单独处理,而且还会把他们看作一个队列,当不会再有新的onNext()发出时,需要触发onComplete()方法作为完成标志

onError:事件队列异常,在事件处理过程中出现异常时,onError()方法会被触发,同时队列自动终止,不允许再有事件发出

onNext:普通的事件,将要处理的事件添加到事件队列中

onSubscribe:当订阅时会被调用

2. 创建Observer(被观察者)

决定什么时候触发事件以及触发怎样的事件,RxJava使用create()方法来创建一个Observable,并为它定义事件触发规则

Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
            emitter.onNext("我是谁");
            emitter.onNext("我在哪");
            emitter.onComplete();
        }
    });

通过调用subscribe()方法不断将事件添加到任务队列中,也可以用just方法实现

Observable observable = Observable.just("我是谁","我在哪");

3. Subscribe订阅

订阅只需要一行代码即可

observable.subscribe(observer);

运行代码查看log


image.png

先调用onSubscribe(),再调用两个onNext(),最后完成了调用onComplete()方法


RxJava3.x的Subject和Processor

这两个有些相似,一起说,先介绍较为复杂的Subject

1. Subject的分类

Subject既可以是一个Observer,也可以是一个Observerable,它是链接Observer和Observerable的桥梁,因此Subject可以被理解为Subject = Observer + Observable。RxJava提供了8种Subject,如下

1.1 PublishSubject

PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者,需要注意的是,PublishSubject可能会一创建完成就立刻开始发射数据,因此这里可能会有一个风险:在Subject被创建后,到有观察者订阅它之前这个时间段内,一个或多个数据可能会丢失,如果要确保来自原始Observable的所有数据都被分发,则可以当所有观察者都已经订阅时才开始发射数据,或者改用ReplaySubject

1.2 BehaviorSubject

当Observer订阅BehaviorSubject时,BehaviorSubject开始发射原始Observer最近发射的数据,如果Observer此时还没有收到任何数据,则BehaviorSubject会发射一个默认值,然后继续发射其他任何来自原始Observable的数据,如果原始的Observable因为发生错误而终止,则BehaviorSubject不会发射任何数据,但是会向Observer传递一个异常数据

1.3 ReplaySubject

不管Observer何时订阅ReplaySubject,ReplaySubject都会向Observer发射所有来自原始Observable的数据,有不同类型的ReplaySubject,它们用来限定Replay的范围,比如设定Buffer的具体大小,或者设定具体的时间范围。如果用ReplaySubject作为Observer,则注意不要在多个线程中调用onNext()、onComplete()、onError(),这可能会导致数据发送错乱,并且违反Observer规则

1.4 AsyncSubject

当Observable完成时,AsyncSubject只会发射来自原始Observable的最后一个数据,如果原始的Observable因为发生了错误而终止,则AsyncSubject将不会发射任何数据,但是会向Observer传递一个异常通知

1.5 UnicastSubject

只允许一个Observer进行监听,在该Obsesrver注册之前会将发射的所有事件放进一个队列中,并在Observer注册的时候将所有事件一起通知给它,如果有多个观察者订阅,那么程序会报错

1.6 CompletableSubject

只发送Observer发射完毕的数据,也就是只发送onComplete()

1.7 MaybeSubject

主要用于发送一个结果数据,一般用于验证某个结果

1.8 SingleSubject

SingleSubject和MaybeSubject的区别不大,只不过SingleSubject没有onComplete()方法和onErrorComplete()方法

2. Processor

说完Subject,来说Processor,这是RxJava2的新增功能,是一个接口,继承自Subscriber和Publisher。与Subject作用类似,只不过Processor支持背压

主要种类如下:AsnycProcessor,BehaviorProcessor,FlowableProcessor,MulticastProcessor,PublishProcessor,ReplayProcessor,UnicastProcessor,内容大部分与Subject相似,不多介绍


RxJava3.x操作符入门

RxJava操作符的类型分为创建操作符,变换操作符,过滤操作符,组合操作符,错误处理操作符,辅助操作符,条件和布尔操作符,算数和聚合操作符,而这些操作符类型下又有很多操作符,每个操作符可能还有很多变体。

1. 创建操作符

上文我们已经使用的创建操作符create()和just(),这里就不赘述了,除了它们,还有defer、range、interval、start、repeat、timer等创建操作符,下面介绍interval、range、repeat

1.1 interval

创建一个按固定时间间隔发射整数序列的Observable,相当于定时器,如下

Observable.interval(3, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Throwable {
                Log.d("MainActivity", "interval:" + aLong.intValue());
            }
        });

于是每隔3s就会调用accept()方法打印log,这里只截取6次


image.png

1.2 range

创建发射指定范围的整数序列的Observable,可以拿来替代for循环,发射一个范围内的有序整数序列,第一个参数是起始值,并且不小于0,第二个参数为终值,区间为左闭右开

Observable.range(0,5).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.d("MainActivity", "range:" + integer);
            }
        });

打印log如下


image.png

1.3 repeat

创建一个N次重复发射特定数据的Observable,如下

Observable.range(0,3).repeat(2).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.d("MainActivity","repeat:" + integer);
            }
        });

重复打印range()里的数字2次


image.png

2. 变换操作符

变换操作符的作用是对Observable发射的数据按照一定规则做一些变换操作,然后将变换后的数据发射出去。变换操作符有map、flatMap、concatMap、switchMap、flatMapIterable、buffer、groupBy、cast、window、scan等,这里介绍map、flatMap、cast、concatMap、flatMapIterable、buffer和groupBy

2.1 map

通过指定一个Function对象将源Observable转换为一个新的Observable对象并发射,观察者将收到新的Observable对象并处理,假设我们要访问网络,Host地址时常是变化的,有时是测试服务器地址,有时是正式服务器地址,但是具体界面的URL地址是不变的,因此我们可以用map来进行变换字符操作,这里简单修改一下URL

final String Host = "http://**********";
        Observable.just(".cn").map(new Function<String, String>() {

            @Override
            public String apply(String s) throws Throwable {
                return Host + s;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Throwable {
                Log.d("ChangeActivity", "map:" + s);
            }
        });

打印log如下,加上了 “.cn”


image.png

2.2 flatMap、cast

flatMap操作符将Observable发射的数据集合变换为Observables集合,然后将这些Observable发射的数据平坦化地放进一个单独的Observable中。cast操作符地作用是强制将Observable发射的所有数据转换为指定类型的数据。假设我们仍旧访问网络,但是要访问同一个Host的多个界面,我们可以使用for循环在每个界面的URL前添加Host,但是RxJava提供了一个更方便的操作,如下

final String Host1 = "http://blog.****.net/";
        List<String> mlist = new ArrayList<>();
        mlist.add("itachi86");
        mlist.add("itachi87");
        mlist.add("itachi88");
        Observable.fromIterable(mlist).flatMap(new Function<String, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(String s) throws Throwable {
                return Observable.just(Host + s);
            }
        }).cast(String.class).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Throwable {
                Log.d("ChangeActivity", "flatMap:" + s);
            }
        });

打印log如下所示


image.png

首先用ArrayList存储要访问的界面URL,然后通过flatMap转换成Observable。cast操作符将Observable中的数据转换为String类型

注意,flatMap的合并是允许交叉的,也就是说采用flatMap操作符时可能会交错的发送事件,最终结果的顺序可能并不是原始的Observable发送时的顺序

2.3 concatMap

concatMap操作符的功能与flatMap操作符一致,不过它解决了flatMap的交叉问题,提供了一种能把发射的值连续在一起的函数而不是合并他们,代码如下,有兴趣可以自己打一下log

final String Host2 = "http://blog.****.net/";
        Observable.fromIterable(mlist).concatMap(new Function<String, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(String s) throws Throwable {
                return Observable.just(Host + s);
            }
            //转换为String
        }).cast(String.class).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Throwable {
                Log.d("ChangeActivity", "concatMap:" + s);
            }
        });

2.4 flatMapIterable

这个操作符可以将数据包装成Iterable,在Iterable中我们就可以对数据进行处理了,如下

Observable.just(1,2,3).flatMapIterable(new Function<Integer, Iterable<Integer>>() {
            @Override
            public Iterable<Integer> apply(Integer integer) throws Throwable {
                List<Integer> mlist = new ArrayList<>();
                mlist.add(integer + 1);
                return mlist;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.d("ChageActivity", "flatMapIterable:" + integer);
            }
        });

将每个数都加1,打印如下


image.png

2.5 buffer

buffer操作符将源Observable变换为一个新的Observable,新的Observable每次发射一组列表值而不是一个个发射数据。与buffer操作符类似的还有window操作符,只不过window操作符发射的是Observable而不是数据列表

Observable.just(1,2,3,4,5,6).buffer(3).subscribe(new Consumer<List<Integer>>() {
            @Override
            public void accept(List<Integer> integers) throws Throwable {
                for(Integer i : integers){
                    Log.d("ChangeActivity", "buffer:" + i);
                }
                Log.d("MainAtivity", "----------------");
            }
        });

buffer的意思是缓存容量为3,打印如下


image.png

2.6 groupBy

groupBy用于分组元素,将源Observable转换成一个发射Observables的新的Observable(分组后的),每一个新的Observable都发射一组指定的数据,那我们先写一个Java Bean,这里为了方便不做封装

public class People {
    String name;
    String id;

    public People(String name, String id) {
        this.name = name;
        this.id = id;
    }
}

再编写groupBy

People p1 = new People("周杰伦","A");
        People p2 = new People("张震岳","SS");
        People p3 = new People("刘德华","S");
        People p4 = new People("胡一天","S");
        People p5 = new People("林志颖","A");
        People p6 = new People("鲁迅","SS");
        People p7 = new People("胡适","S");
        People p8 = new People("李大钊","A");
        Observable<GroupedObservable<String, People>> GroupedObservable =
                Observable.just(p1,p2,p3,p4,p5,p6,p7,p8).groupBy(new Function<People, String>() {
                    @Override
                    public String apply(People people) throws Throwable {
                        return people.id;
                    }
                });

        Observable.concat(GroupedObservable).subscribe(new Consumer<People>() {
            @Override
            public void accept(People people) throws Throwable {

                Log.d("ChangeActivity", "groupBy:" + people.name + "----" + people.id);
            }
        });

这里创建了8个人物,按实力进行划分,groupBy帮助我们对某一个key进行分组,相同的key的值数据放在一起,concat是组合操作符,之后会介绍,打印如下


image.png

3. 过滤操作符

过滤操作符用于过滤和选择Observable发射的数据序列,让Observable只返回满足条件的数据,过滤操作符有filter,elementAt,distinct,skip,take,skipLast,takeLast,ignoreElements,throttleFirst,sample,debounce和throttleWithTimeOut等,下面介绍filter,elementAt,distinct,skip,take,ignoreElements,throttleFirst,throttleWithTimeOut

3.1 filter

filter操作符会对源Observable产生的结果自定义规则进行过滤,只有满足条件的结果才会提交给订阅者,如下

Observable.just(1,2,3,4).filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Throwable {
                return integer > 2;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.d("FilterActivity","filter:" + integer);
            }
        });

打印log如下,大于2的数才会被打印出来


image.png

3.2 elementAt

elementAt操作符用来返回指定位置的数据,和它类似的还有elementAtOrDefault(int , T),elementAtOrDefault()可以允许默认值,如下

Observable.just(1,2,3,4).elementAt(2).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.d("FilterActivity", "elementAt:" + integer);
            }
        });

打印索引为2的数,如下


image.png

3.3 distinct

distinct操作符可以用来去重,只允许还没有发射过的数据项通过,和它类似的还有distinctUntilChanged操作符,distinctUntilChanged用来去掉连续重复的数据

Observable.just(1,2,2,3,4,1).distinct().subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.d("FilterActivity", "distinct:" + integer);
            }
        });

打印如下


image.png

3.4 skip、take

skip操作符将源Observable发射的数据过滤掉前n项,而take操作符只取前n项,另外,skiplast和takelast操作符则是从Observable发射的数据的后面进行过滤操作,首先来看skip操作符,如下

Observable.just(1,2,3,4,5,6).skip(2).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.d("FilterActivity", "skip:" + integer);
            }
        });

打印后几项


image.png

再来看take

//take
        Observable.just(1,2,3,4,5,6).take(2).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.d("FilterActivity", "take:" + integer);
            }
        });

取前几项,如下


image.png

3.5 ignoreElements

ignoreElements操作符忽略所有源Observable产生的结果,把Observable的onComplete()和onError()事件通知订阅者,如下

Observable.just(1,2,3,4).ignoreElements().subscribe(new CompletableObserver() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onComplete() {
                Log.d("FilterActivity", "onComplete");
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.d("FilterActivity", "onError");
            }
        });

输出如下


image.png

3.6 throttleFirst

throttleFirst操作符会定期发射这个时间段里源Observable发射的第一个数据,throttleFirst操作符默认在computation调度器上执行,和throttleFirst操作符类似的还有sample操作符,sample操作符会定时地发射源Observable最近发射的数据,其他的都会被过滤掉,throttleFirst操作符的使用示例如下所示

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Throwable{
                for(int i = 0; i < 10; i++){
                    emitter.onNext(i);
                    try{
                        Thread.sleep(100);
                    }catch (InterruptedException e){
                        e.printStackTrace();
                    }
                }
                emitter.onComplete();
            }
        }).throttleFirst(200, TimeUnit.MILLISECONDS).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.d("FilterActivity", "throttleFirst:" + integer);
            }
        });

每隔100ms发射一个数据,throttleFirst操作符设定的时间为200ms,因此他会发射200ms内的第一个数据,如下


image.png

3.7 throttleWithTimeOut

通过时间来限流,源Observable每次发射出来一个数据后就会进行计时,如果在设定好的时间结束前源Observable有新的数据发射出来,这个数据就会被丢弃,同时throttleWithTimeOut重新开始计时。如果每次都是在计时结束前发射数据,那么这个限流就会走向极端:只会发射最后一个数据。throttleWithTimeOut默认在computation调度器上执行。和throttleWithTimeOut操作符类似的还有deounce操作符,它不仅可以使用时间来进行过滤,还可以根据一个函数来进行限流,throttleWithTimeOut操作符的使用实例如下所示

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Throwable{
                for(int i = 0; i <10; i++){
                    emitter.onNext(i);
                    int sleep = 100;
                    if(i % 3 == 0){
                        sleep = 300;
                    }
                    try{
                        Thread.sleep(sleep);
                    }catch (InterruptedException e){
                        e.printStackTrace();
                    }
                }
                emitter.onComplete();
            }
        }).throttleWithTimeout(200, TimeUnit.MILLISECONDS).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.d("FilterActivity", "throttleWithTimeOut:" + integer);
            }
        });

每隔100ms发射一个数据,当发射的数据是3的倍数时,下一个数据就延迟到300ms再发射,这里设定的过滤时间是200ms,也就是说发射间隔小于200ms的数据会被过滤掉,打印结果如下


image.png

4. 组合操作符

组合操作符可以同时处理多个Observable来创建我们所需要的Observable,组合操作符有merge、concat、zip、combineLatest、join和switch等,这里介绍merge、concat、zip和combineLatest

4.1 merge

merge操作符可以将多个Observable合并到一个Observable中进行发射,merge可能会让合并的Observable发射的数据交错

Observable<Integer> obs1 = Observable.just(1,2,3).subscribeOn(Schedulers.io());
        Observable<Integer> obs2 = Observable.just(4,5,6);
        Observable.merge(obs1,obs2).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.d("CombineActivity", "merge:" + integer);
            }
        });

输出结果如下


image.png

4.2 concat

将多个Observable发射的数据进行合并发射,concat严格按照顺序发射数据,前一个Observable没发射完成是不会发射后一个Observable的数据的

Observable<Integer> obs3 = Observable.just(1,2,3).subscribeOn(Schedulers.io());
        Observable<Integer> obs4 = Observable.just(4,5,6);
        Observable.concat(obs3,obs4).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.d("CombineActivity", "concat:" + integer);
            }
        });

输出如下


image.png

4.3 zip

zip操作符合并两个或多个Observable发射的数据项,根据指定的函数对Observable执行变换操作,并发射一个新值

Observable<Integer> obs5 = Observable.just(1,2,3);
        Observable<String> obs6 = Observable.just("a", "b", "c");
        Observable.zip(obs5, obs6, new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer integer, String s) throws Throwable{
                return integer + s;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Throwable {
                Log.d("CombineActivity", "zip:" + s);
            }
        });

输出结果如下


image.png

4.4 combineLatest

当两个Observable中的任何一个发射了数据时,使用一个函数结合每个Observable发射的最近数据项,并且基于这个函数的结果发射数据。combineLatest操作符和zip有点类似,只不过zip操作如用于最近未打包的两个Observable,只有当原始Observable中的每一个都发射了一条数据时zip才发射数据,而combineLatest操作符用于最近发射的数据项,在原始的Observable中的任意一个发射了数据时侯继续发射一条数据

Observable<Integer> obs7 = Observable.just(1,2,3);
        Observable<String> obs8 = Observable.just("a", "b", "c");
        Observable.combineLatest(obs7, obs8, new BiFunction<Integer, String, String>() {

            @Override
            public String apply(Integer integer, String s) throws Throwable {
                return integer + s;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Throwable {
                Log.d("CombineActivity", "combineLatest:" + s);
            }
        });

如果其中一个Observable还有数据没有发射,那么combineLatest操作符会将两个Observable最新发射的数据组合在一起,如上。第一个Observable最新的数据是3,然后第二个Observable的数据依次在变,之后把第一个和第二个Observable数据组合在一起,打印如下


image.png

5. 辅助操作符

辅助操作符可以帮助我们更方便的处理Observable,辅助操作符包括delay、do、subscribeOn、observeOn、timeout、meterialize、dematerialize、timeInterval、timestamp和to等,下面介绍delay、do、subscribeOn、observeOn和timeout

delay

delay操作符让原始Observabl在发射每项数据之前都暂停一段指定的时间段

Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Long> emitter) throws Throwable {
                Long currentTime = System.currentTimeMillis() / 1000;
                emitter.onNext(currentTime);
            }
        }).delay(2, TimeUnit.MILLISECONDS).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Throwable {
                Log.d("HelpActivity", "delay:" + (System.currentTimeMillis() / 1000 - aLong));
            }
        });

输出结果如下


image.png

5.1 do

do系列操作符就是为原始的Observable的生命周期事件注册一个回调,当Observable的某事件发生时就会调用这些回调,RxJava中有很多do系列操作符,如下

  1. doOnEach:为Observable注册这样一个回调,Observable每发射一项数据就会调用一个回调函数,包括onNext、onError和onComplete
  2. doOnNext:只有执行onNext的时候会被调用
  3. doOnSubscribe:当观察者订阅Observable时就会被调用
  4. doOnError:当Observable异常终止调用onError时会被调用
  5. doOnTerminate:当Observable终止(无论正常终止或异常终止)之前会被调用
  6. finallyDo:当Observable终止(无论正常终止还是异常终止)之后会被调用

这里拿doOnNext来举例,如下所示

Observable.just(1,2).doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.d("HelpActivity","call:" + integer);
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                
            }

            @Override
            public void onNext(@NonNull Integer integer) {
                Log.d("HelpActivity", "onNext:" + integer);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.d("HelpActivity", "Error:" + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d("HelpActivity","onComplete");
            }
        });

输出如下


image.png

5.2 subscribeOn、observeOn

subscribeOn操作符用于指定Observable自身在哪个线程上运行,如果Observable需要执行耗时操作,则一般可以让其在新开的一个子线程上运行。observeOn用来指定Observe所运行的线程,也就是发射出的数据在那个线程上使用。一般情况下会指定其在主线程中运行,这样就可以修改UI,具体如下

Observable<Integer> obs = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                Log.d("HelpActivity", "Observable:" + Thread.currentThread().getName());
                emitter.onNext(1);
                emitter.onComplete();
            }
        });

obs.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.d("HelpActivity", "Observer:" + Thread.currentThread().getName());
            }
        });

subscribeOn(Schedulers.newThread())表示Observable运行在新开的线程,observeOn(AndroidSchedulers.mainThread())表示Observable运行在主线程,其中,AndroidSchedulers是RxAndroid库提供的Scheduler

输出结果如下


image.png

5.3 timeout

如果原始的Observable过了指定的一段时长还没有发射任何数据,则timeout操作符会以一个onError通知来终止这个Observable,或者继续执行一个备用的Observable。timeout有很多变体,这里介绍其中的一种:timeout(long,TimeUnit,Observable)。它在超时时会切换到使用一个你指定的备用Observable,而不是发送错误通知。它默认在computation调度器上执行,具体如下

Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                for(int i = 0; i < 4; i++){
                    try{
                        Thread.sleep(i * 100);
                    }catch (InterruptedException e){
                        e.printStackTrace();
                    }
                    emitter.onNext(i);
                }
                emitter.onComplete();
            }
        }).timeout(200, TimeUnit.MILLISECONDS, Observable.just(10,11));
        observable.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.d("HelpActivity", "timeout:" + integer);
            }
        });
image.png

6. 错误处理操作符

RxJava在错误出现时就会调用观察者的onError()方法将错误分发出去,由观察者自己处理错误,但是如果每个观察者都处理一遍错误的话,工作量就会很大。这可以使用错误处理操作符,错误处理操作符由catch和retry

6.1 catch

catch操作符拦截原始Observable的onError通知,将它替换为其他数据项或数据序列,让产生的Observable能够正常终止或者根本不终止

RxJava将catch实现为3个不同的操作符

  1. onErrorReturn:返回一个镜像原有Observable行为的新Observable,后者会忽略前者的onError调用,不会将错误传递给观察者,作为替代,它会发射一个特殊的项并调用观察者的onComplete()方法
  2. onErrorResumeNext:返回一个镜像原有Observable行为的新Observable,后者会忽略前者的onError()调用,不会将错误传递给观察者,作为替代,它会发射备用的Observable
  3. onExceptionResumeNext:和onErrorResumeNext类似,onExceptionResumeNext()方法返回一个镜像原有Observable行为的新Observable,不同的是,如果onError()收到的Throwable不是一个Exception,就将错误传递给观察者的onError()方法,而不会使用备用的Observable

下面举例onErrorReturn操作符

Observable.create(new ObservableOnSubscribe<Integer>() {
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception{
                for(int i = 0; i < 5; i++){
                    if(i == 2){
                        emitter.onError(new Throwable("Throwable"));
                    }
                    emitter.onNext(i);
                }
                emitter.onComplete();
            }
        }).onErrorReturn(new Function<Throwable, Integer>() {
            @Override
            public Integer apply(Throwable throwable) throws Exception {
                Log.e("MistakeActivity", "在onErrorReturn处理了: " + throwable.toString());
                return 6;
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Integer integer) {
                Log.i("MistakeActivity", "onNext:" + integer);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.d("MistakeActivity", "onError:" + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.i("MistakeActivity", "onComplete");
            }
        });

输出结果如下


image.png

6.2 retry

retry操作符不会将原始的Observable的onError()通知传递给观察者,观察者会订阅这个Observable,再给这个Observable一次机会来无错误地完成它的数据序列。retry总是传递onNext通知给观察者,由于重新订阅,因此可能会造成数据项重复。RxJava中的实现为retry和retryWhen。这里拿retry(long)举例,retry(long)指定了最多重新订阅的次数。如果次数超了,它不会尝试再次订阅。retry(long)会把最新的一个onError通知传递给自己的观察者,如下

Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                for(int i = 0; i < 5; i++){
                    if(i == 1){
                        emitter.onError(new Throwable("Throwable"));
                    }else{
                        emitter.onNext(i);
                    }
                }
                emitter.onComplete();
            }
        }).retry(3).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Integer integer) {
                Log.d("MistakeActivity","onNext:" + integer);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.d("MistakeActivity", "onError:" + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d("MistakeActivity", "onComplete");
            }
        });

上面重新订阅次数为3,在i = 0时调用onNext()方法,此外重试的3次也会调用onNext()方法,这样一共调用4次onNext(),最后调用onError()方法,输出如下


image.png

7. 条件操作符和布尔操作符

条件操作符和布尔操作符可用于根据条件发射或变换Observable,或者对他们做布尔运算。先来了解一下布尔操作符

布尔操作符

布尔操作符有all、contains、isEmpty、exists和sequenceEqual。下面介绍前3个操作符

7.1 all

all操作符根据一个函数对源Observable发射的所有数据进行判断,最终返回的结果就是这个判断的结果。这个函数使用发射的数据作为参考,内部判断所有的数据是否满足我们定义好的判断条件。满足返回true,否则返回false

Observable.just(1,2,3).all(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Throwable {
                Log.d("BoolActivity", "call:" + integer);
                return integer < 2;
            }
        }).subscribe(new Consumer<Boolean>() {
            @Override
            public void accept(Boolean aBoolean) throws Throwable {
                Log.d("BoolActivity", "accept--" + aBoolean);
            }
        });

输出结果如下


image.png

7.2 contains

contains操作符用来判断源Observable所发射的数据是否包含某一个数据。如果包含返回true。如果源Observable已经结束了却还没有发射这个数据,则返回false

Observable.just(1,2,3).contains(1).subscribe(new Consumer<Boolean>() {
            @Override
            public void accept(Boolean aBoolean) throws Throwable {
                Log.d("BoolActivity", "contains:" + aBoolean);
            }
        });

输出如下


image.png

7.3 isEmpty

isEmpty操作符用来判断源Observable是否发射过数据,如果发射过数据就返回false,如果源Observable已经结束了却还没有发射这个数据,则返回true

Observable.just(1,2,3).isEmpty().subscribe(new Consumer<Boolean>() {
            @Override
            public void accept(Boolean aBoolean) throws Throwable {
                Log.d("BoolActivity", "isEmpty:" + aBoolean);
            }
        });

输出如下


image.png

条件操作符

条件操作符有amb、defaultIfEmpty、skipUntil、skipWhile、takeUntil、takeWhile等。这里介绍前两个操作符

7.4 amb

amb操作符对于给定的两个或多个Observable,它只发射首先发射数据或通知的那个Observable的所有数据

Observable.ambArray(Observable.just(1,2,3).delay(2, TimeUnit.SECONDS),Observable.just(4,5,6))
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Throwable {
                        Log.d("ConditionActivity", "amb:"+ integer);
                    }
                });

第一个Observable延迟2s发射,所以很显然最终只会发射第二个Observable,输出结果如下


image.png

7.5 defaultIfEmpty

发射来自原始Observable的数据,如果原始Observable没有发射数据,就发射一个默认数据,如下

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                emitter.onComplete();
            }
        }).defaultIfEmpty(3).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.d("ConditionActivity","defaultIfEmpty:" + integer);
            }
        });

这里没有create任何数据,所以发射default的数据3


image.png

8. 转换操作符

转换操作符用来将Observable转换为另一个对象或数据结构,转换操作符有toList、toSortedList、toMap、toMultiMap、getIterator和nest等,这里介绍前3种操作符

8.1 toList

将发射多项数据的Observable会为每一项数据调用onNext()方法,toList操作可将该Observable发射的多项数据组合成一个List

Observable.just(1,2,3).toList().subscribe(new Consumer<List<Integer>>() {
            @Override
            public void accept(List<Integer> integers) throws Throwable {
                for(int i : integers){
                    Log.i("ConvertActivity", "toList:" + i);
                }
            }
        });

输出结果如下


image.png

8.2 toSortedList

类似于toList操作符,不同的是,toSortedList操作符会将对产生的列表排序,默认自然升序。如果发射的数据项没有实现Comparable接口,则会抛出一个异常。当然,若发射的数据项没有实现Comparable接口,则可以使用toSortedList(Func2)的变体,其传递的函数参数可用于比较两个数据项

Observable.just(3,1,2).toSortedList().subscribe(new Consumer<List<Integer>>() {
            @Override
            public void accept(List<Integer> integers) throws Throwable {
                for(int i : integers){
                    Log.i("ConvertActivity", "toSortedList:" + i);
                }
            }
        });

输出结果如下


image.png

8.3 toMap

操作符将原始的Observable发射的所有数据项收集到一个Map(默认是HashMap)中,然后发射这个Map。你可以提供一个用于生成Map的key的函数,也可以将一个函数转换后的数据项作为Map存储的值(默认情况下数据项本身就是值)

People p1 = new People("小A","A");
        People p2 = new People("小B","B");
        People p3 = new People("小C","C");
        Observable.just(p1,p2,p3).toMap(new Function<People, String>() {

            @Override
            public String apply(People people) throws Throwable {
                return people.id;
            }
        }).subscribe(new Consumer<Map<String, People>>() {
            @Override
            public void accept(Map<String, People> stringPeopleMap) throws Throwable {
                Log.i("ConvertActivity", "toMap:" + stringPeopleMap.get("B").name);
            }
        });

打印key值为B的名字


image.png

本文摘抄自《进阶之光——刘望舒》,是个人学习路线上的总结,不以盈利为目的。


欢迎指正。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,634评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,951评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,427评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,770评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,835评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,799评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,768评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,544评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,979评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,271评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,427评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,121评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,756评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,375评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,579评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,410评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,315评论 2 352

推荐阅读更多精彩内容