Rxjava2.0概述
通过前面的文章介绍,读者对RxJava2.0应该有了初步的认识。RxJava2.0相对1.0没有增加新的功能,最大的变化是把原来与背压有关的操作符抽取出来放在Flowable中处理;另外2.0的接口定义更符合ReactiveStream的规范。操作符的功能和1.0相比没有太大的变化,不过Flowable是背压相关的功能抽离出来的,本篇我们来详细分析下各种操作符的作用和用法。每种类型的操作符会选择几个重点介绍,提醒读者在阅读的过程中注意操作符背压的处理。另外操作符都是支持泛型的,对泛型不了解的读者,需要先熟悉一下泛型相关的知识。
创建操作符
create操作符
这里首先介绍create操作符,它是使用最广泛的创建操作符。
create操作符的使用场景
create是最基本的操作符,用来创建一个Flowable,事件流的生产和下发由用户自己定义,是一个完全可定制的Flowable。
create操作符的基本使用方法
Flowable.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(@NonNull FlowableEmitter<Object> e) throws Exception {
e.onNext("1");
e.onNext("2")
}
}, BackpressureStrategy.BUFFER)
.subscribe(new FlowableSubscriber<Object>() {
Subscription st=null;
@Override
public void onSubscribe(@NonNull Subscription s) {
s.request(1);
st =s;
}
@Override
public void onNext(Object o) {
System.out.print(s);
st.request(1);
}
@Override
public void onComplete() {
}
@Override
public void onError(Throwable throwable) {
}
});
以上输出的结果为:1 2
create方法的
- 第一个参数是FlowableOnSubscribe,它只有一个方法subscribe,subscribe是事件生产的地方,subscribe的参数FlowableEmitter是内部类,其中一个属性是FlowableSubscriber,也就是我们定义的观察者。通过调用 e.onNext("1"),FlowableEmitter会调用观察者onNext的方法即是FlowableSubscriber的onNext方法,从而完成事件下发给观察者。
- 第二个参数是背压策略,RxJava2.0定义了五种背压策略,后续文章会重点讲述背压策略的详细区别。背压策略会影响观察者能否正确接收到事件。
create操作符事件的处理流程
以上面的代码为例,当subscribe(new FlowableSubscriber<Object>()执行时,会首先执行onSubscribe(@NonNull Subscription s)方法,它执行在调用者的线程,对于背压来说,这里需要告知生产者事件的下流观察者的事件处理能力;然后是subscribe(@NonNull FlowableEmitter<Object> e)执行事件的的生产逻辑,这里会判断观察者的处理数据的能力决定是否执行观察者的onNext方法。这里判断的依据是s.request()的参数是否大于零。
create操作符需要注意的地方
前面也提到过,观察者能否接收到事件取决于s.request()的参数是否大于零或者s.request()是否被调用。如果FlowableSubscriber的方法onSubscribe没有调用过s.request(n)或者n<=0 ,FlowableSubscriber不会接收到事件。上面的例子中观察者只能就收到事件"1", 而不能接收"2",因为只调用对s.request(1),如果希望接收到两次事件可以在onSubscribe调用s.request(2),或者是onSubscribe调用s.request(1)并且onNext中调用s.request(1)。
fromArray操作符
fromArray操作符的使用场景
fromArray用于快速创建一个Flowable,直接发送数组的数据。
fromArray操作符的基本使用方法
Flowable.fromArray(new String[]{"1","2","3"})
.subscribe(new FlowableSubscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
s.request(3);
}
@Override
public void onNext(String s) {
System.out.print(s);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
});
以上输出的结果为:1 2 3
fromArray的参数是一个数组,数组的元素会被依次发送出去。
fromArray操作符的事件 执行流程
1 从subscribe(new FlowableSubscriber<String>())方法开始执行;
2 通过内部处理onSubscribe方法会被执行,这里需要背压处理,如果没有调用s.request()或者s.request(n)的参数小于等于0,流程结束;
3 如果s.request(n)的参数大于零,会执行onNext;
4 重复步骤2,3,每执行一次onNext,参数n就会减1。
fromArray操作符需要注意的地方
1 数组不能是list类型,list会被当做一个事件发送出去。
2 背压处理类似create操作符,具体参考上文。
3 fromArray,FlowableSubscriber支持泛型,但是数组的元素类型和观察者的类型要一致。
转换操作符
该类型的操作符都作用于一个可观测序列,然后通过function函数把它变换其他的值,最后用一种新的形式返回它们。
map操作符
map操作符的基本用法
Flowable.fromArray(new String[]{"1","2","3"})
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return s+ "map";
}
})
.subscribe(new FlowableSubscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
s.request(3);
}
@Override
public void onNext(String s) {
System.out.print(s);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
});
以上输出的结果为:1map 2map 3map
这里map操作符的参数Function有两个泛型,第一个泛型是接收到的事件的类型,第二个泛型是发送事件的类型,因此,第一个泛型需要和fromArray的泛型是同一个类型,而观察者的泛型和第二个泛型是同一个类型。
map操作符的执行流程
1 fromArray操作符首先创建一个Flowable1
2 Flowable1调用操作符map,产生一个新的Flowable2;
3 Flowable2 订阅观察者
4 Flowable1 调用观察者的onSubscribe()方法,需要处理背压;
5 观察者在onSubscribe()方法中通知上游观察者的能力是3;
6 Flowable1 开始发射事件"1";
7 Flowable2 的Function对事件“1”执行apply()变换,转化成"1map";
8 观察者接收到事件"1map";
9 重复步骤7,8,直到事件发送完毕或者被取消或者背压为0。
flatMap操作符
flatMap操作符的功能
flatMap将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable。
这个方法是很有用的,例如,当你有一个这样的Observable:它发射一个数据序列,这些数据本身包含Observable成员或者可以变换为Observable,因此你可以创建一个新的Observable发射这些次级Observable发射的数据的完整集合。
flatMap操作符的基本使用
Student student1 = new Student();
Student student2 = new Student();
Student student3 = new Student();
Student[] array={student1,student2,student3};
for(int i=0;i<2;i++){
array[i].cursors = new Cursor[2];
Cursor cursor1 = new Cursor();
cursor1.name="n1";
cursor1.score=10*i;
Cursor cursor2 = new Cursor();
cursor2.name="n2";
cursor2.score=10*i+1;
array[i].cursors[0]=cursor1;
array[i].cursors[1] = cursor2;
}
Flowable.fromArray(array)
.flatMap(new Function<Student, Flowable<Cursor>>() {
@Override
public Flowable<Cursor> apply(Student student) throws Exception {
return Flowable.fromArray(student.cursors);
}
}).subscribe(new FlowableSubscriber<Cursor>() {
@Override
public void onSubscribe(Subscription s) {
s.request(3);
}
@Override
public void onNext(Cursor cursor) {
System.out.print(cursor.name+","+cursor.score);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
});
以上面的例子为例来理解flatMap操作符:需求是打印所有学生的各科成绩。
所有的学生为一个数组,通过fromArray操作符依次发送事件,每个学生作为一个事件发送出去,flatMap操作符把每个学生变换为对应的Flowable,这个Flowable会把学生的成绩依次发送出去,最后所有的成绩汇总,下发给观察者。
可以看出,不需要循环操作,通过flatMap操作符就完成了所有学生的各科成绩的打印。
flatMap操作符需要注意的地方
经过flatMap操作变换后,最后输出的序列有可能是交错的,因为flatMap最后合并结果采用的是merge操作符,后面会详细介绍merge操作符。如果要想经过变换后,最终输出的序列和原序列一致,那就会用到另外一个操作符,concatMap。
过滤操作符
顾名思义,这类操作符按照过滤条件对可观测的事件序列筛选,返回满足我们条件的事件。过滤类操作符主要包含: Filter, Take, TakeLast, TakeUntilSkip, SkipLast, ElementAt, Debounce, Distinct, DistinctUntilChanged, First, Last等等。
fliter操作符
fliter操作符的基本用法
Flowable.fromArray(array)
.filter(new Predicate<Student>() {
@Override
public boolean test(Student student) throws Exception {
return student.cursors[1].score<20;
}
})
.subscribe(new FlowableSubscriber<Student>() {
@Override
public void onSubscribe(Subscription s) {
s.request(3);
}
@Override
public void onNext(Student student) {
System.out.print(student.name);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
});
以上面flatMap的场景,如果需求变为:要求打印第二门课程的成绩<20的学生名单。同样的fromArray会依次下发所有的学生,fliter操作符判断成绩是否小于20,将小于20的学生下发给观察者。
filter操作符的流程
1 fromArray创建Flowable1;
2 filter创建操作符Flowable2;
3 Flowable2 订阅观察者;
4 执行观察者的onSubscribe()方法,需要处理背压;
5 Flowable1 依次发送student事件;
6 Flowable2调用test()判断student事件是否符合筛选条件,符合返回true,否则返回false;
7 Flowable2 根据test()的结果下发事件,返回值=true 事件下发给观察者,返回值=false事件丢弃;
8 重复步骤5,6,7 直到事件发送完毕或者被取消或者背压为0。
distinct操作符
distinct操作符过滤规则是只允许还没有发射过的数据通过,所有重复的数据项都只会发射一次。在数据去重场景中非常有用。
distinct操作符的基本用法
Flowable.fromArray(array)
.distinct()
.subscribe(new FlowableSubscriber<Student>() {
@Override
public void onSubscribe(Subscription s) {
s.request(3);
}
@Override
public void onNext(Student student) {
System.out.print(student.name);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
});
distinct操作符的执行流程
1 fromArray创建Flowable1;
2 distinct创建操作符Flowable2;
3 Flowable2 订阅观察者;
4 执行观察者的onSubscribe()方法,此处应该调用request()方法,否则后面的流程中事件无法下发给观察者
5 Flowable1 依次发送student事件给Flowable2;
6 Flowable2判断事件是否已经下发过,对事件去重;
7 如果已经下发过该事件则调用request(1),纠正背压;若果该事件没有下发过且被压不为零则下发给观察者;
8 重复步骤5,6,7 直到事件结束或者取消或者背压为零。
组合操作符
merge操作符
Merge其实只是将多个Flowable的输出序列变为一个,方便订阅者统一处理,对于订阅者来说就仿佛只订阅了一个观察者一样
merge操作符的基本使用方法
Flowable f1 = Flowable.just(1, 2, 3);
Flowable f2 = Flowable.just(7, 8, 9, 10);
Flowable.merge(f1,f2)
.subscribe(new FlowableSubscriber() {
@Override
public void onSubscribe(Subscription s) {
s.request(7);
}
@Override
public void onNext(Object o) {
System.out.print("merge:"+o.toString());
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
});
merge操作符的执行流程
1 创建Flowable1;
2 创建Flowable2
3 merge创建操作符Flowable3;
4 Flowable3 订阅观察者;
5 Flowable3 内部创建1个队列,用来Flowable1和Flowable2的事件
6 执行观察者的onSubscribe()方法,此处应该调用request()方法,否则后面的流程中事件无法下发给观察者
7 Flowable1 发送事件,Flowable3将其放进队列中;
8 Flowable3 判断队列是否有事件,背压是否不为零
9 如果8的条件都满足,将队列下发给观察者;
10 重复步骤7,8,9 直到事件结束或者取消或者背压为零。
zip操作符
zip通过一个函数将多个Flowable发送的事件结合到一起,然后发送这些组合到一起的事件. 它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Flowable一样多的数据
zip操作符的基本使用方法
Flowable f1 = Flowable.just(1, 2, 3);
Flowable f2 = Flowable.just(7, 8, 9, 10);
Flowable.zip(f1, f2, new BiFunction() {
@Override
public Object apply(Object o, Object o2) throws Exception {
return "f1: "+o.toString()+" and f2: "+o2.toString();
}})
.subscribe(new FlowableSubscriber() {
@Override
public void onSubscribe(Subscription s) {
s.request(4);
}
@Override
public void onNext(Object o) {
System.out.print("zip:"+o.toString());
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
});
zip操作符的执行流程
1 创建Flowable1;
2 创建Flowable2
3 zip创建操作符Flowable3;
4 Flowable3 订阅观察者;
5 Flowable3 内部创建两个队列,分别用来暂存Flowable1和Flowable2的事件
6 执行观察者的onSubscribe()方法,此处应该调用request()方法,否则后面的流程中事件无法下发给观察者
7 Flowable1 发送事件,Flowable3将其放进队列1中;
8 Flowable3 判断队列1和队列2是否有事件,背压是否不为零
9 如果8的条件都满足,将队列1和队列2的事件执行BiFunction的apply()方法;
10 如果8的条件不满足 Flowable2 发送事件,Flowa3将其放进队列2中;
11 Flowable3 判断队列1和队列2是否有事件,背压是否不为零;
12 如果11的条件都满足,将队列1和队列2的事件执行BiFunction的apply()方法;
13 重复步骤7,8,9,10,11,12 直到事件结束或者取消或者背压为零。
zip操作符需要注意的地方
1 如果多个Flowable的事件长度不一样,观察者接收到的是事件长度和最短的Flowable事件长度一样。
2 如果多个Flowable的事件长度不一样,最短的是d,每个Flowable只取前面d个事件。
切换线程操作符
subscribeOn
subscribeOn操作符的基本用法
Flowable.fromArray(array)
.subscribeOn(Schedulers.newThread())
.subscribe(new FlowableSubscriber<Student>() {
@Override
public void onSubscribe(Subscription s) {
s.request(3);
System.out.print(Thread.currentThread().getName());
}
@Override
public void onNext(Student student) {
System.out.print(student.name);
System.out.print(Thread.currentThread().getName());
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
subscribeOn操作符的流程
1 fromArray操作符创建Flowable1;
2 subscribeOn操作符创建Flowable2;
3 Flowable2订阅观察者;
4 Flowable2 内部调用观察者的onSubscribe()方法;
5 Flowable2 切换到线程Thread2;
6 Flowable1 在线程Thread2中生产并下发事件;
7 观察者在线程Thread2中接收事件;
8 重复步骤6,7,直到事件下发完毕或者被取消。
subscribeOn操作符需要注意的地方
1 在一个场景中可以多次使用subscribeOn操作符;
2 多次使用时,Flowable的事件都是在第一个subscribeOn操作符的线程种执行;
3 后面的 subscribeOn 只会改变前面的 subscribeOn 调度操作所在的线程,并不能改变最终被调度的事件执行的线程,但对于中途的代码执行的线程,还是会影响到的。
observerOn
observerOn操作符的基本用法
Flowable.fromArray(array)
.observeOn(Schedulers.newThread())
.subscribe(new FlowableSubscriber<Student>() {
@Override
public void onSubscribe(Subscription s) {
s.request(3);
System.out.print(Thread.currentThread().getName());
}
@Override
public void onNext(Student student) {
System.out.print(student.name);
System.out.print(Thread.currentThread().getName());
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
observerOn操作符的流程
1 fromArray操作符创建Flowable1;
2 observerOn操作符创建Flowable2;
3 Flowable2订阅观察者;
4 Flowable2 内部调用观察者的onSubscribe()方法;
5 Flowable1 生产并下发事件;
6 Flowable2 接收Flowable1的事件并切换到线程Thread2;
7 观察者在线程Thread2中接收事件;
8 重复步骤5,6,7直到事件下发完毕或者被取消。
observerOn操作符需要注意的地方
1 在一个场景中可以多次使用observerOn操作符;
2 每级的Observer的onXXX方法都在不同的线程中被调用。所以observeOn的调用会多次生效;
3 observeOn 影响它下面的调用执行时所在的线程,每次调用都会改变数据向下传递时所在的线程。
参考
给初学者的RxJava2.0教程
探索专为 Android 而设计的 RxJava 2
RxJava系列6(从微观角度解读RxJava源码)
浅谈 RxJava 与 2.0 的新特性
RxJava 2.x 使用详解