前言
首先要感谢 Season_zlc 的一系列RxJava2
的教程,关于上游、下游、水缸的类比,让我对于整个RxJava2
的基本思想有了更加清晰的认识。大家有兴趣的话一定要多看看,写的通俗易懂,传送门:给初学者的 RxJava 2.0 教程 (一) ,本文的思想都来源于它的一系列文章。
文章比较长,为了避免耽误大家的时间,先列出需要介绍的知识点:
一、RxJava2 的基本模型
1.1 使用实例
在开始学习之前,我们先看一下最简单的例子:
- 第一步:导入依赖包:
dependencies {
//在build.gradle中,导入依赖。
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
}
- 第二步:使用最基本的
Observable
+Observer
的最简单示例,这里我们在上游发送了四个onNext(String s)
事件之后,最后发送了一个onComplete()
事件。
public static void classicalSample() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
observableEmitter.onNext("1");
observableEmitter.onNext("2");
observableEmitter.onNext("3");
observableEmitter.onNext("4");
observableEmitter.onComplete();
}
}).subscribe(new Observer<String>() {
private Disposable mDisposable;
@Override
public void onSubscribe(Disposable disposable) {
Log.d(TAG, "onSubscribe");
mDisposable = disposable;
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext=" + s);
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
- 第三步:运行结果,订阅成功之后,会依次回调以下三步操作:
onSubscribe
;onNext
;onComplete
。
1.2 基本元素
在上面的例子中,涉及到了以下五个类:
-
Observable
:上游。 -
ObservableOnSubscribe
:上游的create
方法所接收的参数。 -
ObservableEmitter
:上游事件的发送者。 -
Observer
:下游的接收者。 -
Disposable
:用于维系上游、下游之间的联系。
对于整个模型,可以总结为以下几点:
-
RxJava2
简单的来说,就是一个发送事件、接收事件的过程,我们可以将发送事件方类比作上游,而接收事件方类比作下游。 - 上游每产生一个事件,下游就能收到事件,上游对应
Observable
,而下游对应Observer
。 - 只有当上游和下游建立连接之后,上游才会开始发送事件,这一关系的建立是通过
subscribe
方法。
各关键元素的UML
图如下:
1.3 ObservableEmitter
用于 发出事件,它可以分别发出onNext/onComplete/onError
事件:
- 上游可以发送无限个
onNext
,下游也可以接收无限个onNext
。 - 当上游发送了一个
onComplete/onError
后,上游onComplete/onError
后的事件将会继续发送,但是下游在收到onComplete/onError
事件后不再继续接收事件。 - 上游可以不发送
onComplete
或者onError
事件。 - 调用
onError
或者onComplete
切断了上游和下游的联系,在联系切断后上游再发送onError
事件就会报错,onComplete
和onError
的调用情况有以下几种:
(1)onComplete
可以发送多次,但是只会收到一次回调。
(2)onError
只可以发送一次,发送多次会报错。
(3)onComplete
之后不可以发送onError
,否则会报错。
(4)onError
之后可以发送onComplete
,但是只会收到onError
事件。 -
onError
的参数不允许为空。
其继承关系如下图所示:
1.4 Disposable
理解成为 水管的机关,当调用它的dispose
方法时,将会将上游和下游之间的管道切断,从而导致 下游接收不到事件。
- 在
Observer
的onSubscribe
回调中,会传入一个Disposable
对象,下游可以通过该对象的dispose()
方法主动切断和上游的联系,在这之后上游的observableEmitter.isDisposed()
方法将返回true
。 - 当上游和下游的联系切断之后,下游收不到包括
onComplete/onError
在内的任何事件,若此时上游再调用onError
方法发送事件,那么将会报错。
我们来模拟一下,在下游收到2
之后,通过Disposable
来切断上游和下游之间的联系:
public static void classicalSample() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
observableEmitter.onNext("1");
observableEmitter.onNext("2");
observableEmitter.onNext("3");
observableEmitter.onNext("4");
observableEmitter.onComplete();
}
}).subscribe(new Observer<String>() {
private Disposable mDisposable;
@Override
public void onSubscribe(Disposable disposable) {
Log.d(TAG, "onSubscribe");
mDisposable = disposable;
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext=" + s);
if ("2".equals(s)) {
mDisposable.dispose();
}
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
最终的运行结果为:
1.5 Subscribe 的重载方法
通过subscribe
确定上游和下游的联系有以下几种方法:
可以看到,这里可以分为三类:
- 不带参数
-
Consumer<T>
类 -
Observer
类 -
Action
类
对于不使用Observer
类作为形参的subscribe
函数,其实实现的功能和使用Observer
类作为参数的方法相同,只不过它们是将Observer
的四个回调分解成形参,有参数的回调用Consumer<T>
代替,而没有参数的则用Action
代替。
二、线程切换
2.1 基本概念
- 当我们在上游创建一个
Observable
来发送事件,那么这个上游就默认在主线程发送事件;而当我们在下游创建一个Observer
来接收事件,那么这个下游就默认在主线程中接收事件。 -
subscribeOn
指定的是 上游发送事件 的线程,而observeOn
指定的是 下游接收事件 的线程。 - 多次调用
subscribeOn
只有第一次有效,而每调用一次observeOn
,那么下游接收消息的线程就会切换一次。 -
CompositeDisposable
可以用来容纳Disposable
对象,每当我们得到一个Disposable
对象时,就通过add
方法将它添加进入容器,在退出的时候,调用clear
方法,即可切断所有的水管。
2.2 线程类型
-
Schedulers.io()
:代表IO
操作,通常用于网络请求、文件读写等IO
密集型的操作。 -
Schedulers.computation()
:代表CPU
密集型的操作,适用于大量计算。 -
Schedulers.newThread()
:创建新的常规线程。 -
AndroidSchedulers.mainThread()
:代表Android
的主线程。
2.3 示例
在链式调用当中,我们可以通过observeOn
方法多次切换管道下游处理消息的线程,例如下面的代码,我们对下游进行了两次线程的切换:
static void mapSample() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
Log.d(TAG, "observableEmitter's thread=" + Thread.currentThread().getId() + ",string=true");
observableEmitter.onNext("true");
Log.d(TAG, "observableEmitter's thread=" + Thread.currentThread().getId() + ",string=false");
observableEmitter.onNext("false");
Log.d(TAG, "observableEmitter's thread=" + Thread.currentThread().getId() + ",onComplete");
observableEmitter.onComplete();
}
//1.指定了subscribe方法执行的线程,并进行第一次下游线程的切换,将其切换到新的子线程。
}).subscribeOn(Schedulers.io()).observeOn(Schedulers.newThread()).map(new Function<String, Boolean>() {
@Override
public Boolean apply(String s) throws Exception {
Log.d(TAG, "apply's thread=" + Thread.currentThread().getId() + ",s=" + s);
return "true".equals(s);
}
//2.进行第二次下游线程的切换,将其切换到主线程。
}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Boolean>() {
@Override
public void onSubscribe(Disposable disposable) {
}
@Override
public void onNext(Boolean aBoolean) {
Log.d(TAG, "Observer's thread=" + Thread.currentThread().getId() + ",boolean=" + aBoolean);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
Log.d(TAG, "Observer's thread=" + Thread.currentThread().getId() + ",onComplete");
}
});
}
以上代码的运行的结果为:
三、Map 和 FlatMap 操作符
3.1 Map
-
Map
操作符的作用是对上游发送的每一个事件应用一个函数,使得每个事件按照函数的逻辑进行变换,通过Map
就可以把上游发送的每一个事件,转换成Object
或者集合,其英文注释为:
- 以下面使用
map
的代码为例,可以看到map
接收一个Function
类,它有两个泛型变量,分别为调用map
方法的Observable<T>
的<T>
泛型,和返回的Obervable<R>
的<R>
泛型。
public static void mapVerify() {
Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
}
});
Observable<String> convertObservable = sourceObservable.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return integer.toString();
}
});
Log.d(TAG, "sourceObservable=" + sourceObservable + "\n convertObservable=" + convertObservable);
}
Function
为一个接口:
并且在
map
函数调用完毕之后,将返回一个新的Observable
,它的类型为ObservableMap
:3.2 FlatMap
-
FlatMap
用于将一个发送事件的上游Observable
变换成多个发送事件的Observable
,然后将它们发送的事件合并,放进一个单独的Observable
中,其注释为:
- 上游每发送一个事件,就会针对该事件创建一个单独的水管,然后发送转换后的新的事件,下游接收到的就是这些新的水管发送的事件。
-
FlatMap
不保证不同水管之间事件的顺序,如果需要保证顺序,则需要使用contactMap
。
3.2.1 示例
static void flatMapSample() {
Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
observableEmitter.onNext(1);
observableEmitter.onNext(2);
observableEmitter.onNext(3);
}
});
Observable<String> flatObservable = sourceObservable.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
return Observable.fromArray("a value of " + integer + ",b value of " + integer);
}
});
flatObservable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
}
和map
操作符类似,它也接收一个类型为Function
的接口,只不过它的? extends R
参数类型换成了? extends Observable<? extends R>
。
3.2.2 FlatMap 不保证下游接收事件的顺序
前面我们说到,flatMap
操作符不会保证下游接收事件的顺序,下面,我们就以一个例子来说明,在flatMap
的apply
函数中,我们将一个事件转换成两个Observable
,并且加上了延时:
static void flatMapOrderSample() {
Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
Log.d(TAG, "flatMapOrderSample emit 1");
observableEmitter.onNext(1);
Log.d(TAG, "flatMapOrderSample emit 2");
observableEmitter.onNext(2);
Log.d(TAG, "flatMapOrderSample emit 3");
observableEmitter.onNext(3);
}
});
Observable<String> flatObservable = sourceObservable.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
Log.d(TAG, "flatMapOrderSample apply=" + integer);
long delay = (3 - integer) * 100;
return Observable.fromArray("a value of " + integer, "b value of " + integer).delay(delay, TimeUnit.MILLISECONDS);
}
});
flatObservable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
}
可以看到,最终的输出结果和flatMap
收到事件的顺序并不相同:
下面,还是同样的场景,将
flatMap
换成contactMap
:
static void contactMapOrderSample() {
Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
Log.d(TAG, "contactMapOrderSample emit 1");
observableEmitter.onNext(1);
Log.d(TAG, "contactMapOrderSample emit 1");
observableEmitter.onNext(2);
Log.d(TAG, "contactMapOrderSample emit 1");
observableEmitter.onNext(3);
}
});
Observable<String> flatObservable = sourceObservable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).concatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
Log.d(TAG, "contactMapOrderSample apply=" + integer);
long delay = (3 - integer) * 100;
return Observable.fromArray("a value of " + integer, "b value of " + integer).delay(delay, TimeUnit.MILLISECONDS);
}
});
flatObservable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
}
最终的运行结果为:
四、Zip 操作符
4.1 基本概念
-
Zip
通过一个函数从多个Observable
每次各取出一个事件,合并成一个新的事件发送给下游。 - 组合的顺序是严格按照事件发送的顺序来的。
- 最终下游收到的事件数量和上游中发送事件最少的那一根水管的事件数量相同。
4.1.1 两个 Observable 运行在同一线程当中
static void zipSample() {
Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
Log.d(TAG, "sourceObservable emit 1");
observableEmitter.onNext(1);
Thread.sleep(1000);
Log.d(TAG, "sourceObservable emit 2");
observableEmitter.onNext(2);
Log.d(TAG, "sourceObservable emit 3");
observableEmitter.onNext(3);
Log.d(TAG, "sourceObservable emit 4");
observableEmitter.onNext(4);
}
});
Observable<Integer> otherObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
Log.d(TAG, "otherObservable emit 1");
observableEmitter.onNext(1);
Log.d(TAG, "otherObservable emit 2");
observableEmitter.onNext(2);
Log.d(TAG, "otherObservable emit 3");
observableEmitter.onNext(3);
}
});
Observable.zip(sourceObservable, otherObservable, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable disposable) {
Log.d(TAG, "resultObservable onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "resultObservable onNext=" + integer);
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "resultObservable onError");
}
@Override
public void onComplete() {
Log.d(TAG, "resultObservable onComplete");
}
});
}
此时的运行结果为:
4.1.2 两个 Observable 运行在不同的线程
static void zipSample() {
Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
Log.d(TAG, "sourceObservable emit 1");
observableEmitter.onNext(1);
Thread.sleep(1000);
Log.d(TAG, "sourceObservable emit 2");
observableEmitter.onNext(2);
Log.d(TAG, "sourceObservable emit 3");
observableEmitter.onNext(3);
Log.d(TAG, "sourceObservable emit 4");
observableEmitter.onNext(4);
}
});
Observable<Integer> otherObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
Log.d(TAG, "otherObservable emit 1");
observableEmitter.onNext(1);
Log.d(TAG, "otherObservable emit 2");
observableEmitter.onNext(2);
Log.d(TAG, "otherObservable emit 3");
observableEmitter.onNext(3);
}
}).subscribeOn(Schedulers.io());
Observable.zip(sourceObservable, otherObservable, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable disposable) {
Log.d(TAG, "resultObservable onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "resultObservable onNext=" + integer);
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "resultObservable onError");
}
@Override
public void onComplete() {
Log.d(TAG, "resultObservable onComplete");
}
});
}
运行结果为:
五、背压
“背压”其实就是一种用于解决问题的工具,那么我们的问题又是什么呢?
- 问题:当上游发送事件的速度很快,下游消费事件的速度又很慢,而系统又必须缓存这些上游发送的消息以便下游处理,那么就会导致系统中堆积了很多的资源。
- 工具:下游告知上游目前自己的处理能力,上游根据下游的处理能力,进行适当的调整。
想必大家在很多文章中都听过这个一句话:在RxJava2
中,Observable
不支持“背压”,而Flowable
支持背压。
5.1 不支持背压的 Observable
关于Observable
不支持背压,我们应当从两种情况去考虑,即上游、下游是否位于相同的线程。
5.1.1 Observable 之上游、下游位于相同线程
首先,我们不调用observeOn
和subscribeOn
方法来改变上游、下游的工作线程,这样,上游和下游就位于同一线程,同时,我们在下游的处理函数中,每收到一个消息就休眠2000ms
,以模拟上游处理速度大于下游的场景。
static void oomSample() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
for (int i = 0; i < 1000; i++) {
Log.d(TAG, "observableEmitter=" + i);
observableEmitter.onNext(i);
}
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Thread.sleep(2000);
Log.d(TAG, "accept=" + integer);
}
});
}
从下面的打印结果可以看到,当“使用 Observable
,并且上游、下游位于相同线程”时,并不会出现消息堆积的情况,因为上游发射完一条消息后,必须要等到下游处理完该消息,才会发射一条新的消息。
5.1.2 Observable 之上游、下游位于不同线程
接着,我们采用subscribeOn
和observeOn
来使得上游和下游位于不同的工作线程,其它均和2.2
中相同。
static void oomSample() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
for (int i = 0; i < 1000; i++) {
Log.d(TAG, "observableEmitter=" + i);
observableEmitter.onNext(i);
}
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Thread.sleep(2000);
Log.d(TAG, "accept=" + integer);
}
});
}
和2.2
中不同,当上游和下游位于不同的工作线程,那么上游发送消息时,不会考虑下游是否已经处理了之前的消息,它会直接发送,而这些发送的消息被存放在水缸当中,下游每处理完一条消息,就去水缸中取下一条数据,那么随着水缸中数据越来越多,那么系统中的无用资源就会急剧增加。
5.1.3 关于 Observable 不支持背压的小结
我们之所以说Observable
不支持“背压”,就是在2.1
介绍的整个族谱中,没有一个类,一种方法能让下游通知上游说:不要再发消息到水缸里了,我已经处理不过来了!
那是不是说Flowable
支持“背压”,而Observable
不支持,那么Observable
就要被取代了呢,其实不然,Flowable
对于“背压”的支持是以性能为代价的,我们应当只在有可能出现2.3
中上游下游速率不匹配的问题时,才去使用Flowable
,否则就应当使用Observable
,也就是满足两点条件:
- 上游和下游位于不同的工作线程
- 上游发送消息的速度,要远远大于下游处理消息的速度,有可能造成消息的堆积。
5.2 支持背压的 Flowable
5.2.1 基本概念
-
Flowable
和Subscriber
分别对应于之前讨论的Observable
和Observer
,它们直接的连接仍然是通过subscribe
方法。 -
Flowable
在设计的时候采用了 响应式拉取 的思想,当下游调用了Subscription
的request
方法时,就表明了下游处理事件的能力,这样上游就可以根据这个值来控制事件发送的频率,避免出现前面谈到的上游发送太快,而下游处理太慢从而导致OOM
的发生。 - 只有上游根据下游的处理能力来发送事件,才能达到理想的效果。
5.2.2 基本使用
static void flowSample() {
Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}, BackpressureStrategy.ERROR);
sourceFlow.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription subscription) {
Log.d(TAG, "onSubscribe");
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext=" + integer);
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
其类结构图和Observable
几乎完全一致:
5.3 Flowable 支持背压的策略
从上面的类图可以看出,Flowable
和Observable
最大的不同,就是在create
方法中,需要传入额外的参数,它表示的是“背压”的策略,这里可选的值包括:
ERROR
BUFFER
DROP
LATEST
5.3.1 使用 ERROR 的策略
- 当上游和下游位于同一个线程时,如果上游发送的事件超过了下游声明的
request(n)
的值,那么会抛出MissingBackpressureException
异常。 - 当上游和下游位于不同线程时,如果上游发送的事件超过了下游的声明,事件会被放在水缸当中,这个水缸默认的大小是
128
,只有当下游调用request
时,才从水缸中取出事件发送给下游,如果水缸中事件的个数超过了128
,那么也会抛出MissingBackpressureException
异常。
下面这段代码,我们先将三个事件放入到水缸当中,之后每次调用request
方法就会从水缸当中取出一个事件发送给下游。
static void flowSample() {
Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io());
sourceFlow.observeOn(Schedulers.newThread()).subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription subscription) {
Log.d(TAG, "onSubscribe");
sSubscription = subscription;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext=" + integer);
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
static void clickSubscription() {
if (sSubscription != null) {
sSubscription.request(1);
}
}
当上游和下游位于不同的线程,每次通过Subscription
调用request
就会从水缸中取出一个事件,发送给下游:
5.3.2 BUFFER 策略
- 使用
BUFFER
策略时,相当于在上游放置了一个容量无限大的水缸,所有下游暂时无法处理的消息都放在水缸当中,这里不再像ERROR
策略一样,区分上游和下游是否位于同一线程。 - 因此,如果下游一直没有处理消息,那么将会导致内存一直增长,从而引起
OOM
。
static void clickSubscription() {
if (sSubscription != null) {
sSubscription.request(10);
}
}
static void flowBufferSample() {
Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 10000;i ++) {
emitter.onNext(i);
}
emitter.onComplete();
}
}, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io());
sourceFlow.observeOn(Schedulers.newThread()).subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription subscription) {
Log.d(TAG, "onSubscribe");
sSubscription = subscription;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext=" + integer);
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
在上面的例子中,我们先把10000
条消息放入到水缸当中,之后通过Subscription
每次从水缸中取出10
条消息发送给下游,演示结果为:
5.3.3 DROP 策略
- 使用
DROP
策略时,会把水缸无法存放的事件丢弃掉,这里同样不会受到下游和下游是否处于同一个线程的限制。
static void flowDropSample() {
Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 130; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.DROP).subscribeOn(Schedulers.io());
sourceFlow.observeOn(Schedulers.io()).subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription subscription) {
Log.d(TAG, "onSubscribe");
sSubscription = subscription;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext=" + integer);
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
我们先往水缸中放入130
条消息,之后每次通过Subscription
取出60
条消息发送给下游,可以看到,最后最多只取到了第128
条消息,第129/130
条消息被丢弃了。
5.3.4 LATEST 策略
- 和
DROP
类似,当水缸无法容纳下消息时,会将它丢弃,但是除此之外,上游还会缓存最新的一条消息,实例如下:
static void flowLatestSample() {
Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 130; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.LATEST).subscribeOn(Schedulers.io());
sourceFlow.observeOn(Schedulers.io()).subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription subscription) {
Log.d(TAG, "onSubscribe");
sSubscription = subscription;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext=" + integer);
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
从下面的运行结果可以看出,当取出最后一批数据的时候,上游除了收到存储在水缸当中的数据,还额外收到了最后一条消息,也就是第130
条数据,这就是DROP
策略和LATEST
策略的区别:
更多文章,欢迎访问我的 Android 知识梳理系列:
- Android 知识梳理目录://www.greatytc.com/p/fd82d18994ce
- 个人主页:http://lizejun.cn
- 个人知识总结目录:http://lizejun.cn/categories/