RxJava3.x基本用法
1. 概述
1.1 ReactiveX与RxJava
RxJava是ReactiveX的一种Java实现,Rx是一个函数库,开发者可利用可观察序列和LINQ风格查询操作符来编写异步和基于事件的程序。开发者可利用Observables表示异步数据流,用LINQ查询异步数据流,用Schedulers参数化异步数据流的并发处理,Rx可以这样定义:Rx = Observables + LINQ + Schedulers
1.2 为何用RxJava
异步操作可以使用AsyncTask和Handler,但随着请求越来越多逻辑会非常复杂,但RxJava仍旧可以保持清晰的逻辑,RxJava的原理就是创建一个Observable对象来处理逻辑,然后使用各种操作符建立起来的链式操作,如同流水线一样处理数据一步步加工后发射给Subscriber处理
1.3 背压
背压是指一部场景中,被观察者发送事件的速度远快于观察者处理事件速度的情况下,一种告知上游的被观察者降低发送速度的策略
1.4 观察者和被观察者
RxJava的异步操作是通过扩展的观察者模式来实现的,RxJava中Observable代表被观察者,Observer代表观察者,RxJava3.x中有以下几个被观察者
Observable:发送0个或N个数据,不支持背压。原本是支持的,RxJava2.x后由Flowable支持,因此改成不支持背压
Flowable:发送0个或N个数据,支持背压。是RxJava2.x后的新类型
Single:只处理onSuccess和onError事件,只能发送单个数据或者发送一个错误
Completable:创建后不会发射任何数据,只处理onComplete和onError事件
Maybe:能够发射0个或1个数据,是RxJava2.x后的新类型
2. 基本实现
使用前配置gradle
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
其中RxAndroid是RxJava在Android平台的扩展,包含了一些简化Android开发的工具,比如特殊的调度器等
RxJava的基本用法分为三个步骤,如下
2.1 创建Observer(观察者)
它决定事件触发的时候将有怎样的行为,代码如下
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d("MainActivity","onSubscribe");
}
@Override
public void onNext(@NonNull String s) {
Log.d("MainActivity","onNext" + s);
}
@Override
public void onError(@NonNull Throwable e) {
Log.d("MainActivity","onError");
}
@Override
public void onComplete() {
Log.d("MainActivity","onComplete");
}
};
各方法含义如下
onComplete:事件队列完结,RxJava不仅把每个事件单独处理,而且还会把他们看作一个队列,当不会再有新的onNext()发出时,需要触发onComplete()方法作为完成标志
onError:事件队列异常,在事件处理过程中出现异常时,onError()方法会被触发,同时队列自动终止,不允许再有事件发出
onNext:普通的事件,将要处理的事件添加到事件队列中
onSubscribe:当订阅时会被调用
2. 创建Observer(被观察者)
决定什么时候触发事件以及触发怎样的事件,RxJava使用create()方法来创建一个Observable,并为它定义事件触发规则
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
emitter.onNext("我是谁");
emitter.onNext("我在哪");
emitter.onComplete();
}
});
通过调用subscribe()方法不断将事件添加到任务队列中,也可以用just方法实现
Observable observable = Observable.just("我是谁","我在哪");
3. Subscribe订阅
订阅只需要一行代码即可
observable.subscribe(observer);
运行代码查看log
先调用onSubscribe(),再调用两个onNext(),最后完成了调用onComplete()方法
RxJava3.x的Subject和Processor
这两个有些相似,一起说,先介绍较为复杂的Subject
1. Subject的分类
Subject既可以是一个Observer,也可以是一个Observerable,它是链接Observer和Observerable的桥梁,因此Subject可以被理解为Subject = Observer + Observable。RxJava提供了8种Subject,如下
1.1 PublishSubject
PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者,需要注意的是,PublishSubject可能会一创建完成就立刻开始发射数据,因此这里可能会有一个风险:在Subject被创建后,到有观察者订阅它之前这个时间段内,一个或多个数据可能会丢失,如果要确保来自原始Observable的所有数据都被分发,则可以当所有观察者都已经订阅时才开始发射数据,或者改用ReplaySubject
1.2 BehaviorSubject
当Observer订阅BehaviorSubject时,BehaviorSubject开始发射原始Observer最近发射的数据,如果Observer此时还没有收到任何数据,则BehaviorSubject会发射一个默认值,然后继续发射其他任何来自原始Observable的数据,如果原始的Observable因为发生错误而终止,则BehaviorSubject不会发射任何数据,但是会向Observer传递一个异常数据
1.3 ReplaySubject
不管Observer何时订阅ReplaySubject,ReplaySubject都会向Observer发射所有来自原始Observable的数据,有不同类型的ReplaySubject,它们用来限定Replay的范围,比如设定Buffer的具体大小,或者设定具体的时间范围。如果用ReplaySubject作为Observer,则注意不要在多个线程中调用onNext()、onComplete()、onError(),这可能会导致数据发送错乱,并且违反Observer规则
1.4 AsyncSubject
当Observable完成时,AsyncSubject只会发射来自原始Observable的最后一个数据,如果原始的Observable因为发生了错误而终止,则AsyncSubject将不会发射任何数据,但是会向Observer传递一个异常通知
1.5 UnicastSubject
只允许一个Observer进行监听,在该Obsesrver注册之前会将发射的所有事件放进一个队列中,并在Observer注册的时候将所有事件一起通知给它,如果有多个观察者订阅,那么程序会报错
1.6 CompletableSubject
只发送Observer发射完毕的数据,也就是只发送onComplete()
1.7 MaybeSubject
主要用于发送一个结果数据,一般用于验证某个结果
1.8 SingleSubject
SingleSubject和MaybeSubject的区别不大,只不过SingleSubject没有onComplete()方法和onErrorComplete()方法
2. Processor
说完Subject,来说Processor,这是RxJava2的新增功能,是一个接口,继承自Subscriber和Publisher。与Subject作用类似,只不过Processor支持背压
主要种类如下:AsnycProcessor,BehaviorProcessor,FlowableProcessor,MulticastProcessor,PublishProcessor,ReplayProcessor,UnicastProcessor,内容大部分与Subject相似,不多介绍
RxJava3.x操作符入门
RxJava操作符的类型分为创建操作符,变换操作符,过滤操作符,组合操作符,错误处理操作符,辅助操作符,条件和布尔操作符,算数和聚合操作符,而这些操作符类型下又有很多操作符,每个操作符可能还有很多变体。
1. 创建操作符
上文我们已经使用的创建操作符create()和just(),这里就不赘述了,除了它们,还有defer、range、interval、start、repeat、timer等创建操作符,下面介绍interval、range、repeat
1.1 interval
创建一个按固定时间间隔发射整数序列的Observable,相当于定时器,如下
Observable.interval(3, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Throwable {
Log.d("MainActivity", "interval:" + aLong.intValue());
}
});
于是每隔3s就会调用accept()方法打印log,这里只截取6次
1.2 range
创建发射指定范围的整数序列的Observable,可以拿来替代for循环,发射一个范围内的有序整数序列,第一个参数是起始值,并且不小于0,第二个参数为终值,区间为左闭右开
Observable.range(0,5).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
Log.d("MainActivity", "range:" + integer);
}
});
打印log如下
1.3 repeat
创建一个N次重复发射特定数据的Observable,如下
Observable.range(0,3).repeat(2).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
Log.d("MainActivity","repeat:" + integer);
}
});
重复打印range()里的数字2次
2. 变换操作符
变换操作符的作用是对Observable发射的数据按照一定规则做一些变换操作,然后将变换后的数据发射出去。变换操作符有map、flatMap、concatMap、switchMap、flatMapIterable、buffer、groupBy、cast、window、scan等,这里介绍map、flatMap、cast、concatMap、flatMapIterable、buffer和groupBy
2.1 map
通过指定一个Function对象将源Observable转换为一个新的Observable对象并发射,观察者将收到新的Observable对象并处理,假设我们要访问网络,Host地址时常是变化的,有时是测试服务器地址,有时是正式服务器地址,但是具体界面的URL地址是不变的,因此我们可以用map来进行变换字符操作,这里简单修改一下URL
final String Host = "http://**********";
Observable.just(".cn").map(new Function<String, String>() {
@Override
public String apply(String s) throws Throwable {
return Host + s;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Throwable {
Log.d("ChangeActivity", "map:" + s);
}
});
打印log如下,加上了 “.cn”
2.2 flatMap、cast
flatMap操作符将Observable发射的数据集合变换为Observables集合,然后将这些Observable发射的数据平坦化地放进一个单独的Observable中。cast操作符地作用是强制将Observable发射的所有数据转换为指定类型的数据。假设我们仍旧访问网络,但是要访问同一个Host的多个界面,我们可以使用for循环在每个界面的URL前添加Host,但是RxJava提供了一个更方便的操作,如下
final String Host1 = "http://blog.****.net/";
List<String> mlist = new ArrayList<>();
mlist.add("itachi86");
mlist.add("itachi87");
mlist.add("itachi88");
Observable.fromIterable(mlist).flatMap(new Function<String, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(String s) throws Throwable {
return Observable.just(Host + s);
}
}).cast(String.class).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Throwable {
Log.d("ChangeActivity", "flatMap:" + s);
}
});
打印log如下所示
首先用ArrayList存储要访问的界面URL,然后通过flatMap转换成Observable。cast操作符将Observable中的数据转换为String类型
注意,flatMap的合并是允许交叉的,也就是说采用flatMap操作符时可能会交错的发送事件,最终结果的顺序可能并不是原始的Observable发送时的顺序
2.3 concatMap
concatMap操作符的功能与flatMap操作符一致,不过它解决了flatMap的交叉问题,提供了一种能把发射的值连续在一起的函数而不是合并他们,代码如下,有兴趣可以自己打一下log
final String Host2 = "http://blog.****.net/";
Observable.fromIterable(mlist).concatMap(new Function<String, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(String s) throws Throwable {
return Observable.just(Host + s);
}
//转换为String
}).cast(String.class).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Throwable {
Log.d("ChangeActivity", "concatMap:" + s);
}
});
2.4 flatMapIterable
这个操作符可以将数据包装成Iterable,在Iterable中我们就可以对数据进行处理了,如下
Observable.just(1,2,3).flatMapIterable(new Function<Integer, Iterable<Integer>>() {
@Override
public Iterable<Integer> apply(Integer integer) throws Throwable {
List<Integer> mlist = new ArrayList<>();
mlist.add(integer + 1);
return mlist;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
Log.d("ChageActivity", "flatMapIterable:" + integer);
}
});
将每个数都加1,打印如下
2.5 buffer
buffer操作符将源Observable变换为一个新的Observable,新的Observable每次发射一组列表值而不是一个个发射数据。与buffer操作符类似的还有window操作符,只不过window操作符发射的是Observable而不是数据列表
Observable.just(1,2,3,4,5,6).buffer(3).subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Throwable {
for(Integer i : integers){
Log.d("ChangeActivity", "buffer:" + i);
}
Log.d("MainAtivity", "----------------");
}
});
buffer的意思是缓存容量为3,打印如下
2.6 groupBy
groupBy用于分组元素,将源Observable转换成一个发射Observables的新的Observable(分组后的),每一个新的Observable都发射一组指定的数据,那我们先写一个Java Bean,这里为了方便不做封装
public class People {
String name;
String id;
public People(String name, String id) {
this.name = name;
this.id = id;
}
}
再编写groupBy
People p1 = new People("周杰伦","A");
People p2 = new People("张震岳","SS");
People p3 = new People("刘德华","S");
People p4 = new People("胡一天","S");
People p5 = new People("林志颖","A");
People p6 = new People("鲁迅","SS");
People p7 = new People("胡适","S");
People p8 = new People("李大钊","A");
Observable<GroupedObservable<String, People>> GroupedObservable =
Observable.just(p1,p2,p3,p4,p5,p6,p7,p8).groupBy(new Function<People, String>() {
@Override
public String apply(People people) throws Throwable {
return people.id;
}
});
Observable.concat(GroupedObservable).subscribe(new Consumer<People>() {
@Override
public void accept(People people) throws Throwable {
Log.d("ChangeActivity", "groupBy:" + people.name + "----" + people.id);
}
});
这里创建了8个人物,按实力进行划分,groupBy帮助我们对某一个key进行分组,相同的key的值数据放在一起,concat是组合操作符,之后会介绍,打印如下
3. 过滤操作符
过滤操作符用于过滤和选择Observable发射的数据序列,让Observable只返回满足条件的数据,过滤操作符有filter,elementAt,distinct,skip,take,skipLast,takeLast,ignoreElements,throttleFirst,sample,debounce和throttleWithTimeOut等,下面介绍filter,elementAt,distinct,skip,take,ignoreElements,throttleFirst,throttleWithTimeOut
3.1 filter
filter操作符会对源Observable产生的结果自定义规则进行过滤,只有满足条件的结果才会提交给订阅者,如下
Observable.just(1,2,3,4).filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Throwable {
return integer > 2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
Log.d("FilterActivity","filter:" + integer);
}
});
打印log如下,大于2的数才会被打印出来
3.2 elementAt
elementAt操作符用来返回指定位置的数据,和它类似的还有elementAtOrDefault(int , T),elementAtOrDefault()可以允许默认值,如下
Observable.just(1,2,3,4).elementAt(2).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
Log.d("FilterActivity", "elementAt:" + integer);
}
});
打印索引为2的数,如下
3.3 distinct
distinct操作符可以用来去重,只允许还没有发射过的数据项通过,和它类似的还有distinctUntilChanged操作符,distinctUntilChanged用来去掉连续重复的数据
Observable.just(1,2,2,3,4,1).distinct().subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
Log.d("FilterActivity", "distinct:" + integer);
}
});
打印如下
3.4 skip、take
skip操作符将源Observable发射的数据过滤掉前n项,而take操作符只取前n项,另外,skiplast和takelast操作符则是从Observable发射的数据的后面进行过滤操作,首先来看skip操作符,如下
Observable.just(1,2,3,4,5,6).skip(2).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
Log.d("FilterActivity", "skip:" + integer);
}
});
打印后几项
再来看take
//take
Observable.just(1,2,3,4,5,6).take(2).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
Log.d("FilterActivity", "take:" + integer);
}
});
取前几项,如下
3.5 ignoreElements
ignoreElements操作符忽略所有源Observable产生的结果,把Observable的onComplete()和onError()事件通知订阅者,如下
Observable.just(1,2,3,4).ignoreElements().subscribe(new CompletableObserver() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onComplete() {
Log.d("FilterActivity", "onComplete");
}
@Override
public void onError(@NonNull Throwable e) {
Log.d("FilterActivity", "onError");
}
});
输出如下
3.6 throttleFirst
throttleFirst操作符会定期发射这个时间段里源Observable发射的第一个数据,throttleFirst操作符默认在computation调度器上执行,和throttleFirst操作符类似的还有sample操作符,sample操作符会定时地发射源Observable最近发射的数据,其他的都会被过滤掉,throttleFirst操作符的使用示例如下所示
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Throwable{
for(int i = 0; i < 10; i++){
emitter.onNext(i);
try{
Thread.sleep(100);
}catch (InterruptedException e){
e.printStackTrace();
}
}
emitter.onComplete();
}
}).throttleFirst(200, TimeUnit.MILLISECONDS).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
Log.d("FilterActivity", "throttleFirst:" + integer);
}
});
每隔100ms发射一个数据,throttleFirst操作符设定的时间为200ms,因此他会发射200ms内的第一个数据,如下
3.7 throttleWithTimeOut
通过时间来限流,源Observable每次发射出来一个数据后就会进行计时,如果在设定好的时间结束前源Observable有新的数据发射出来,这个数据就会被丢弃,同时throttleWithTimeOut重新开始计时。如果每次都是在计时结束前发射数据,那么这个限流就会走向极端:只会发射最后一个数据。throttleWithTimeOut默认在computation调度器上执行。和throttleWithTimeOut操作符类似的还有deounce操作符,它不仅可以使用时间来进行过滤,还可以根据一个函数来进行限流,throttleWithTimeOut操作符的使用实例如下所示
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Throwable{
for(int i = 0; i <10; i++){
emitter.onNext(i);
int sleep = 100;
if(i % 3 == 0){
sleep = 300;
}
try{
Thread.sleep(sleep);
}catch (InterruptedException e){
e.printStackTrace();
}
}
emitter.onComplete();
}
}).throttleWithTimeout(200, TimeUnit.MILLISECONDS).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
Log.d("FilterActivity", "throttleWithTimeOut:" + integer);
}
});
每隔100ms发射一个数据,当发射的数据是3的倍数时,下一个数据就延迟到300ms再发射,这里设定的过滤时间是200ms,也就是说发射间隔小于200ms的数据会被过滤掉,打印结果如下
4. 组合操作符
组合操作符可以同时处理多个Observable来创建我们所需要的Observable,组合操作符有merge、concat、zip、combineLatest、join和switch等,这里介绍merge、concat、zip和combineLatest
4.1 merge
merge操作符可以将多个Observable合并到一个Observable中进行发射,merge可能会让合并的Observable发射的数据交错
Observable<Integer> obs1 = Observable.just(1,2,3).subscribeOn(Schedulers.io());
Observable<Integer> obs2 = Observable.just(4,5,6);
Observable.merge(obs1,obs2).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
Log.d("CombineActivity", "merge:" + integer);
}
});
输出结果如下
4.2 concat
将多个Observable发射的数据进行合并发射,concat严格按照顺序发射数据,前一个Observable没发射完成是不会发射后一个Observable的数据的
Observable<Integer> obs3 = Observable.just(1,2,3).subscribeOn(Schedulers.io());
Observable<Integer> obs4 = Observable.just(4,5,6);
Observable.concat(obs3,obs4).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
Log.d("CombineActivity", "concat:" + integer);
}
});
输出如下
4.3 zip
zip操作符合并两个或多个Observable发射的数据项,根据指定的函数对Observable执行变换操作,并发射一个新值
Observable<Integer> obs5 = Observable.just(1,2,3);
Observable<String> obs6 = Observable.just("a", "b", "c");
Observable.zip(obs5, obs6, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Throwable{
return integer + s;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Throwable {
Log.d("CombineActivity", "zip:" + s);
}
});
输出结果如下
4.4 combineLatest
当两个Observable中的任何一个发射了数据时,使用一个函数结合每个Observable发射的最近数据项,并且基于这个函数的结果发射数据。combineLatest操作符和zip有点类似,只不过zip操作如用于最近未打包的两个Observable,只有当原始Observable中的每一个都发射了一条数据时zip才发射数据,而combineLatest操作符用于最近发射的数据项,在原始的Observable中的任意一个发射了数据时侯继续发射一条数据
Observable<Integer> obs7 = Observable.just(1,2,3);
Observable<String> obs8 = Observable.just("a", "b", "c");
Observable.combineLatest(obs7, obs8, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Throwable {
return integer + s;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Throwable {
Log.d("CombineActivity", "combineLatest:" + s);
}
});
如果其中一个Observable还有数据没有发射,那么combineLatest操作符会将两个Observable最新发射的数据组合在一起,如上。第一个Observable最新的数据是3,然后第二个Observable的数据依次在变,之后把第一个和第二个Observable数据组合在一起,打印如下
5. 辅助操作符
辅助操作符可以帮助我们更方便的处理Observable,辅助操作符包括delay、do、subscribeOn、observeOn、timeout、meterialize、dematerialize、timeInterval、timestamp和to等,下面介绍delay、do、subscribeOn、observeOn和timeout
delay
delay操作符让原始Observabl在发射每项数据之前都暂停一段指定的时间段
Observable.create(new ObservableOnSubscribe<Long>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Long> emitter) throws Throwable {
Long currentTime = System.currentTimeMillis() / 1000;
emitter.onNext(currentTime);
}
}).delay(2, TimeUnit.MILLISECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Throwable {
Log.d("HelpActivity", "delay:" + (System.currentTimeMillis() / 1000 - aLong));
}
});
输出结果如下
5.1 do
do系列操作符就是为原始的Observable的生命周期事件注册一个回调,当Observable的某事件发生时就会调用这些回调,RxJava中有很多do系列操作符,如下
- doOnEach:为Observable注册这样一个回调,Observable每发射一项数据就会调用一个回调函数,包括onNext、onError和onComplete
- doOnNext:只有执行onNext的时候会被调用
- doOnSubscribe:当观察者订阅Observable时就会被调用
- doOnError:当Observable异常终止调用onError时会被调用
- doOnTerminate:当Observable终止(无论正常终止或异常终止)之前会被调用
- finallyDo:当Observable终止(无论正常终止还是异常终止)之后会被调用
这里拿doOnNext来举例,如下所示
Observable.just(1,2).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
Log.d("HelpActivity","call:" + integer);
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
Log.d("HelpActivity", "onNext:" + integer);
}
@Override
public void onError(@NonNull Throwable e) {
Log.d("HelpActivity", "Error:" + e.getMessage());
}
@Override
public void onComplete() {
Log.d("HelpActivity","onComplete");
}
});
输出如下
5.2 subscribeOn、observeOn
subscribeOn操作符用于指定Observable自身在哪个线程上运行,如果Observable需要执行耗时操作,则一般可以让其在新开的一个子线程上运行。observeOn用来指定Observe所运行的线程,也就是发射出的数据在那个线程上使用。一般情况下会指定其在主线程中运行,这样就可以修改UI,具体如下
Observable<Integer> obs = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
Log.d("HelpActivity", "Observable:" + Thread.currentThread().getName());
emitter.onNext(1);
emitter.onComplete();
}
});
obs.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
Log.d("HelpActivity", "Observer:" + Thread.currentThread().getName());
}
});
subscribeOn(Schedulers.newThread())表示Observable运行在新开的线程,observeOn(AndroidSchedulers.mainThread())表示Observable运行在主线程,其中,AndroidSchedulers是RxAndroid库提供的Scheduler
输出结果如下
5.3 timeout
如果原始的Observable过了指定的一段时长还没有发射任何数据,则timeout操作符会以一个onError通知来终止这个Observable,或者继续执行一个备用的Observable。timeout有很多变体,这里介绍其中的一种:timeout(long,TimeUnit,Observable)。它在超时时会切换到使用一个你指定的备用Observable,而不是发送错误通知。它默认在computation调度器上执行,具体如下
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
for(int i = 0; i < 4; i++){
try{
Thread.sleep(i * 100);
}catch (InterruptedException e){
e.printStackTrace();
}
emitter.onNext(i);
}
emitter.onComplete();
}
}).timeout(200, TimeUnit.MILLISECONDS, Observable.just(10,11));
observable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
Log.d("HelpActivity", "timeout:" + integer);
}
});
6. 错误处理操作符
RxJava在错误出现时就会调用观察者的onError()方法将错误分发出去,由观察者自己处理错误,但是如果每个观察者都处理一遍错误的话,工作量就会很大。这可以使用错误处理操作符,错误处理操作符由catch和retry
6.1 catch
catch操作符拦截原始Observable的onError通知,将它替换为其他数据项或数据序列,让产生的Observable能够正常终止或者根本不终止
RxJava将catch实现为3个不同的操作符
- onErrorReturn:返回一个镜像原有Observable行为的新Observable,后者会忽略前者的onError调用,不会将错误传递给观察者,作为替代,它会发射一个特殊的项并调用观察者的onComplete()方法
- onErrorResumeNext:返回一个镜像原有Observable行为的新Observable,后者会忽略前者的onError()调用,不会将错误传递给观察者,作为替代,它会发射备用的Observable
- onExceptionResumeNext:和onErrorResumeNext类似,onExceptionResumeNext()方法返回一个镜像原有Observable行为的新Observable,不同的是,如果onError()收到的Throwable不是一个Exception,就将错误传递给观察者的onError()方法,而不会使用备用的Observable
下面举例onErrorReturn操作符
Observable.create(new ObservableOnSubscribe<Integer>() {
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception{
for(int i = 0; i < 5; i++){
if(i == 2){
emitter.onError(new Throwable("Throwable"));
}
emitter.onNext(i);
}
emitter.onComplete();
}
}).onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) throws Exception {
Log.e("MistakeActivity", "在onErrorReturn处理了: " + throwable.toString());
return 6;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
Log.i("MistakeActivity", "onNext:" + integer);
}
@Override
public void onError(@NonNull Throwable e) {
Log.d("MistakeActivity", "onError:" + e.getMessage());
}
@Override
public void onComplete() {
Log.i("MistakeActivity", "onComplete");
}
});
输出结果如下
6.2 retry
retry操作符不会将原始的Observable的onError()通知传递给观察者,观察者会订阅这个Observable,再给这个Observable一次机会来无错误地完成它的数据序列。retry总是传递onNext通知给观察者,由于重新订阅,因此可能会造成数据项重复。RxJava中的实现为retry和retryWhen。这里拿retry(long)举例,retry(long)指定了最多重新订阅的次数。如果次数超了,它不会尝试再次订阅。retry(long)会把最新的一个onError通知传递给自己的观察者,如下
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
for(int i = 0; i < 5; i++){
if(i == 1){
emitter.onError(new Throwable("Throwable"));
}else{
emitter.onNext(i);
}
}
emitter.onComplete();
}
}).retry(3).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
Log.d("MistakeActivity","onNext:" + integer);
}
@Override
public void onError(@NonNull Throwable e) {
Log.d("MistakeActivity", "onError:" + e.getMessage());
}
@Override
public void onComplete() {
Log.d("MistakeActivity", "onComplete");
}
});
上面重新订阅次数为3,在i = 0时调用onNext()方法,此外重试的3次也会调用onNext()方法,这样一共调用4次onNext(),最后调用onError()方法,输出如下
7. 条件操作符和布尔操作符
条件操作符和布尔操作符可用于根据条件发射或变换Observable,或者对他们做布尔运算。先来了解一下布尔操作符
布尔操作符
布尔操作符有all、contains、isEmpty、exists和sequenceEqual。下面介绍前3个操作符
7.1 all
all操作符根据一个函数对源Observable发射的所有数据进行判断,最终返回的结果就是这个判断的结果。这个函数使用发射的数据作为参考,内部判断所有的数据是否满足我们定义好的判断条件。满足返回true,否则返回false
Observable.just(1,2,3).all(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Throwable {
Log.d("BoolActivity", "call:" + integer);
return integer < 2;
}
}).subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Throwable {
Log.d("BoolActivity", "accept--" + aBoolean);
}
});
输出结果如下
7.2 contains
contains操作符用来判断源Observable所发射的数据是否包含某一个数据。如果包含返回true。如果源Observable已经结束了却还没有发射这个数据,则返回false
Observable.just(1,2,3).contains(1).subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Throwable {
Log.d("BoolActivity", "contains:" + aBoolean);
}
});
输出如下
7.3 isEmpty
isEmpty操作符用来判断源Observable是否发射过数据,如果发射过数据就返回false,如果源Observable已经结束了却还没有发射这个数据,则返回true
Observable.just(1,2,3).isEmpty().subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Throwable {
Log.d("BoolActivity", "isEmpty:" + aBoolean);
}
});
输出如下
条件操作符
条件操作符有amb、defaultIfEmpty、skipUntil、skipWhile、takeUntil、takeWhile等。这里介绍前两个操作符
7.4 amb
amb操作符对于给定的两个或多个Observable,它只发射首先发射数据或通知的那个Observable的所有数据
Observable.ambArray(Observable.just(1,2,3).delay(2, TimeUnit.SECONDS),Observable.just(4,5,6))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
Log.d("ConditionActivity", "amb:"+ integer);
}
});
第一个Observable延迟2s发射,所以很显然最终只会发射第二个Observable,输出结果如下
7.5 defaultIfEmpty
发射来自原始Observable的数据,如果原始Observable没有发射数据,就发射一个默认数据,如下
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
emitter.onComplete();
}
}).defaultIfEmpty(3).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
Log.d("ConditionActivity","defaultIfEmpty:" + integer);
}
});
这里没有create任何数据,所以发射default的数据3
8. 转换操作符
转换操作符用来将Observable转换为另一个对象或数据结构,转换操作符有toList、toSortedList、toMap、toMultiMap、getIterator和nest等,这里介绍前3种操作符
8.1 toList
将发射多项数据的Observable会为每一项数据调用onNext()方法,toList操作可将该Observable发射的多项数据组合成一个List
Observable.just(1,2,3).toList().subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Throwable {
for(int i : integers){
Log.i("ConvertActivity", "toList:" + i);
}
}
});
输出结果如下
8.2 toSortedList
类似于toList操作符,不同的是,toSortedList操作符会将对产生的列表排序,默认自然升序。如果发射的数据项没有实现Comparable接口,则会抛出一个异常。当然,若发射的数据项没有实现Comparable接口,则可以使用toSortedList(Func2)的变体,其传递的函数参数可用于比较两个数据项
Observable.just(3,1,2).toSortedList().subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Throwable {
for(int i : integers){
Log.i("ConvertActivity", "toSortedList:" + i);
}
}
});
输出结果如下
8.3 toMap
操作符将原始的Observable发射的所有数据项收集到一个Map(默认是HashMap)中,然后发射这个Map。你可以提供一个用于生成Map的key的函数,也可以将一个函数转换后的数据项作为Map存储的值(默认情况下数据项本身就是值)
People p1 = new People("小A","A");
People p2 = new People("小B","B");
People p3 = new People("小C","C");
Observable.just(p1,p2,p3).toMap(new Function<People, String>() {
@Override
public String apply(People people) throws Throwable {
return people.id;
}
}).subscribe(new Consumer<Map<String, People>>() {
@Override
public void accept(Map<String, People> stringPeopleMap) throws Throwable {
Log.i("ConvertActivity", "toMap:" + stringPeopleMap.get("B").name);
}
});
打印key值为B的名字
本文摘抄自《进阶之光——刘望舒》,是个人学习路线上的总结,不以盈利为目的。
欢迎指正。