这个页面展示的操作符可用于组合多个 Observables。
Delay — 延时发射 Observable 的结果。
DelaySubscription — 延时处理订阅请求。
DoOnEach — 注册一个动作,对 Observable 发射的每个数据项使用。
DoOnComplete — 注册一个动作,对正常完成的 Observable 使用。
DoOnError — 注册一个动作,对发生错误的 Observable 使用。
DoOnTerminate — 注册一个动作,对完成的 Observable 使用,无论是否发生错误。
DoOnSubscribe — 注册一个动作,在观察者订阅时使用。
DoOnUnsubscribe — 注册一个动作,在观察者取消订阅时使用。
Dematerialize — 将上面的结果逆转回一个 Observable
ObserveOn — 指定观察者观察 Observable 的调度器
Materialize — 将 Observable 转换成一个通知列表
Serialize — 强制一个 Observable 连续调用并保证行为正确
Subscribe — 操作来自 Observable 的发射物和通知。
SubscribeOn — 指定 Observable 执行任务的调度器。
TimeInterval — 定期发射数据。
Timeout - 对原始 Observable 的一个镜像,如果过了一个指定的时长仍没有发射数据,它会发一个错误通知。
Timestamp — 给 Observable 发射的每个数据项添加一个时间戳。
6.1 Delay
延迟一段指定的时间再发射来自 Observable 的请求。
![Delay](http://opgvsfix4.bkt.clouddn.com/rxjava_delay.c.png)
RxJava 的实现是 delay 和 delaySubscription。不同之处在于 Delay 是延时数据的发射,而 DelaySubscription 是延时注册 Subscriber。
6.1.1 Delay
![delay](http://opgvsfix4.bkt.clouddn.com/rxjava_delay.png)
示例代码:
final long currentTimeMillis = System.currentTimeMillis();
Observable.range(1, 2).delay(2000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
if (integer == 1) {
Log.e(TAG, "delay Time :" + (System.currentTimeMillis() - currentTimeMillis) + "");
}
Log.e(TAG, "accept:" + integer);
}
});
输出结果:
delay Time :2408
accept:1
accept:2
6.1.2 delaySubscription
![delaySubscription](http://opgvsfix4.bkt.clouddn.com/rxjava_delaySubscription.png)
示例代码:
final long currentTimeMillis = System.currentTimeMillis();
Observable.range(1, 2).delaySubscription(2, TimeUnit.SECONDS).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer aLong) throws Exception {
if (aLong == 1) {
Log.e(TAG, "delay Time :" + (System.currentTimeMillis() - currentTimeMillis) + "");
}
Log.e(TAG, "accept:" + aLong);
}
});
输出结果:
delay Time :2500
accept:1
accept:2
6.2 Do
注册一个动作作为原始 Observable 生命周期事件的一种占位符。
![Do](http://opgvsfix4.bkt.clouddn.com/txjava_do.c.png)
Do 操作符就是给 Observable 的生命周期的各个阶段加上一系列的回调监听,当 Observable 执行到这个阶段的时候,这些回调就会被触发。
在 Rxjava2.0 中实现了很多的 do 操作符的变体。
6.2.1 doAfterNext
实现方法:doAfterNext(Consumer)
从上流向下流发射后被调用。
示例代码:
public static void demo_doAfterNext(){
Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
});
ob1.doAfterNext(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG,"doAfterNext="+integer);
}
}).subscribe(getNormalObserver());
}
public static Disposable mDisposable ;
//可重复使用
public static Observer<Integer> getNormalObserver(){
return new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
mDisposable = d;
}
@Override
public void onNext(@NonNull Integer integer) {
Log.e(TAG,"normal,onNext:"+integer);
}
@Override
public void onError(@NonNull Throwable error) {
Log.e(TAG,"normal,Error: " + error.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG,"normal,onComplete");
}
};
}
输出结果:
normal,onNext:1
doAfterNext : 1
normal,onNext:2
doAfterNext : 2
normal,onNext:3
doAfterNext : 3
normal,onComplete
6.2.2 doAfterTerminate
![doAfterTerminate](http://opgvsfix4.bkt.clouddn.com/rxjava_doAfterTerminate.png)
实现方法: doAfterTerminate(Action)
注册一个 Action,当 Observable 调用 onComplete 或 onError 触发。
示例代码:
Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
// emitter.onError(new Throwable("nothingerro"));
}
});
ob1.doAfterTerminate(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG,"doAfterTerminate run");
}
}).subscribe(getNormalObserver());
输出结果:
normal,onNext:1
normal,onNext:2
normal,onComplete
doAfterTerminate run
6.2.3 doFinally
实现方法: doFinally(Action onDispose)
当 Observable 调用 onError 或 onCompleted 之后调用指定的操作,或由下游处理。
doFinally 优先于 doAfterTerminate 的调用。
示例代码:
Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
// emitter.onError(new Throwable("nothingerro"));
}
});
ob1.doFinally(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG,"doFinally run");
}
}).subscribe(getNormalObserver());
输出结果:
normal,onNext:1
normal,onNext:2
normal,onComplete
doFinally run
6.2.4 doOnDispose
![doOnDispose](http://opgvsfix4.bkt.clouddn.com/doOnUnsubscribe.png)
实现方法:doOnDispose(Action onDispose)
当 Observable 取消订阅时,它就会被调用。
示例代码:
Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
//mDisposable 参考6.2.1
if (mDisposable != null) {
mDisposable.dispose();
}
emitter.onNext(2);
emitter.onComplete();
// emitter.onError(new Throwable("nothingerro"));
}
});
ob1.doOnDispose(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG,"doOnDispose run");
}
}).subscribe(getNormalObserver());
输出结果:
normal,onNext:1
doOnDispose run
6.2.5 doOnComplete
![doOnComplete](http://opgvsfix4.bkt.clouddn.com/doOnComplete.png)
当它产生的 Observable 正常终止调用 onCompleted 时会被调用。
Javadoc: doOnCompleted(Action)
示例代码:
Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
// emitter.onError(new Throwable("nothingerror"));
emitter.onComplete();
}
});
ob1.doOnComplete(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "doOnComplete run");
}
}).subscribe(getNormalObserver());
输出结果:
normal,onNext:1
normal,onNext:2
doOnComplete run
normal,onComplete
6.2.6 doOnEach
![doOnEach](http://opgvsfix4.bkt.clouddn.com/rxjava_doOnEach.png)
doOnEach 操作符让你可以注册一个回调,它产生的 Observable 每发射一项数据就会调用它一次。不仅包括 onNext 还包括 onError 和 onCompleted。
示例代码:
Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
// emitter.onError(new Throwable("nothingerror"));
emitter.onComplete();
}
});
ob1.doOnEach(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
Log.e(TAG, "doOnEach,onNext:" + integer);
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG, "doOnEach,onError:" + e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "doOnEach,onComplete");
}
}).subscribe(getNormalObserver());
输出结果:
doOnEach,onNext:1
normal,onNext:1
doOnEach,onNext:2
normal,onNext:2
doOnEach,onComplete
normal,onComplete
6.2.7 doOnError
doOnError 操作符注册一个动作,当它产生的 Observable 异常终止调用 onError 时会被调用。
![doOnError](http://opgvsfix4.bkt.clouddn.com/rxjava_doOnError.png)
实现方法 doOnError(Consumer<? super Throwable>);
示例代码:
Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new Throwable("nothing error"));
emitter.onComplete();
}
});
ob1.doOnError(new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG,"doOnError : "+throwable.getMessage());
}
}).subscribe(getNormalObserver());
输出结果:
normal,onNext:1
normal,onNext:2
doOnError : nothing error
normal,Error: nothing error
6.2.8 doOnLifecycle
调用相应的 onXXX 方法(在所有 Observer 之间共享),用于序列的生命周期事件(订阅,取消,请求)。
![doOnLifecycle](http://opgvsfix4.bkt.clouddn.com/doOnLifecycle.png)
示例代码:
Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
// emitter.onError(new Throwable("nothing error"));
if (mDisposable != null) {
mDisposable.dispose();
}
emitter.onComplete();
}
});
ob1.doOnLifecycle(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
Log.e(TAG, "doOnLifecycle ,disposable:" + disposable);
}
}, new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "doOnLifecycle ,run");
}
}).subscribe(getNormalObserver());
输出结果:
doOnLifecycle ,disposable:null
normal,onNext:1
normal,onNext:2
doOnLifecycle ,run
6.2.9 doOnNext
doOnNext操作符类似于 doOnEach(Consumer)。
![doOnNext](http://opgvsfix4.bkt.clouddn.com/rxjava_doOnNext.png)
示例代码:
Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
// emitter.onError(new Throwable("nothing error"));
emitter.onComplete();
}
});
ob1.doOnNext(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "doOnNext ,onNext:"+integer);
}
}).subscribe(getNormalObserver());
输出结果:
doOnNext ,onNext:1
normal,onNext:1
doOnNext ,onNext:2
normal,onNext:2
normal,onComplete
6.2.10 doOnSubscribe
doOnSubscribe,当观察者订阅它生成的 Observable 它就会被调用。
![doOnSubscribe](http://opgvsfix4.bkt.clouddn.com/rxjava_doOnSubscribe.png)
实践:在 Observable 发射前做一些初始化操作(比如开始加载数据时显示载入中界面)。
示例代码:
Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
// emitter.onError(new Throwable("nothing error"));
emitter.onComplete();
}
});
ob1.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
Log.e(TAG, "doOnSubscribe,disposable:" + disposable);
}
}).subscribe(getNormalObserver());
输出结果:
doOnSubscribe,disposable:null
normal,onNext:1
normal,onNext:2
normal,onComplete
6.2.11 doOnTerminate
doOnTerminate 操作符注册一个动作,当它产生的 Observable 终止之前会被调用,无论是正常还是异常终止。
![doOnTerminate](http://opgvsfix4.bkt.clouddn.com/rxjava_doOnTerminate.png)
实现方法:doOnTerminate(Action)
实践:不管消息流最终以 onError() / onComplete() 结束,都会被调用(类似 Java 的 finally ),对于某些需要 onError() / onComplete() 后都要执行的操作(如网络加载成功/失败都要隐藏载入中界面),可以放在这里。
注意:取消订阅时,不会调用 doOnTerminate 方法。
示例代码:
Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
// emitter.onError(new Throwable("nothing error"));
emitter.onComplete();
}
});
ob1.doOnTerminate(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG,"doOnTerminate,run");
}
}).subscribe(getNormalObserver());
输出结果:
normal,onNext:1
normal,onNext:2
doOnTerminate,run
normal,onComplete
6.2.12 onTerminateDetach
当执行了反注册 unsubscribes 或者发送数据序列中断了,解除上游生产者对下游接受者的引用。
实践:onTerminateDetach 会使 Observable 调用 UnSubscriber 时,对 Subscriber 的引用会被释放,从而避免造成内存泄漏。
6.3 Meterialize / Dematerialize
6.3.1 Materialize
Materialize 将数据项和事件通知都当做数据项发射,Dematerialize 刚好相反。
![Meterialize](http://opgvsfix4.bkt.clouddn.com/materialize.c.png)
一个合法的有限的 Obversable 将调用它的观察者的 onNext 方法零次或多次,然后调用观察者的 onCompleted 或 onError 仅一次。Materialize 操作符将这一系列调用,包括原来的 onNext 通知和终止通知 onCompleted 或 onError 都转换为一个 Observable 发射的数据序列。
通俗一点的说法:Meterialize 操作符将 OnNext / OnError / OnComplet e都转化为一个 Notification 对象并按照原来的顺序发射出来。
示例代码:
Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
// emitter.onError(new Throwable("love world"));
emitter.onComplete();
}
});
ob1.materialize().subscribe(new Consumer<Notification<Integer>>() {
@Override
public void accept(@NonNull Notification<Integer> in) throws Exception {
if (in.isOnNext()) {
Log.e(TAG, "materialize,onNext: " + in.isOnNext());
return;
}
if (in.isOnError()) {
Log.e(TAG, "materialize,onError: "+in.getError().getMessage());
return;
}
if (in.isOnComplete()) {
Log.e(TAG, "materialize,OnComplete");
return;
}
}
});
输出结果:
materialize,onNext: true
materialize,onNext: true
materialize,OnComplete
6.3.2 Dematerialize
而 Dematerialize 执行相反的过程。
![Dematerialize](http://opgvsfix4.bkt.clouddn.com/dematerialize.c.png)
示例代码:
Observable<Notification<Integer>> ob1 = Observable.create(new ObservableOnSubscribe<Notification<Integer>>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Notification<Integer>> e) throws Exception {
e.onNext(Notification.createOnNext(1));
e.onNext(Notification.<Integer>createOnError(new Throwable("My error!")));
e.onNext(Notification.<Integer>createOnComplete());
}
});
ob1.dematerialize().subscribe(new Observer<Object>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Object o) {
Log.e(TAG, "onNext:" + o.toString());
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG, "onError:" + e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
});
输出结果:
onNext:1
onComplete
6.4 ObserveOn / SubscribeOn
指定一个观察者在哪个调度器(线程)上观察这个 Observable。
![ObserveOn](http://opgvsfix4.bkt.clouddn.com/observeOn.c.png)
![SubscribeOn](http://opgvsfix4.bkt.clouddn.com/subscribeOn.c.png)
ObserverOn 用来指定观察者所运行的线程,也就是发射出的数据在那个线程上使用。
在 Android 中,如果经常会遇见这样场景,我们需要从网络中读取数据,之后修改 UI 界面,观察者就必须在主线程上运行,就如同 AsyncTask 的 onPostExecute。
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
注意:当遇到一个异常时 ObserveOn 会立即向前传递这个 onError 终止通知,它不会等待慢速消费的 Observable 接受任何之前它已经收到但还没有发射的数据项。这可能意味着 onError 通知会跳到(并吞掉)原始 Observable 发射的数据项前面,正如下图所示的。
![ObserveOn](http://opgvsfix4.bkt.clouddn.com/observeOn.e.png)
示例代码:
/**
Schedulers.io() 代表 io 操作的线程, 通常用于网络,读写文件等 io 密集型的操作
Schedulers.computation() 代表 CPU 计算密集型的操作, 例如需要大量计算的操作
Schedulers.newThread() 代表一个常规的新线程
*/
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
Log.e(TAG, "subscribeOn:" + Thread.currentThread().getName());
emitter.onNext(1);
}
}).subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "observerOn:" + Thread.currentThread().getName());
Log.e(TAG, "onNext:" +integer);
}
});
输出结果:
subscribeOn:RxCachedThreadScheduler-1
observerOn:RxNewThreadScheduler-1
onNext:1
6.4.1 unsubscribeOn
修改原 Observable,以便订阅者将其配置在指定的调度器(线程)上。
示例代码:
//将线程从 computation 换到 io 中
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
Log.e(TAG, "subscribeOn:" + Thread.currentThread().getName());
emitter.onNext(1);
}
}).subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.computation())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "doOnNext,observerOn:" + Thread.currentThread().getName());
Log.e(TAG, "doOnNext,onNext:" + integer);
}
})
.unsubscribeOn(Schedulers.computation())
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "observerOn:" + Thread.currentThread().getName());
Log.e(TAG, "onNext:" + integer);
}
});
输出结果:
subscribeOn:RxNewThreadScheduler-1
doOnNext,observerOn:RxComputationThreadPool-1
doOnNext,onNext:1
observerOn:RxComputationThreadPool-1
onNext:1
6.5 Serialize
强制一个 Observable 连续调用并保证行为正确。
![Serialize](http://opgvsfix4.bkt.clouddn.com/serialize.c.png)
一个 Observable 可以异步调用它的观察者的方法,可能是从不同的线程调用。这可能会让 Observable 行为不正确,它可能会在某一个 onNext 调用之前尝试调用 onCompleted 或 onError 方法,或者从两个不同的线程同时调用 onNext 方法。使用 serialize 操作符,你可以纠正这个 Observable 的行为,保证它的行为是正确的且是同步的。
6.6 TimeInterval
将一个发射数据的 Observable 转换为发射那些数据发射时间间隔的 Observable。
![TimeInterval](http://opgvsfix4.bkt.clouddn.com/timeInterval.c.png)
TimeInterval 操作符拦截原始 Observable 发射的数据项,替换为两个连续发射物之间流逝的时间长度。 也就是说这个使用这个操作符后发射的不再是原始数据,而是原始数据发射的时间间隔。新的 Observable 的第一个发射物表示的是在观察者订阅原始 Observable 到原始 Observable 发射它的第一项数据之间流逝的时间长度。 不存在与原始 Observable 发射最后一项数据和发射 onCompleted 通知之间时长对应的发射物。timeInterval 默认在 immediate 调度器上执行,你可以通过传参数修改。
示例代码:
Observable.interval(100, TimeUnit.MILLISECONDS)
.take(3)
.timeInterval()
.subscribe(new Consumer<Timed<Long>>() {
@Override
public void accept(@NonNull Timed<Long> t) throws Exception {
Log.e(TAG, "onNext: " + t.value() + " , time = " + t.time());
}
});
输出结果:
onNext: 0 , time = 104
onNext: 1 , time = 113
onNext: 2 , time = 100
6.7 Timeout
对原始 Observable 的一个镜像,如果过了一个指定的时长仍没有发射数据,它会发一个错误通知。
![Timeout](http://opgvsfix4.bkt.clouddn.com/timeout.c.png)
Timeout 操作符给 Observable 加上超时时间,每发射一个数据后就重置计时器,当超过预定的时间还没有发射下一个数据,就抛出一个超时的异常。
RxJava2.0 中的实现的 Timeout 操作符有好几个变体:
- timeout(long,TimeUnit): 第一个变体接受一个时长参数,每当原始 Observable 发射了一项数据,timeout就启动一个计时器,如果计时器超过了指定指定的时长而原始 Observable 没有发射另一项数据,timeout 就抛出 TimeoutException,以一个错误通知终止 Observable。 这个timeout默认在computation调度器上执行,你可以通过参数指定其它的调度器。
- timeout(long,TimeUnit,Observable): 这个版本的 timeout 在超时时会切换到使用一个你指定的备用的 Observable,而不是发错误通知。它也默认在 computation 调度器上执行。
- timeout(Function):这个版本的 timeout 使用一个函数针对原始 Observable 的每一项返回一个 Observable,如果当这个 Observable 终止时原始 Observable 还没有发射另一项数据,就会认为是超时了,timeout 就抛出 TimeoutException,以一个错误通知终止 Observable。
- timeout(Function,Observable): 这个版本的 timeout 同时指定超时时长和备用的 Observable。它默认在immediate调度器上执行
示例代码1:
/**
* 在 150 毫秒间隔内如果没有发射数据。发送一个 TimeoutException 通知终止。
* */
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 5; i++) {
Thread.sleep(i * 100);
emitter.onNext(i);
}
emitter.onComplete();
}
})
.timeout(150, TimeUnit.MILLISECONDS)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
Log.e(TAG, "onNext:" + integer);
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG, "onError:" + e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
});
输出结果:
onNext:0
onNext:1
onError:null
示例代码 2:
/**
* 只接收 200 毫秒间隔内发送的数据,如果超时则切换到 Observable.just(100, 200)
* */
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 5; i++) {
Thread.sleep(i * 100);
emitter.onNext(i);
}
emitter.onComplete();
}
})
.timeout(200, TimeUnit.MILLISECONDS, Observable.just(100, 200))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "accept:" + integer);
}
});
输出结果:
accept:0
accept:1
accept:100
accept:200
6.8 Timestamp
给 Observable 发射的数据项附加一个时间戳。
![Timestamp](http://opgvsfix4.bkt.clouddn.com/timestamp.c.png)
它将一个发射 T 类型数据的 Observable 转换为一个发射类型为 Timestamped 的数据的 Observable,每一项都包含数据的发射时间。也就是把 Observable 发射的数据重新包装了一下,将数据发射的时间打包一起发射出去,这样观察者不仅能得到数据,还能得到数据的发射时间。 timestamp 默认在 immediate 调度器上执行,但是可以通过参数指定其它的调度器。
示例代码:
Observable.range(1, 3)
.timestamp()
.subscribe(new Consumer<Timed<Integer>>() {
@Override
public void accept(@NonNull Timed<Integer> t) throws Exception {
Log.e(TAG, "accept ,onNext:" + t.value() + ",time = " + t.time());
}
});
输出结果:
accept ,onNext:1,time = 1494606809418
accept ,onNext:2,time = 1494606809420
accept ,onNext:3,time = 1494606809420
6.9 Using
创建一个只在 Observable 生命周期内存在的一次性资源.
![Using](http://opgvsfix4.bkt.clouddn.com/using.c.png)
当一个观察者订阅 using 返回的 Observable 时,using 将会使用 Observable 工厂函数创建观察者要观察 Observable,同时使用资源工厂函数创建一个你想要创建的资源。当观察者取消订阅这个 Observable 时,或者当观察者终止时(无论是正常终止还是因错误而终止),using 使用第三个函数释放它创建的资源。
using 操作符接受三个参数:
- 一个用户创建一次性资源的工厂函数
- 一个用于创建 Observable 的工厂函数
- 一个用于释放资源的函数
示例代码:
Observable.using(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return new Random().nextInt(10);
}
}, new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
return Observable.just("hello+" + integer, "world+" + integer);
}
}, new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "using,accept - >" + integer);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e(TAG, "subscribe,accept -> " + s);
}
});
输出结果:
subscribe,accept -> hello+8
subscribe,accept -> world+8
using,accept - >8
6.10 To
将 Observable 转换为另一个对象或数据结构。
![To](http://opgvsfix4.bkt.clouddn.com/to.c.png)
ReactiveX 的很多语言特定实现都有一种操作符让你可以将 Observable 或者 Observable 发射的数据序列转换为另一个对象或数据结构。它们中的一些会阻塞直到 Observable 终止,然后生成一个等价的对象或数据结构;另一些返回一个发射那个对象或数据结构的 Observable。
在某些 ReactiveX 实现中,还有一个操作符用于将 Observable 转换成阻塞式的。一个阻塞式的 Ogbservable 在普通的 Observable 的基础上增加了几个方法,用于操作 Observable 发射的数据项。
RxJava2.x 中实现了多种 To 操作符:
6.10.1 To
示例代码:
输出结果:
6.10.2 toFuture
返回表示该 Observable 发出的单个值的 Future。
如果 Observable 发出多个项目,Future 将会收到一个 IllegalArgumentException。 如果 Observable 为空,Future 将收到一个 NoSuchElementException。
如果 Observable 可能会发出多个项目,请使用Observable.toList() 、toBlocking() 、toFuture()。
![toFuture](http://opgvsfix4.bkt.clouddn.com/B.toFuture.png)