1. RxJava2 : 什么是观察者模式
2. RxJava2 : 创建操作符(无关时间)
3. Rxjava2 : 创建操作符(有关时间)
4. Rxjava2 : 变换操作符
5. Rxjava2 : 判断操作符
6. Rxjava2 : 筛选操作符
7. Rxjava2 : 合并操作符
8. Rxjava2 : do操作符
9. Rxjava2 : error处理
10. Rxjava2 : 重试
11. Rxjava2 : 线程切换
api | use |
---|---|
map | {{map}} |
flatmap / concatMap | {{flatmap}} |
buffer | {{buffer}} |
map
- 操作的是每一个元素,所有元素均会发生改变
- 但元素的个数不会改变
- 是否会发送onComplete,取决于创建方式是否会发送
Observable.just(1, 2, 3)
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
return integer + 10;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "integer:" + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
log
02-12 15:39:20.253 31399-31399/... D/SplashActivity: integer:11
02-12 15:39:20.253 31399-31399/... D/SplashActivity: integer:12
02-12 15:39:20.253 31399-31399/... D/SplashActivity: integer:13
02-12 15:39:20.253 31399-31399/... D/SplashActivity: onComplete
flatmap / concatMap
flatmap
1.与map类似,操作的同样是每一个元素
2.区别是,可以将元素转换成Oberservable,然后再继续发送,也就是说数量可变
3.元素是无序的
4.是否会发送onComplete,取决于创建方式是否会发送concatMap
均于flatmap相同,只是元素是有序的
流程
Observable.just(1,2,3)
->
元素 1,元素 2, 元素 3
他们全部来自于一个Observable
->
元素 1 -> Observable (元素:1 子元素0, 元素:1 子元素1, 元素:1 子元素2)
...
->
现在一共有3*3,9个元素
->
subscribe
->
Observer
这9个元素可能会是无序的
Disposable disposable = Observable
.just(1, 2, 3)
.flatMap((Function<Integer, ObservableSource<String>>) integer -> {
List<String> strings = new ArrayList<>();
strings.add("event: " + integer + " list: 0");
strings.add("event: " + integer + " list: 1");
strings.add("event: " + integer + " list: 2");
return Observable.fromIterable(strings);
}).subscribe(
s -> {
Log.d(TAG, s);
});
log
01-30 15:03:19.086 15381-15381/... D/ShellAppApplication: event: 1 list: 0
01-30 15:03:19.086 15381-15381/... D/ShellAppApplication: event: 1 list: 1
01-30 15:03:19.086 15381-15381/... D/ShellAppApplication: event: 1 list: 2
01-30 15:03:19.086 15381-15381/... D/ShellAppApplication: event: 2 list: 0
01-30 15:03:19.086 15381-15381/... D/ShellAppApplication: event: 3 list: 1
01-30 15:03:19.086 15381-15381/... D/ShellAppApplication: event: 2 list: 2
01-30 15:03:19.086 15381-15381/... D/ShellAppApplication: event: 2 list: 1
01-30 15:03:19.086 15381-15381/... D/ShellAppApplication: event: 3 list: 0
01-30 15:03:19.086 15381-15381/... D/ShellAppApplication: event: 3 list: 2
buffer
- 基于数量
1.参数1: 每回取的数量,参数2: 跳过的数量
2.如果未指定跳过的数量,则取了几个就跳过几个
3.如果指定了跳过的数量,则按照指定跳过,并且list.size() = 发送数量/skip+1
4.是否会发送onComplete,取决于创建方式是否会发送
//不采用跳过
Observable.just(1,2,3,4,5)
.buffer(3)
.subscribe(new Observer<List<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(List<Integer> integers) {
Log.d(TAG, "integers:" + integers);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
log
02-12 16:08:07.723 32515-32515/... D/SplashActivity: integers:[1, 2, 3]
02-12 16:08:07.723 32515-32515/... D/SplashActivity: integers:[4, 5]
02-12 16:08:07.723 32515-32515/... D/SplashActivity: onComplete
//采用跳过,即可看到,最后一个集合只包含一个[5]
Observable.just(1, 2, 3, 4, 5)
.buffer(3, 2)
.subscribe(new Observer<List<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(List<Integer> integers) {
Log.d(TAG, "integers:" + integers);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
log
02-12 16:12:56.273 531-531/... D/SplashActivity: integers:[1, 2, 3]
02-12 16:12:56.273 531-531/... D/SplashActivity: integers:[3, 4, 5]
02-12 16:12:56.273 531-531/... D/SplashActivity: integers:[5]
02-12 16:12:56.273 531-531/... D/SplashActivity: onComplete
- 基于时间
1.发送的元素,并不会直接用于Observer,而是会暂存在一个缓存区当中,以指定的时间去取
2.可能会存在取不到值的情况,缓存区当中没有值
3.最后一次取值时间会于之前不同
4.是否会发送onComplete,取决于创建方式是否会发送
流程
元素 | 时间 | 所取值 |
---|---|---|
in | 0 | 没取 |
0 | 1 | 没取 |
1 | 2 | 第一次取值,1还没有进入缓存区,只取到0 |
2 | 3 | 没取 |
3 | 4 | 第二次取值,现在缓存区中存在1,2 |
4 | 5 | 因为这是最后两个值了,所以会提前取,全部取出 |
Log.d(TAG, "in");
Observable.interval(1, TimeUnit.SECONDS)
.take(5)
.buffer(2, TimeUnit.SECONDS)
.subscribe(new Observer<List<Long>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(List<Long> longs) {
Log.d(TAG, "longs:" + longs);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
log
02-12 16:19:12.293 1345-1345/... D/SplashActivity: in
02-12 16:19:14.303 1345-1401/... D/SplashActivity: longs:[0]
02-12 16:19:16.303 1345-1401/... D/SplashActivity: longs:[1, 2]
02-12 16:19:17.303 1345-1402/... D/SplashActivity: longs:[3, 4]
02-12 16:19:17.303 1345-1402/... D/SplashActivity: onComplete
- 数量和时间同时作用
流程
元素 | 时间 | 所取值 |
---|---|---|
in | 0 | 没取 |
0 | 1 | 没取 |
1 | 2 | 第一次取值,满足的是数量,取0,1 |
2 | 3 | 第二次取值,满足的是时间,这个时候,缓存区中的元素已经通过数量取走了,因此什么也没有取到 |
3 | 4 | 没取 |
4 | 5 | 第三次取值,满足的是数量,取2,3 |
5 | 6 | 第四次取值,满足的是时间,这个时候,缓存区中的元素只剩下一个4 |
6 | 7 | 第五次取值,满足的是数量,取5,6 同时进行第六次取值,因为结束,触发时间取值,但什么也没有取到 |
Log.d(TAG, "in");
Observable.interval(1, TimeUnit.SECONDS)
.take(7)
.buffer(3, TimeUnit.SECONDS,2)
.subscribe(new Observer<List<Long>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(List<Long> longs) {
Log.d(TAG, "longs:" + longs);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
log
02-12 16:31:47.043 2152-2152/... D/SplashActivity: in
02-12 16:31:49.063 2152-2225/... D/SplashActivity: longs:[0, 1]
02-12 16:31:50.053 2152-2224/... D/SplashActivity: longs:[]
02-12 16:31:51.053 2152-2225/... D/SplashActivity: longs:[2, 3]
02-12 16:31:53.053 2152-2224/... D/SplashActivity: longs:[4]
02-12 16:31:54.053 2152-2225/... D/SplashActivity: longs:[5, 6]
02-12 16:31:54.053 2152-2225/... D/SplashActivity: longs:[]
02-12 16:31:54.053 2152-2225/... D/SplashActivity: onComplete