今天,来学习RxJava中的组合/合并操作符,并完成实例。
一、作用
组合多个被观察者&合并需要发送的事件
二、类型
常见的组合/合并操作符有:
1.组合多个被观察者
a.按发送顺序:concat()、concatArray()
b.按时间:merge()、mergeArray()
c.错误处理:concatDelayError()、mergeDelayError()
2.合并多个事件
a.按数量:Zip()
b.按时间:combineLatest()、combineLatestDelayError()
c.合并成1个事件发送:reduce()、collect()
3.发送事件前追加发送事件:startWith()、startWithArray()
4.统计发送事件数量:count()
三、操作符及应用介绍
首先在项目中添加依赖:
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.0.7'
1.组合多个被观察者
concat()/concatArray()
作用:组合多个被观察者发送数据,合并后按发送顺序串行执行
二者区别:组合被观察者的数量,即concat()组合被观察者数量≤4个,而concatArray()可以大于4个
具体使用:
//concat()
:组合多个被观察者
(≤4
个
)
一起发送数据
//
注:串行执行
Observable
.
concat
(
Observable
.
just
(
1
,
2
,
3
),
Observable
.
just
(
4
,
5
,
6
),
Observable
.
just
(
7
,
8
,
9
),
Observable
.
just
(
10
,
11
,
12
))
.subscribe(
new
Observer<Integer>() {
@Override
public void
onSubscribe(Disposable d) {
}
@Override
public void
onNext(Integer integer) {
Log.
e
(
tag
,
"
接收到了事件
"
+integer);
}
@Override
public void
onError(Throwable e) {
Log.
e
(
tag
,
""
);
}
@Override
public void
onComplete() {
Log.
e
(
tag
,
""
);
}
});
//concat()
:组合多个被观察者
(≤4
个
)
一起发送数据
//
注:串行执行
Observable
.
concatArray
(
Observable
.
just
(
1
,
2
,
3
),
Observable
.
just
(
4
,
5
,
6
),
Observable
.
just
(
7
,
8
,
9
),
Observable
.
just
(
10
,
11
,
12
),
Observable.just(13,14,15))
.subscribe(
new
Observer<Integer>() {
@Override
public void
onSubscribe(Disposable d) {
}
@Override
public void
onNext(Integer integer) {
Log.
e
(
tag
,
"
接收到了事件
"
+integer);
}
@Override
public void
onError(Throwable e) {
Log.
e
(
tag
,
""
);
}
@Override
public void
onComplete() {
Log.
e
(
tag
,
""
);
}
});
merge()/mergeArray
作用:
组合多个被观察者一起发送数据,合并后按时间线并行执行
二者区别:组合被观察者的数量,即merge()组合被观察者数量≤4个,而mergeArray()则可>4个
-
区别上述concat()操作符:同样是组合多个被观察者一起发送数据,但concat()操作符合并后是按发送顺序串行执行
具体使用:
//merge()组合多个被观察者(<4个)一起发送数据
// 注:合并后按照时间线并行执行
Observable.merge(Observable.intervalRange(1,2,1,1, TimeUnit.SECONDS),
Observable.intervalRange(1,3,10,1,TimeUnit.SECONDS))
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
Log.e(tag,"接收到了事件"+aLong);
}
@Override
public void onError(Throwable e) {
Log.e(tag,"");
}
@Override
public void onComplete() {
Log.e(tag,"complete");
}
});
//mergeArray()组合多个被观察者(>4个)一起发送数据
// 注:合并后按照时间线并行执行
Observable.mergeArray(Observable.intervalRange(1,2,1,1, TimeUnit.SECONDS),
Observable.intervalRange(1,3,10,1,TimeUnit.SECONDS),
Observable.intervalRange(1,2,1,1, TimeUnit.SECONDS),
Observable.intervalRange(1,3,10,1,TimeUnit.SECONDS),
Observable.intervalRange(1,2,1,1, TimeUnit.SECONDS),
Observable.intervalRange(1,3,10,1,TimeUnit.SECONDS))
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
Log.e(tag,"接收到了事件"+aLong);
}
@Override
public void onError(Throwable e) {
Log.e(tag,"");
}
@Override
public void onComplete() {
Log.e(tag,"complete");
}
});
concatDelayError()/mergeDelayError()
作用:
[图片上传失败...(image-4c0cf4-1512547073721)]
具体使用:
Observable
.
concatArrayDelayError
(
Observable
.
create
(
new
ObservableOnSubscribe<Integer>() {
@Override
public void
subscribe(ObservableEmitter<Integer> emitter)
throws
Exception {
emitter.onNext(
1
);
emitter.onNext(
2
);
emitter.onNext(
3
);
emitter.onError(
new
NullPointerException());
//
发送
Error
事件,因为使用了
concatDelayError
,所以第
2
个
Observable
将会发送事件,等发送完毕后,再发送错误事件
emitter.onComplete();
}
}),
Observable
.
just
(
4
,
5
,
6
))
.subscribe(
new
Observer<Integer>() {
@Override
public void
onSubscribe(Disposable d) {
}
@Override
public void
onNext(Integer value) {
Log.
d
(
tag
,
"
接收到了事件
"
-
value );
}
@Override
public void
onError(Throwable e) {
Log.
d
(
tag
,
"
对
Error
事件作出响应
"
);
}
@Override
public void
onComplete() {
Log.
d
(
tag
,
"
对
Complete
事件作出响应
"
);
}
});
2.合并多个事件
该类型操作符主要是对多个被观察者中的事件进行合并处理。
Zip()
作用:合并多个被观察者发送的事件,生成一个新的事件序列,最终发送。
原理:
[图片上传失败...(image-55bcd1-1512547073721)]
注意:事件组合方式=严格按照原先的事件序列进行对位合并
最终合并的事件数量=
多个被观察者中数量最少的数量
combineLatest()
作用:当两个Observables中的任何一个发送了数据后,将先发送了数据的Observables的最新(最后)一个数据与另外一个Observable发送的每个数据结合,最终基于该函数的结果发送数据。
与Zip()的区别:
zip()=按个数合并,即1对1合并;CombineLatest()=按时间合并,即在同一个时间点上合并
combineLatestDelayError()
作用类似于
concatDelayError()
/
mergeDelayError()
,即错误处理,此处不作过多描述
reduce()
作用:把被观察者要发送的事件聚合成1个事件&发送
collect()
作用:将被观察者Observable发送的数据事件收集到一个数据结构里
startWith()/startWithArray()
作用:在一个被观察者发送事件前,追加发送一些数据/一个新的被观察者。
Observable.
just
(
4
,
5
,
6
)
.startWith(
0
)
//
追加单个数据
= startWith()
.
startWithArray
(
1
,
2
,
3
)
//
追加多个数据
= startWithArray()
.subscribe(
new
Observer<Integer>() {
@Override
public void
onSubscribe(Disposable d) {
}
@Override
public void
onNext(Integer value) {
Log.
d
(
tag
,
"
接收到了事件
"
-
value );
} @Override public void
onError(Throwable e) {
Log.
d
(
tag
,
"
对
Error
事件作出响应
"
);
}
@Override
public void
onComplete() {
Log.
d
(
tag
,
"
对
Complete
事件作出响应
"
);
}
});
count()统计发送事件数量