从16年11月份推出RxJava 2.0 ,到现在差不多大半年的时间里,RxJava已经来到了2.x时代,RxJava 1.x 可能也慢慢地被2.x 代替。RxJava 2.x在Reactive-Streams规范的基础上从头开始完全重写,虽然操作符基本没有发生变化,但是因为Reactive-Streams具有不同的架构,因此对一些众所周知的RxJava类型进行了更改。
RxJava 2.x相对与1.x还是有很多的不同,RxJava的类型更改了,很多类的命名和方法的命名发生了变化(可能功能与1.x相同),要想从RxJava 1.x 顺利地过渡到2.x, 就得了解这些变化。因此本教程就带你了解这些变化,从而能更快地上手RxJava2.x。
本篇文章就来先了解一下RxJava 2.x的5种响应类型。
一、RxJava 2.x中5种类型介绍
1 . Observable and Flowable
关于在RxJava 0.x版本引入背压的一个小的遗憾是,没有设计一个单独的基础反应类,而是对Observable本身进行了改装。背压的主要问题在于热源(如:UI事件),不能合理地反压并导致不可预料的异常MissingBackpressureException,这是初学者不期望看到的。
在RxJava 2.x版本中修复了这种情况,将o.reactivex.Observable
作为非背压,引入新的io.reactivex.Flowable
作为启用背压基础反应类。
好消息是,在2.x版本中,主要的操作符保持不变(同1.x版本),坏消息是,在导入的时候应当小心,它可能会无意的选择非背压的o.reactivex.Observable
.
我们应该选哪种?
当构建数据流(作为RxJava的最终消费者)或考虑您的2.x兼容库应该采取和返回什么类型时,您可以考虑几个因素,以帮助您避免诸如MissingBackpressureException或OutOfMemoryError之类的问题。
Observable使用场景:
- 数据流最长不超过1000个元素,即随着时间的流逝,应用没有机会报OOME(OutOfMemoryError)错误。
- 处理诸如鼠标移动或触摸事件之类的GUI事件
Flowable使用场景:
- 处理超过10K+ 的元素流
- 从磁盘读取(解析文件)
- 从数据库读取数据
- 从网络获取数据流
2 . Single 使用介绍
Single是2.x版本中的一种基础响应类型,Single是从头开始重新设计的,能单独发射一个onSuccess
或者onError
事件,它现在的架构来自于the Reactive-Streams
设计。它的消费者类型已经从接受rx.Subscription
的rx.Single.SingleSubscriber<T>
变为了io.reactivex.SingleObserver<T>
,有3个方法:
interface SingleObserver<T> {
void onSubscribe(Disposable d);
void onSuccess(T value);
void onError(Throwable error);
}
3 . Completable使用介绍
Completeble类型基本保持不变,1.x的版本已经沿着Reactive-Streams风格设计,所以没有用户级别的更改。
相似地命名改变,rx.Completable.CompletableSubscriber
变为带有onSubscribe(Disposable)
方法的io.reactivex.CompletableObserver
:
interface CompletableObserver<T> {
void onSubscribe(Disposable d);
void onComplete();
void onError(Throwable error);
}
4 . Maybe 使用介绍
RxJava 2.0.0-RC2 介绍了一个新的基础响应类型,它叫做Maybe。从概念上来将,它像是 Single和Completable的结合,它可能发射0个或者1个项目,或者一个error信号。
Maybe类通过依赖MaybeSource作为它的基础接口类型MaybeObserver<T>
作为信号响应接口并且遵循协议onSubscribe (onSuccess | onError | onComplete)?因为最多可能发射1个元素,所以Maybe类型没有背压的概念(因为它没有像Flowable和Observable一样有未知长度的可膨胀缓冲区)
这意味着onSubscribe(Disposable)
的调用潜在地跟随着其他onXXX
方法之一的调用,不同于Flowable,如果这儿只有一个信号值发射信号,那么只有onSuccess
被调用,而不会调用complete
。
二、RxJava 2.x中5种类型使用示例
1 . Observable示例
在写示例之前,我们先来回顾一下 1.x 版本是如何创建Observable和如何订阅的:
RxJava 1.x :
//创建 observable
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello world");
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
//订阅方式一
observable.subscribe(new Subscriber() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object o) {
}
});
// 订阅方式二
observable.subscribe(new Action1() {
@Override
public void call(Object o) {
// onNext
}
});
通过create
方法创建Observable,接收一个OnSubscribe
接口,其中有一个回调方法call
,参数为Subscriber
,我们用Subscriber
来发射数据。通过subscribe
方法来订阅,可以接收一个Subscriber 实现全部方法,也可以接收一个Action1
只实现onNext方法。
RxJava 2.x :
//创建Observable
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
Log.e(TAG,"start emitter data");
e.onNext("Hello");
e.onNext("world");
e.onComplete();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
// 订阅方式一:下游消费者 Observer
observable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
// onSubscribe 是2.x新添加的方法,在发射数据前被调用,相当于1.x的onStart方法
Log.e(TAG,"onSubscribe");
}
@Override
public void onNext(@NonNull String s) {
Log.e(TAG,"onNext");
Log.e(TAG,s);
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG,"onError");
}
@Override
public void onComplete() {
Log.e(TAG,"onComplete");
}
});
// 订阅方式二:Consumer
observable.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String o) throws Exception {
Log.e(TAG,"consumer:"+o);
}
});
打印结果如下:
06-25 14:31:35.435 21505-21505/com.zhouwei.demoforrxjava2 E/MainActivity: onSubscribe
06-25 14:31:35.437 21505-21853/com.zhouwei.demoforrxjava2 E/MainActivity: start emitter data
06-25 14:31:35.438 21505-21505/com.zhouwei.demoforrxjava2 E/MainActivity: onNext:Hello
06-25 14:31:35.438 21505-21505/com.zhouwei.demoforrxjava2 E/MainActivity: onNext:world
06-25 14:31:35.438 21505-21505/com.zhouwei.demoforrxjava2 E/MainActivity: onComplete
其实我们可以对比一下,1.x 和 2.x 方法都试一样的,只是它们所接收的响应接口改变了,对应变化如下:
RxJava 1.x -> RxJava 2.x
---------------------------------------
OnSubscribe<T> -> ObservableOnSubscribe<T>
Subscriber<T> -> Observer<T>
Subscriber<T> -> ObservableEmitter<T>
Action1<T> -> Consumer<T>
RxJava 2.x 中对这些接口进行了重新设计,让一个接口的职责更加单一,类的命名和方法命名与它的功能更佳符合(见名知意)。如在1.x 中,Subscriber 既能发射数据,又能消费数据,充当观察者和被观察者。在2.x 中 把它拆解成了2个接口。
ObservableEmitter<T>
专门用来发射数据,Consumer
专门用来消费数据。 除此之外,在RxJava 2.x 中,多了一个void onSubscribe(@NonNull Disposable d)
回调方法,参数为Disposable
,Disposable
是用来解除订阅关系的,这让我们的解除订阅变得更佳容易(比起1.x 通过subscribe
返回 Subscription)。
上面对比了在RxJava 1.x 和2.x 版本创建Observable的方式,其实在RxJava 2.x中,这5种类型的用法是非常相似的,它们的接口命名规则相同,只要你知道其中一种,就知道其他几种类型该如何在上游发射数据和在下游消费数据。create
接收的类型都为xxxOnSubscrible(xxx为5种类型对应的名字),发射器为xxxEmitter,具体如下表:
RxJava 2.x 类型 | create参数(响应接口) | 发射器 | Observer |
---|---|---|---|
Observable | ObservableOnSubscribe | ObservableEmitter | Observer |
Flowable | FlowableOnSubscribe | FlowableEmitter | FlowableSubscriber |
Single | SingleOnSubscribe | SingleEmitter | SingleObserver |
Completable | CompletableOnSubscribe | CompletableEmitter | CompletableObserver |
Maybe | MaybeOnSubscribe | MaybeEmitter | MaybeObserver |
2 . Flowable示例
上面对比了RxJava 1.x 和 2.x 创建使用Observable的方式,并且总结了2.x 相关类的改变,如上面表。那么使用Flowable的方式和Observable是很相似的,看一下代码:
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
Log.e(TAG,"start send data ");
for(int i=0;i<100;i++){
e.onNext(i);
}
e.onComplete();
}
}, BackpressureStrategy.DROP)//指定背压策略
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new FlowableSubscriber<Integer>() {
@Override
public void onSubscribe(@NonNull Subscription s) {
//1, onSubscribe 是2.x新添加的方法,在发射数据前被调用,相当于1.x的onStart方法
//2, 参数为 Subscription ,Subscription 可用于向上游请求发射多少个元素,也可用于取笑请求
//3, 必须要调用Subscription 的request来请求发射数据,不然上游是不会发射数据的。
Log.e(TAG,"onSubscribe...");
subscription = s;
s.request(100);
}
@Override
public void onNext(Integer integer) {
Log.e(TAG,"onNext:"+integer);
}
@Override
public void onError(Throwable t) {
Log.e(TAG,"onError..."+t);
}
@Override
public void onComplete() {
Log.e(TAG,"onComplete...");
}
});
Flowable和Observable的使用基本相同,只不过Observable不支持背压,而Flowable支持背压。使用的时候,还是要注意几个小细节:
1,创建Flowable的时候需要指定一个背压策略,本文使用的是PBackpressureStrategy.DROP(丢弃策略),RxJava 2.x中,内置了5种背压策略,由于篇幅有限,背压和背压策略下一篇拿出来单独讲。
2,onSubscribe 回调方法中,参数是Subscription而不是Disposable,前文说过,RxJava 2.x 中,订阅的管理换成了Disposable,但是Flowable使用的是Subscription,这个Subscription不是1.x 版本中的Subscription,虽然它有取消订阅的能力。主要用于请求上游元素和取消订阅。
3,在使用Flowable的时候,必须调用Subscription 的requsest方法请求,不然上游是不会发射数据的。看request的方法解释:
3 . Single、Completable 和 Maybe 示例
Single、Completable和Maybe就比较简单,Single用于只发射一个数据,Completable不发送数据,它给下游发射一个信号。而Maybe则是Single和Completable的结合,根据名字就可以看出,它的结果是不确定的,可能发发射0(Completable)或1(Single) 个元素,或者收到一个Error信号。
Single示例:
Single.create(new SingleOnSubscribe<Boolean>() {
@Override
public void subscribe(@NonNull SingleEmitter<Boolean> e) throws Exception {
Log.e(TAG,"subscribe...");
e.onSuccess(true);
}
})
.observeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new SingleObserver<Boolean>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.e(TAG,"onSubscribe...");
}
@Override
public void onSuccess(@NonNull Boolean aBoolean) {
Log.e(TAG,"onSuccess...");
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG,"onError...");
}
});
Single只发射一个元素,所以没有complete
方法,不像Observable或者Flowable,数据发射完成之后,需要调用complete
告诉下游已经完成。
Completable示例:
Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(@NonNull CompletableEmitter e) throws Exception {
Log.e(TAG,"start send data");
e.onComplete();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new CompletableObserver() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.e(TAG,"onSubscribe");
}
@Override
public void onComplete() {
Log.e(TAG,"onComplete");
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG,"onError");
}
});
Completable 不会发射数据,只会给下游发送一个信号。回调 onComplete
方法。
Maybe示例:
Maybe.create(new MaybeOnSubscribe<Boolean>() {
@Override
public void subscribe(@NonNull MaybeEmitter<Boolean> e) throws Exception {
Log.e(TAG,"start send data");
e.onSuccess(true);
e.onComplete();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new MaybeObserver<Boolean>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.e(TAG,"onSubscribe");
}
@Override
public void onSuccess(@NonNull Boolean aBoolean) {
Log.e(TAG,"1->onSuccess:"+aBoolean);
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG,"onError");
}
@Override
public void onComplete() {
Log.e(TAG,"onComplete");
}
});
Maybe是Single和Completable的结合,需要注意的是
onSuccess
和onComplete
方法只会执行其中一个,这不同于Observable和Flowable最后是以onComplete()
结尾.
如上面的代码,先调用onSuccess
发射一个元素,再调用onComplete
,
e.onComplete();
e.onSuccess(true);
最后打印结果如下:
E/MainActivity: onSubscribe
E/MainActivity: start send data
E/MainActivity: onComplete
可以看到只回调了 onComplete
,我们把调用的顺序调换一下:
e.onSuccess(true);
e.onComplete();
打印结果如下:
E/MainActivity: onSubscribe
E/MainActivity: start send data
E/MainActivity: 1->onSuccess:true
可以看到调换了之后打印OnSucces()
而没有打印onComplete()
,这也印证了只会回调其中之一。
三、总结
RxJava 2.x 相比于 1.x 还是有很大的变化,虽然操作符基本不变,但是很多类和接口都是基于Reactive-Streams 规范重新设计的,命名也发生了变换,要想玩转RxJava 2.x ,你得了解这些变化和使用场景,本文介绍了RxJava 2.x 的5种基础响应类型,希望对才开始学习RxJava 2.x 的同学有所帮助。