上一篇:RxJava 源码学习之过滤操作符
今天我们继续学习RxJava的组合操作符。
StartWith
-
作用分析
StartWith操作符可以在发射一个Observable的数据之前先发射一个指定的数据序列。
操作符
-
示例代码
可接受一个Iterable或者多个Observable作为函数的参数。
Javadoc: startWith(Iterable)
Javadoc: startWith(T) (最多接受九个参数)
//测试代码
Integer[] nums = {1,2,3,4};
Observable.from(nums)
.startWith(9,8)
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {System.out.println("Next: " + item); }
@Override
public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); }
@Override
public void onCompleted() { System.out.println("Sequence complete."); }
});
//###########################################
输出结果:
Next: 9
Next: 8
Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.
Merge
-
作用分析
merge可以将多个Observables的输出合并,就好像它们是一个单个的Observable一样。merge可能会让合并的Observables发射的数据交错(有一个类似的操作符 Concat不会让数据交错,它会按顺序一个接着一个发射多个Observables的发射物)。
注意:如果传递给merge的任何一个的Observable发射了onError通知终止了,merge操作符生成的Observable也会立即以onError通知终止。如果你想让它继续发射数据,在最后才报告错误,可以使用mergeDelayError。
-
示例代码
Observable<Integer> odds = Observable.just(1, 3, 5).subscribeOn(someScheduler); // subscribeOn 切换线程
Observable<Integer> evens = Observable.just(2, 4, 6);
Observable.merge(odds, evens)
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
//###########################################
输出结果:
Next: 1
Next: 3
Next: 5
Next: 2
Next: 4
Next: 6
Sequence complete.
ZIP
-
作用分析
Zip操作符返回一个Obversable,它使用特定函数按顺序结合两个或多个Observables发射的数据项,然后它发射这个函数返回的结果。它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据。
下图:把 shape(形状)、size(尺寸) 和 color(颜色) 合并后,发射出来。
-
示例代码
Observable.zip(
Observable.just("a1","a2","a3","a4","a5"),
Observable.just(1,2,3,4),
Observable.just("b1","b2","b3","b4","b5","b6"),
new Func3<String,Integer,String,String>(){
@Override
public String call(String s, Integer integer, String s2) {
return s+"_"+integer+"_"+s2;
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() { System.out.println("onCompleted."); }
@Override
public void onError(Throwable e) { System.out.println("onError: " + e.getMessage()); }
@Override
public void onNext(String s) { System.out.println("onNext: " + s); }});
//############################################
输出结果:
onNext: a1_1_b1
onNext: a2_2_b2
onNext: a3_3_b3
onNext: a4_4_b4
onCompleted.
Join
-
作用分析
任何时候,只要在另一个Observable发射的数据定义的时间窗口内,这个Observable发射了一条数据,就结合两个Observable发射的数据。
比如: ObservableA 在 5s内发射一条数据 dataA1, ObservableB 这时刚好也在发射数据dataB1,就把ObservableA 的数据dataA1和 ObservableB的数据dataB1合并一起发射;5s还没结束,ObservableB又发射数据dataB2,就把ObservableA 的数据dataA1和 ObservableB的数据dataB2合并一起发射。
-
示例代码
Javadoc: Join(Observable,Func1,Func1,Func2)
- 第二个Observable和源Observable结合。
- Func1参数:在指定的由时间窗口定义时间间隔内,源Observable发射的数据和从第二个Observable发射的数据相互配合返回的Observable。
- Func1参数:在指定的由时间窗口定义时间间隔内,第二个Observable发射的数据和从源Observable发射的数据相互配合返回的Observable。
- Func2参数:定义已发射的数据如何与新发射的数据项相结合。
Observable<Integer> create1 = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 6; i++) {
subscriber.onNext(i);
try {
Thread.sleep(600);
} catch (InterruptedException e) {
subscriber.onError(e);
}
}
}}).subscribeOn(Schedulers.newThread());
Observable<String> create2 = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
for (int i = 0; i < 4; i++) {
subscriber.onNext("hello_"+ i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
subscriber.onError(e);
}
}
}}).subscribeOn(Schedulers.newThread());
create1.join(create2,
new Func1<Integer, Observable<Long>>() {
@Override
public Observable<Long> call(Integer integer) {
return Observable.timer(1000, TimeUnit.MILLISECONDS);
}
},
new Func1<String, Observable<Long>>() {
@Override
public Observable<Long> call(String s) {
return Observable.timer(1000, TimeUnit.MILLISECONDS);
}
},
new Func2<Integer, String, String>() {
@Override
public String call(Integer integer1, String s) {
return integer1 + "-" + s;
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() { System.out.println("onCompleted."); }
@Override
public void onError(Throwable e) { System.out.println("onError: " + e.getMessage()); }
@Override
public void onNext(String s) { System.out.println("onNext: " + s); }});
//##############################################
输出结果:
onNext: 0-hello_0
onNext: 1-hello_0
onNext: 1-hello_1
onNext: 2-hello_1
onNext: 3-hello_1
onNext: 3-hello_2
onNext: 2-hello_2
onNext: 4-hello_2
onNext: 4-hello_3
onNext: 5-hello_3
Switch
有这样一个复杂的场景就是在一个subscribe-unsubscribe的序列里我们能够从一个Observable自动取消订阅来订阅一个新的Observable。
RxJava的switch(),正如定义的,将一个发射多个Observables的Observable转换成另一个单独的Observable,后者发射那些Observables最近发射的数据项。
给出一个发射多个Observables序列的源Observable,switch()订阅到源Observable然后开始发射由第一个发射的Observable发射的一样的数据。当源Observable发射一个新的Observable时,switch()立即取消订阅前一个发射数据的Observable(因此打断了从它那里发射的数据流)然后订阅一个新的Observable,并开始发射它的数据。
结束语
ok,RxJava之组合操作符已经学习完啦,当然这里都是分析一些常用的,想了解更多的操作符就去看RxJava官方文档吧。
下一篇:RxJava 源码学习之调度器Scheduler。