(1)背压的存在背景
默认情况下,上游是在主线程执行的,那么下游也必然在主线程中运行,比如:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
for (int i=0;;i++){
e.onNext(String.valueOf(i));
}
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Thread.sleep(3000);
Log.d("aaa", String.valueOf(s));
}
});
当使用subscribeOn来控制上游线程时,比如:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
for (int i=0;;i++){
e.onNext(String.valueOf(i));
Log.d("aaa", "==========="+Thread.currentThread().getName());
}
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Thread.sleep(3000);
Log.d("aaa", String.valueOf(s)+"----"+Thread.currentThread().getName());
}
});
subscribeOn将上游的线程切换到IO线程,那么下游也自然而然在IO线程执行。
以上两种情况(没有控制线程或者subscribeOn控制上游线程),当上游发送一个数据之后,等到下游接收到数据之后上游才能继续发送数据,这样也就不会发生异常。
当我们使用observeOn时,上游和下游的执行就会独自运行了,即使如以下代码:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
for (int i=0;;i++){
e.onNext(String.valueOf(i));
Log.d("aaa", "==========="+Thread.currentThread().getName());
}
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Thread.sleep(3000);
Log.d("aaa", String.valueOf(s)+"----"+Thread.currentThread().getName());
}
});
以上代码上游和下游都是运行在主线程,但是经过测试,只要使用了observeOn控制了下游的线程,那么第二次发送数据就不需要等到下游接收到数据之后才能发送了。也就是说,只要使用observeOn,上游和下游就会分别独自运行。
大部分情况,发送数据比较快,接收数据相对比较慢,也就是说:
发送的数据个数 > 接收数据的个数
当下游来不及处理上游发送的数据时,这些发送的数据会存放在一个缓存区,当缓存区越来越大时,会发生OOM的现象,日志请看下图
我们来看一下内存情况
由于缓存越来越大,导致内存泄漏非常严重,等缓存大到一定程度就会发生OOM。
为了解决这样的问题,出现了背压策略。
(2)Flowable
在RxJava2中,采用Flowable来处理背压问题,Flowable的效率要比Observable低,所以最好当需要处理背压问题时再使用Flowable。
先贴一下代码实现
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> e) throws Exception {
for(int i=0;i<129;i++){
e.onNext(String.valueOf(i));
Log.d("aaa","已发送数据:"+i);
}
e.onComplete();
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
//设置最多可接受数据的数量
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(String s) {
Log.d("aaa", s);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
Log.d("aaa", "t:"+t.getMessage());
}
@Override
public void onComplete() {
Log.d("aaa", "===完成===");
}
});
前面讲到的上游和下游分别是Observable
和Observer
, 以上代码上游变成了Flowable
,下游变成了Subscriber
,上游和下游由subscribe() 来连通。
代码中有两点比较重要
-
request
s.request(Long.MAX_VALUE);
这句话的意思是说,设置最大接收数据的数量,这里设置Long.MAX_VALUE就可以了。
- BackpressureStrategy
Flowable.create的第二个参数就是策略常量。
(3)BackpressureStrategy
先看一下源码
/**
* Represents the options for applying backpressure to a source sequence.
*/
public enum BackpressureStrategy {
/**
* OnNext events are written without any buffering or dropping.
* Downstream has to deal with any overflow.
* <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
*/
MISSING,
/**
* Signals a MissingBackpressureException in case the downstream can't keep up.
*/
ERROR,
/**
* Buffers <em>all</em> onNext values until the downstream consumes it.
*/
BUFFER,
/**
* Drops the most recent onNext value if the downstream can't keep up.
*/
DROP,
/**
* Keeps only the latest onNext value, overwriting any previous value if the
* downstream can't keep up.
*/
LATEST
}
在RxJava2中,给我们提供了5种背压策略
- BackpressureStrategy.MISSING
OnNext事件是在不进行任何缓冲或删除的情况下写入的下游必须处理任何溢出
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> e) throws Exception {
for(int i=0;;i++){
e.onNext(String.valueOf(i));
}
}
}, BackpressureStrategy.MISSING)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
//设置最多可接受数据的数量
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(String s) {
Log.d("aaa", s);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
Log.d("aaa", "t:"+t.getMessage());
}
@Override
public void onComplete() {
Log.d("aaa", "===完成===");
}
});
日志显示,只要缓存队列满了就会抛出MissingBackpressureException异常,下游消费的数量是随机的。
BackpressureStrategy.MISSING策略本身不会产生多余缓存。
- BackpressureStrategy.ERROR
如果下游无法跟上上游发送的速度,则会发出反向压力异常信号。
解决这个问题,还是限制一下发送数据的速度为好。
- BackpressureStrategy.BUFFER
缓存上游发送的数据,直到下游消费为止。
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> e) throws Exception {
for(int i=0;;i++){
e.onNext(String.valueOf(i));
}
}
}, BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
//设置最多可接受数据的数量
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(String s) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.d("aaa", s);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
Log.d("aaa", "t:"+t.getMessage());
}
@Override
public void onComplete() {
Log.d("aaa", "===完成===");
}
});
这种策略依然会发生OOM,消耗的内存比Observable
慢。
- BackpressureStrategy.DROP
这个策略存在一个长度为128大小的缓存区,当缓存区满时下游则不再接收数据,等到缓存区清理的时候才可以再次接收数据。
- BackpressureStrategy.LATEST
只保留最新的onnext值,如果下游无法跟上,则覆盖任何以前的值。
与Drop策略一样,如果缓存池满了,会丢掉将要放入缓存池中的数据,不同的是,不管缓存池的状态如何,LATEST都会将最后一条数据强行放入缓存池中,来保证观察者在接收到完成通知之前,能够接收到Flowable最新发射的一条数据。