目录
什么是RxJava
依赖
在安卓中,除了依赖RxJava外,还需要依赖一下RxAndroid
compile 'io.reactivex.rxjava2:rxandroid:2.1.0'
compile 'io.reactivex.rxjava2:rxjava:2.1.0'
鉴于RxJava源码比较多,本系列不会分析源码,主要介绍一些常用到的东西
1. RxJava简要介绍
RxJava中有三个关键点
Observable,Observer,subscribe
观察者模式说
Observable,被观察者
Observer,观察者
subscribe,建立订阅关系
在观察者模式中,一般都是观察者订阅被观察者
但在RxJava中,是Observable.subscribe(Observer)
水管说
Observable,上游水管
Observer,下游水管
subscribe,连接上下游的水管
2. 基本使用
废话不多说,先来一段代码感受一下
//关注点1
Observable<String> observable=Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d("TAG","subscribe");
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onComplete();
}
});
//关注点2
Observer<String> observer=new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d("TAG","onSubscribe");
}
@Override
public void onNext(String s) {
Log.d("TAG",s);
}
@Override
public void onError(Throwable e) {
Log.d("TAG","onError");
}
@Override
public void onComplete() {
Log.d("TAG","onComplete");
}
};
//关注点3
observable.subscribe(observer);
这段代码的执行结果
D/TAG: onSubscribe
D/TAG: subscribe
D/TAG: 1
D/TAG: 2
D/TAG: 3
D/TAG: onComplete
站在观察者模式的角度
关注点1的Observable是被观察者,被观察者用来发布事件
关注点2的Observer是观察者,观察者用来接收事件
关注点3用来关联观察者与被观察者
站在水管的角度
把关注点1比喻成一根水管,处在上游
把关注点2也比喻成一根水管,处在下游
关注点3就是连接这两根水管的一个媒介,通过该媒介关注点1和关注点2就连通了。
我们在来看看剩下的两个比较陌生的类
ObservableEmitter和Disposable
ObservableEmitter
这是一个接口,继承自Emitter接口,翻译过来就是发射器的意思,而它就是用来发出事件的,通过调用emitter的onNext(T value),
onError(Throwable t)和onComplete()分别用来发生next事件,error事件和complete事件。
下面来总结一下事件的发送接收规则
- 事件是发送一个,接收一个,然后才发送下一个。
- 上游可以发送无限个next事件,下游也可以接收无限个next事件。
- 当上游发送complete或者error事件后,下游不会在接收complete或者error事件之后的事件,但上游会将全部事件都发送完。
- 上游的complete和error事件必须唯一且互斥,否则可能相应的事件不会被下游接收到,并且可能会发生手机崩溃的情况。
Disposable
Disposable是一个接口,里面就两个方法
void dispose();
boolean isDisposed();
修改关注点1和关注点2的代码
//关注点1
Observable<String> observable=Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d("TAG","subscribe");
emitter.onNext("1");
emitter.onNext("2");
emitter.onComplete();
emitter.onNext("3");
Log.d("TAG","emitter 3");
}
});
//关注点2
Observer<String> observer=new Observer<String>() {
Disposable disposable;
@Override
public void onSubscribe(Disposable d) {
Log.d("TAG","onSubscribe");
disposable=d;
}
@Override
public void onNext(String s) {
Log.d("TAG",s);
if(s.equals("1")){
disposable.dispose();
Log.d("TAG", "isDisposed : " + disposable.isDisposed());
}
}
@Override
public void onError(Throwable e) {
Log.d("TAG","onError:"+e.getMessage());
}
@Override
public void onComplete() {
Log.d("TAG","onComplete");
}
};
输出结果
D/TAG: onSubscribe
D/TAG: subscribe
D/TAG: 1
D/TAG: isDisposed : true
D/TAG: emitter 3
结果分析
当在下游调用disposable.dispose()方法时,下游将不会在接收后续的事件,但上游还是会将全部事件发送完毕。
这里的dispose就相当于将水管切断了,因此接收不了后续事件。
链式调用
我们来稍微修改一下之前的写法,就变成了RxJava流行的链式调用写法了。
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
通过源码发现(CreateEmitter)
当我们调用onComplete或者onError时,会触发dispose方法
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
} else {
RxJavaPlugins.onError(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}