参考链接:
//www.greatytc.com/p/464fa025229e
Rxjava2学习笔记二:RxJava2进阶使用-zip操作符
//www.greatytc.com/p/ef8b620fdc4c
Rxjava2学习笔记三:RxJava2进阶使用-map操作符
//www.greatytc.com/p/f7efc1aeb6c9
1.Gradle配置
- compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
2.原理
-
先假设2根水管:
- 上面一根水管为事件产生的水管,叫它上游吧,下面一根水管为事件接收的水管叫它下游吧
- 两根水管通过某种方式连接在一起:使得上游每产生一个事件,下游都能收到该事件
上游事件产生顺序:1->2->3;下游事件接收顺序:1->2->3
基本使用例1
注:Rxjava2中的emitter-》发射器,用于被观察者发射事件
Disposable-》RxJava1.x中的Subscription,用于解除订阅
//创建一个上游 Observable:
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
});
//创建一个下游 Observer
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "subscribe");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "error");
}
@Override
public void onComplete() {
Log.d(TAG, "complete");
}
};
//建立连接
observable.subscribe(observer);
基本使用例2-链式调用
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "subscribe");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "error");
}
@Override
public void onComplete() {
Log.d(TAG, "complete");
}
});
- 注:ObservableEmitter[发射器]:发出事件
->调用emitter的onNext(T value)、onComplete()和onError(Throwable error)
->发出next事件、complete事件和error事件
Emitter-发送规则
- 1.上游可以发送无限个onNext, 下游也可以接收无限个onNext.
- 2.当上游发送了一个onComplete后, 上游onComplete之后的事件将会继续发送,
而下游收到onComplete事件之后将不再继续接收事件. - 3.当上游发送了一个onError后, 上游onError之后的事件将继续发送, 而下游收到onError事件之后将不再继续接收事件.
- 4.上游可以不发送onComplete或onError.
- 5.最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然
发送规则示意图
1.发送onNext()事件
2.发送onComplete()事件
Disposable-订阅
1.相当于RxJava1.x中的Subscription,用于解除订阅
2.解除订阅:disposable.dispose();
3.多个Disposable时取消订阅,RxJava中已经内置了一个容器CompositeDisposable,每当我们得到一个Disposable时就调用CompositeDisposable.add()将它添加到容器中, 在退出的时候, 调用CompositeDisposable.clear() 即可切断所有的水管.
-
4.eg:
Disposable disposable = observable.subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { //这里接收数据项 } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { //这里接收onError } }, new Action() { @Override public void run() throws Exception { //这里接收onComplete。 } }); disposable.dispose();//解除订阅
Consumer-消费者
- 1.用于接收单个值,其他的如:BiConsumer则是接收两个值,Function用于变换对象,Predicate用于判断用法如上的例子
- 2.替代了RxJava1.x中的Action/Func接口
3.Rxjava线程调度
- 1 subscribeOn(Schedulers.io())//上游发送事件的线程,第一次有效(如网络请求可在IO或子线程发送事件)
observeOn(AndroidSchedulers.mainThread())//下游接收事件的线程(主线程接收返回信息后更新UI)
observeOn(Schedulers.io())//线程切换;可多次切换每调用一次observeOn() , 下游的线程就会切换一次.