此篇内容均是来自书籍《RxJava响应式编程》李衍顺 著
3.8 聚合操作符
3.8.1 concat
concat操作符将多个Observable结合成一个Observable并发送数据,并且严格按照先后顺序发送数据,即前一个Observable数据没有发送完时,后面的Observable是不能发送数据的。
有两个操作符与concat操作符很类似,他们分别是:
-startWith:仅仅是在前面插上一个Observable或者一些数据,并且先发送插入的内容
-merge:其发送的数据是无序的,也就是说被组合的多个Observable是可以自由发送数据的,而不用管其它的Observable的状态。
/**
* concat
*/
private void concatTest(){
Observable<Integer> observable1 = Observable.just(1,2,3);
Observable<Integer> observable2 = Observable.just(4,5,6);
Observable<Integer> observable3 = Observable.just(7,8,9);
Observable.concat(observable1, observable3, observable2)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
log("concat: " + integer);
}
});
}
结果:
concat: 1
concat: 2
concat: 3
concat: 7
concat: 8
concat: 9
concat: 4
concat: 5
concat: 6
3.8.2 count
count操作符用来统计源Observable发送了多少个数据,最后将数目发送出来。如果源Observable发送错误,则会将错误直接报出来。在源Observable停止发送前,count是不会发送统计数据的
/**
* count
*/
private void countTest(){
Observable.just(1,2,3).count()
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
log("count: " + integer);
}
});
}
结果:
count: 3
3.8.3 reduce
reduce操作符应用一个函数接收Observable发送的数据和函数的计算结果,作为下次计算的参数,并输出最后的结果。reduce与我们前面了解过的scan操作符很类似,只是scan会输出每次计算的结果,而reduce只输出最后的结果。
/**
* reduce
*/
private void reduceTest(){
ArrayList<Integer> list = new ArrayList<>();
for(int i=0; i<10;i++){
list.add(2);
}
Observable.from(list).reduce(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) {
return integer * integer2;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
log("reduce: " + integer);
}
});
}
结果:
reduce: 1024
3.8.4 collect
collect操作符类似于reduce,但是二者目的不同。collect操作符用来将源Observable发送的数据收集到一个数据结构里面,最后将这个数据结构整个发送出来。collect操作符需要使用两个函数作为参数:
-第一个函数会产生收集数据结构的函数
-第二个函数会将上面函数产生的数据结构和源Observable发送的数据作为参数,且这个函数会将源Observable发送的数据存入到这个数据结构中。
/**
* collect
*/
private void collectTest(){
ArrayList<Integer> list = new ArrayList<>();
for(int i=0; i<10;i++){
list.add(i);
}
Observable.from(list).collect(new Func0<ArrayList<Integer>>() {
@Override
public ArrayList<Integer> call() {
return new ArrayList<>();
}
}, new Action2<ArrayList<Integer>, Integer>() {
@Override
public void call(ArrayList<Integer> integers, Integer integer) {
integers.add(integer);
}
}).subscribe(new Action1<ArrayList<Integer>>() {
@Override
public void call(ArrayList<Integer> integers) {
log("collect: " + integers);
}
});
}
结果:
collect: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]