RxJava2.0已经完全基于Reactive-Streams库重写,Reactive-Streams是从RxJava 1.x版本中分离出来一个库,为响应式系统开发库提供基本规格。
因为Reactive-Streams有 的着不同设计风格,它改变了很多RxJava中一些被熟知的类型。RxJava项目的wiki页概括了这些改变以及如何把1.x的代码转到2.x的代码。
目录:
Maven address and base package
Javadoc
Nulls
Observable and Flowable
Single
Completable
Maybe
Base reactive interfaces
Subjects and Processors
Other classes
Functional interfaces
Subscriber
Subscription
Backpressure
Reative-Stream compliance
Runtime hooks
Error handling
Scheduler
Entering the reactive world
Leaving the reactive world
Testing
Operator differences
Miscellaneous Changes
Maven地址和基本包名
RxJava 1.x和RxJava 2.x现已分开管理,RxJava 2.x的maven地址是io.reactivex.rxjava2:rxjava:2.x.y包名是io.reactivex.
Javadoc
官方的Javadoc网页托管在http://reactivex.io/RxJava/2.x/javadoc/
Nulls
RxJava 2.x不再接收null,下面的代码会马上产生NullPointerException给下游的流。
Observable.just(null);
Single.just(null);
Observable.fromCallable(()->nul)).subscribe(System::out::println,Throwable::printStackTrace);
Observable.just(1).map(v->null).subscribe(System::out::println,Throwable::printStrackTrace);
这意味着Observable<Void>不会发射任何值,而只会正常中断或者抛出一个异常。要实现该功能,可以定义一个以Object作为泛型的Observable,即Observable<Object>,这里的Object是一个无关紧要的类。一个比较好的例子就是定义一个只有一个值的枚举类型:
enum Irrelevant{
INSTANCE;
}
Observable<Object> source=Observable.create((ObservableEmitter<Object> emitter)->{
System.out.println("Side-effect 1");
emitter.onNext(Irrelevant.INSTANCE);
System.out.println("Side-effect 2");
emitter.onNext(Irrelevant.INSTANCE);
System.out.println("Side-effect 3");
emitter.onNext(Irrelevant.INSTANCE);
});
source.subscribe(e->{/*这里我们并不关心发射的对象是什么,即使如此,被发射的对象也不能为null或者Void*/},Throwable::printStackTrace);
Observable and Flowable
RxJava 0.x 引入背压的时候没有单独定义一个基类,导致Observable概念复杂而多样(一词多义)。对于一些常见的数据源,比如UI事件,是不能背压的,但是使用Observable却不得不背压,从而导致UI事件也会触发MissBackpressureException异常。
为此,2.x版本定义Flowable来表示可背压的数据源,而Observable则表示不能背压的数据源。
好消息是操作符(大部分)维持原样,坏消息是在导包时需要特别注意,不然一不小心就会在需要背压时引入了不支持背压的Observable。
选择Observable还是Flowable?
避免出错需要考虑的因素:
何时使用Observable
·个数不超过1000的数据流:数量少以至于几乎不可能出现Out Of Memory异常
·GUI事件,比如鼠标事件,触点事件:这些数据源通常很少出现背压并且不频繁。可以考虑使用sampling/debouncing来处理频率接近或者小于1000Hz的数据源。
·在不支持Java Streams的平台使用Streams特性。
何时使用Flowable
·超过10k+的数据源,并且可以控制数据源生成的数据数量
·读取文件
·读取JDBC数据库
·网络IO请求
·阻塞或者拉取式数据源
Single
2.x版本Single响应式数据源,发射一个onSuccess或者onError事件,根据Reactive-Streams规格重写。Single数据源的消费者(rx.Single.SingleSubscriber<T>)已经从一个实现rx.Subscription接口的类变成一个具有三个方法的接口io.reactivex.SingleObserver<T>:
interface SingleObserver<T>{
void onSubscribe(Disposable d);
void onSuccess(T value);
void onError(Throwable error);
}
Single数据源的onSuccess和onError的事件只发送一个。
Completable
Completable数据源大体保持不变。1.x版本在设计时就和Reactive-Streams规范符合得很好,所以没有用户层面的改动。
和其他类一样,rx.Completable.CompletableSubscriber迁移到io.reactivex.CompletableObserver,并且增加了方法onSubscribe(Disposable):
interface CompletableObserver<T> {
void onSubscribe(Disposable d);
void onComplete();
void onError(Throwable error);
}
同样onComplete和onError事件只触发一次。
Maybe
RxJava 2.0.0-RC2介绍了一种新的响应式数据源,叫做Maybe。从概念上讲,Maybe是Single和Completable的结合体,它提供捕获从某些响应式数据源发射的信号,或0个或1个或error,的方法。
Maybe依赖一个MaybeSource作为基本接口类型,依赖一个MaybeObserver<T>作为信号接收的接口,并且遵从协议onSuccess,onError,onComplete事件只发送一次。因为至多只有一个元素被发射,所有Maybe没有背压的概念(因为长度未知的Flowable或者Observable没有缓冲膨胀的可能性)。
这意味着调用onSubscribe(Disposable)之后会紧跟着一个onXXX方法。和Flowable不同的是,如果只有一个信号被发射,只有onSuccess会被调用和onComplete不会。
Maybe这个新的响应式数据源和其他的数据源没什么两样,但是它提供的操作符是Flowable操作符的一个子集,这个子集处理0个或者1个的数据序列。
基本响应式接口
正如Flowable实现了Reactive-Streams的Publisher<T>接口一样,其他的基本响应式数据源也实现了类似的接口:
interface ObservableSource<T>{
void subscribe(Observer<? super T> observer);
}
interface SingleSource<T>{
void subscribe(SingleObserver<? super T> observer);
}
interface CompletableSource{
void subscribe(CompletableObserver observer);
}
interface MaybeSource<T>{
void subscribe(MaybeObserver<? super T> observer);
}
所以,很多需要传入响应式基本数据源的操作函数现在可以接受Publisher和XSource对象。
Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper);
Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper);
Publisher(Flowable实现了该接口)作为数据源时,可以在不把这些操作函数转化为Flowable的情况下组合(compose 在用于主动语态时,一般它所表示的“构成”或“组成”总包含着融合为一,而且主语或者是复数名词或者是集体名词)其他的Reactive-Streams操作函数(比如subScribeOn, observeOn等):
(这句话怎么理解呢?因为很多操作函数的返回值是Publisher, 而不是Flowable, 但是不需要手动把Publisher转成Flowable就能保证链式调用的连续性)
source.compose((Flowable<T> flowable)->
flowable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()));
如果一个操作函数返回一个响应式基本类型,那么将会返回全称(但是Xsource基本数据源不会有此现象,因为Xsource类型的数据源不提供这样的函数,比如Single就没有window函数):
Flowable<Flowable<Integer>> windows=source.window(5);
Subjects and Processors
在Reactive-Streams规格中,subject特性,即是消费者的同时又是供应商的特性,由接口org.reactivestreams.Processor实现。鉴于Observable和Flowable分开,Reactive-Streams背压机制的实现是基于FlowableProcessor<T>类(该类继承于Flowable,具有丰富的操作符)。关于Subject(即Reactive-Streams的FlowableProcessor),一个重要的改动是它们不再提供T->R的转换(即输入T,输出R)。(因为1.x版本中从未使用过该转换,之所以1.x版本中重载该转换,是因为该转换来自于.NET(原来是借鉴.NET),在.NET中,该函数能够接收多个不同类型的参数)。
类io.reactivex.subjects.AsyncSubject, io.reactivex.subjects.BehaviorSubject, io.reactivex.subjects.PublishSubject, io.reactivex.subjects.ReplaySubject和io.reactivex.subjects.UnicastSubject, 在2.x版本中不支持背压(作为2.x版本Observable家族的一部分)。
类io.reactivex.processors.AsyncSubject, io.reactivex.processors.BehaviorSubject, io.reactivex.processors.PublishSubject, io.reactivex.processors.ReplaySubject和io.reactivex.processors.UnicastSubject支持背压。BehaviorProcessor和PublishProcessor不会协调下游消费者的请求,如果下游消费者消费速度跟不上,就会收到MissingBackpressureException异常。其他XProcessor类型遵从下游消费者的背压(跟不上时不会抛出异常,等待下游消费者消费完)。但除此之外,这些XProcessor订阅一个数据源时,它们以无限制的方式消费(即reqesting Long.MAX_VALUE)。
TestSubject
1.x版本的TestSubject被移除。它的功能通过TestScheduler, PublishProcessor/PublishSubject和observerOn(testScheduler)/scheduler参数实现。
TestScheduler scheduler=new TestScheduler();
PublishSubject<Integer> ps=PublishSubject.create();
TestObserver<Integer> ts=ps.delay(1000, TimeUnit.MILLISECONDS, scheduler)).test();
(delay方法属于Observable<T>,返回一个带有scheduler的Observable<T>,这个schedule里面维护了一个时间,这个时间是可以手动进行调节的)
ts.assertEmpty();
ps.onNext(1);
scheduler.advanceTimeBy(999,TimeUnit.MILLISECONDS);
ts.assertEmpty();
scheduler.advanceTimeBy(1,TimeUnit.MILLISECONDS);
ts.assertValue(1);
SerializedSubject
SerializedSubject不再是个公开类,取而代之的是通过Subject.toSerialized()和FlowableProcessor.toSerialized()方法间接获取。
其他类
类rx.observables.ConnectableObservable分开为两个类io.reactivex.observables.ConnectableObservable<T>和io.reactivex.flowables.ConnectableFlowable<T>类
GroupedObservable
rx.observables.GroupedObservable也拆分为两个:io.reactivex.observables.GroupedObservable<T>和io.reactivex.flowables.GroupedFlowable<T>
1.x版本中,可以通过GroupedObservable.from()来创建该类的实例,但是2.x版本,所有实例的类都必须的类必须继承GroupedObservable,1.x版本中所有创建该类实例的工厂方法都不在使用;GroupedObservable变成一个抽象类。
可以继承该抽象类并重载subscribeActual方法来实现1.x版本中相似的特性。
class MyGroup<K,V> extends GroupedObservable<K,V>{
final K key;
final Subject<V> subject;
public MyGroup(K key){
this.key=key;
this.subject=PublishSubject.create();
@Override
public T getKey(){
return key;
}
@Override
protected void subscribeActual(Observer<? super T> observable){
subject.subscribe(observer);
}