Rx系列<第十八篇>:RxJava之背压策略

(1)背压的存在背景

默认情况下,上游是在主线程执行的,那么下游也必然在主线程中运行,比如:

  Observable.create(new ObservableOnSubscribe<String>() {

        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            for (int i=0;;i++){
                e.onNext(String.valueOf(i));
            }
        }
    })
            .subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Thread.sleep(3000);
            Log.d("aaa", String.valueOf(s));
        }
    });

当使用subscribeOn来控制上游线程时,比如:

    Observable.create(new ObservableOnSubscribe<String>() {

        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            for (int i=0;;i++){
                e.onNext(String.valueOf(i));
                Log.d("aaa", "==========="+Thread.currentThread().getName());
            }
        }
    })
            .subscribeOn(Schedulers.io())
            .subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Thread.sleep(3000);
            Log.d("aaa", String.valueOf(s)+"----"+Thread.currentThread().getName());
        }
    });

subscribeOn将上游的线程切换到IO线程,那么下游也自然而然在IO线程执行。

以上两种情况(没有控制线程或者subscribeOn控制上游线程),当上游发送一个数据之后,等到下游接收到数据之后上游才能继续发送数据,这样也就不会发生异常。

当我们使用observeOn时,上游和下游的执行就会独自运行了,即使如以下代码:

    Observable.create(new ObservableOnSubscribe<String>() {

        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            for (int i=0;;i++){
                e.onNext(String.valueOf(i));
                Log.d("aaa", "==========="+Thread.currentThread().getName());
            }
        }
    })
            .subscribeOn(AndroidSchedulers.mainThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Thread.sleep(3000);
            Log.d("aaa", String.valueOf(s)+"----"+Thread.currentThread().getName());
        }
    });

以上代码上游和下游都是运行在主线程,但是经过测试,只要使用了observeOn控制了下游的线程,那么第二次发送数据就不需要等到下游接收到数据之后才能发送了。也就是说,只要使用observeOn,上游和下游就会分别独自运行。

大部分情况,发送数据比较快,接收数据相对比较慢,也就是说:

发送的数据个数 > 接收数据的个数

当下游来不及处理上游发送的数据时,这些发送的数据会存放在一个缓存区,当缓存区越来越大时,会发生OOM的现象,日志请看下图

图片.png

我们来看一下内存情况

38.gif

由于缓存越来越大,导致内存泄漏非常严重,等缓存大到一定程度就会发生OOM。

为了解决这样的问题,出现了背压策略。

(2)Flowable

在RxJava2中,采用Flowable来处理背压问题,Flowable的效率要比Observable低,所以最好当需要处理背压问题时再使用Flowable。

先贴一下代码实现

Flowable.create(new FlowableOnSubscribe<String>() {

    @Override
    public void subscribe(FlowableEmitter<String> e) throws Exception {

        for(int i=0;i<129;i++){
            e.onNext(String.valueOf(i));
            Log.d("aaa","已发送数据:"+i);
        }
        e.onComplete();

    }
}, BackpressureStrategy.ERROR)
        .subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.newThread())
        .subscribe(new Subscriber<String>() {
            @Override
            public void onSubscribe(Subscription s) {
                //设置最多可接受数据的数量
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(String s) {
                Log.d("aaa", s);
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
                Log.d("aaa", "t:"+t.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d("aaa", "===完成===");
            }
        });

前面讲到的上游和下游分别是ObservableObserver, 以上代码上游变成了Flowable,下游变成了Subscriber,上游和下游由subscribe() 来连通。

代码中有两点比较重要

  • request

    s.request(Long.MAX_VALUE);
    

这句话的意思是说,设置最大接收数据的数量,这里设置Long.MAX_VALUE就可以了。

  • BackpressureStrategy

Flowable.create的第二个参数就是策略常量。

(3)BackpressureStrategy

先看一下源码

/**
 * Represents the options for applying backpressure to a source sequence.
 */
public enum BackpressureStrategy {
    /**
     * OnNext events are written without any buffering or dropping.
     * Downstream has to deal with any overflow.
     * <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
     */
    MISSING,
    /**
     * Signals a MissingBackpressureException in case the downstream can't keep up.
     */
    ERROR,
    /**
     * Buffers <em>all</em> onNext values until the downstream consumes it.
     */
    BUFFER,
    /**
     * Drops the most recent onNext value if the downstream can't keep up.
     */
    DROP,
    /**
     * Keeps only the latest onNext value, overwriting any previous value if the
     * downstream can't keep up.
     */
    LATEST
}

在RxJava2中,给我们提供了5种背压策略

  • BackpressureStrategy.MISSING

OnNext事件是在不进行任何缓冲或删除的情况下写入的下游必须处理任何溢出

Flowable.create(new FlowableOnSubscribe<String>() {

    @Override
    public void subscribe(FlowableEmitter<String> e) throws Exception {

        for(int i=0;;i++){
            e.onNext(String.valueOf(i));
        }

    }
}, BackpressureStrategy.MISSING)
        .subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.newThread())
        .subscribe(new Subscriber<String>() {
            @Override
            public void onSubscribe(Subscription s) {
                //设置最多可接受数据的数量
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(String s) {
                Log.d("aaa", s);
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
                Log.d("aaa", "t:"+t.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d("aaa", "===完成===");
            }
        });

日志显示,只要缓存队列满了就会抛出MissingBackpressureException异常,下游消费的数量是随机的。

图片.png

图片.png

BackpressureStrategy.MISSING策略本身不会产生多余缓存。

  • BackpressureStrategy.ERROR

如果下游无法跟上上游发送的速度,则会发出反向压力异常信号。

图片.png

解决这个问题,还是限制一下发送数据的速度为好。

  • BackpressureStrategy.BUFFER

缓存上游发送的数据,直到下游消费为止。

    Flowable.create(new FlowableOnSubscribe<String>() {

        @Override
        public void subscribe(FlowableEmitter<String> e) throws Exception {

            for(int i=0;;i++){
                e.onNext(String.valueOf(i));
            }

        }
    }, BackpressureStrategy.BUFFER)
            .subscribeOn(Schedulers.newThread())
            .observeOn(Schedulers.newThread())
            .subscribe(new Subscriber<String>() {
                @Override
                public void onSubscribe(Subscription s) {
                    //设置最多可接受数据的数量
                    s.request(Long.MAX_VALUE);
                }

                @Override
                public void onNext(String s) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    Log.d("aaa", s);
                }

                @Override
                public void onError(Throwable t) {
                    t.printStackTrace();
                    Log.d("aaa", "t:"+t.getMessage());
                }

                @Override
                public void onComplete() {
                    Log.d("aaa", "===完成===");
                }
            });

这种策略依然会发生OOM,消耗的内存比Observable慢。

  • BackpressureStrategy.DROP

这个策略存在一个长度为128大小的缓存区,当缓存区满时下游则不再接收数据,等到缓存区清理的时候才可以再次接收数据。

  • BackpressureStrategy.LATEST

只保留最新的onnext值,如果下游无法跟上,则覆盖任何以前的值。

与Drop策略一样,如果缓存池满了,会丢掉将要放入缓存池中的数据,不同的是,不管缓存池的状态如何,LATEST都会将最后一条数据强行放入缓存池中,来保证观察者在接收到完成通知之前,能够接收到Flowable最新发射的一条数据。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,236评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,867评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,715评论 0 340
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,899评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,895评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,733评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,085评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,722评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,025评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,696评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,816评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,447评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,057评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,009评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,254评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,204评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,561评论 2 343

推荐阅读更多精彩内容