Rxjava2 简析Flowable背压(4)

一、简介

前面几章都是介绍Observable,而Observable类是实现无背压方式的。而有背压的方式就不能使用Observable而是Flowable。其实这两个类没有太大的区别,尤其是操作符的处理这块。

关于Flowable和Observable有几个相似作用的类。

  • Flowable对应Observable
  • FlowableEmitter对应ObservableEmitter
  • Subscriber对应Observer
  • Subscription对于Disposable

关于这几个相同概念的类,背压方式的增加了一些额外的功能。

先来看下背压方式的简单实现:

// 1、创建Flowable对象
Flowable<Integer> flowable = Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                        Log.e(TAG, "subscribe: " + emitter.requested());

                        for (int i = 0; i < 128; i++) {
                            Log.e(TAG, "subscribe: "+i );
                            emitter.onNext(i);
                        }
                    }
                }, BackpressureStrategy.DROP);
        // 2、创建Subscriber观察者对象
        Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            Subscription mSubscription = null;

            @Override
            public void onSubscribe(Subscription s) {
                Log.e(TAG, "onSubscribe");
                mSubscription = s;
                s.request(1);
            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, "onNext: " + integer);
            }

            @Override
            public void onError(Throwable t) {
                Log.e(TAG, "onError: ", t);
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete");
            }
        };
        // 3、发生订阅关系
        flowable.subscribe(subscriber);

从使用角度来说,Flowable的整体框架和Observable没有区别,仅仅是换了不同的类,但是实现的功能大体一致。

  • 1、创建Flowable对象
  • 2、创建Subscriber观察者对象
  • 3、发生订阅关系

关于出现了压力后,也有不同的策略处理。Flowable提供了以下几种策略:

public enum BackpressureStrategy {
    /**
     * OnNext events are written without any buffering or dropping.
     * Downstream has to deal with any overflow.
     * <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
     */
    MISSING,
    /**
     * Signals a MissingBackpressureException in case the downstream can't keep up.
     */
    ERROR,
    /**
     * Buffers <em>all</em> onNext values until the downstream consumes it.
     */
    BUFFER,
    /**
     * Drops the most recent onNext value if the downstream can't keep up.
     */
    DROP,
    /**
     * Keeps only the latest onNext value, overwriting any previous value if the
     * downstream can't keep up.
     */
    LATEST
}
  • BUFFER 当发送的事件有来不及处理的时候,会放在缓冲区里面,这个缓冲区会无限的增加,直到发生OOM
  • ERROR 当FlowableEmitter发射器在emitter.requested() == 0的时候发送就会抛出异常
  • DROP Rxjava默认的缓冲区为128,如果有来不及处理的事件,就会放到缓冲区,128个放满后,接下来的事件就会抛弃。
  • LATEST 与DROP策略类似,他会抛弃最开始的数据,缓冲最后的数据。

二、基本类的介绍

1)、Subscription

/**
 * A {@link Subscription} represents a one-to-one lifecycle of a {@link Subscriber} subscribing to a {@link Publisher}.
 * <p>
 * It can only be used once by a single {@link Subscriber}.
 * <p>
 * It is used to both signal desire for data and cancel demand (and allow resource cleanup).
 *
 */

public interface Subscription {

    public void request(long n);

    public void cancel();
}

对于Subscription的解释是,它是和Subscriber对象是一对一的关系的,以及他是个控制类,控制事件流的流向。用户可以用该对象去拉取相应的数据。

Subscription类是对应Disposable类的,Disposable类原来的作用就是取消事件流的,Subscription保留了该方法。但同时增加了拉取方法request。该对象会在调用观察者Subscriber的时候传入。

2)、Subscriber

Subscriber是观察者,对应着Observable中的Observer类。

public interface Subscriber<T> {
    /**
     * Invoked after calling {@link Publisher#subscribe(Subscriber)}.
     * <p>
     * No data will start flowing until {@link Subscription#request(long)} is invoked.
     * <p>
     * It is the responsibility of this {@link Subscriber} instance to call {@link Subscription#request(long)} whenever more data is wanted.
     * <p>
     * The {@link Publisher} will send notifications only in response to {@link Subscription#request(long)}.
     * 
     * @param s
     *            {@link Subscription} that allows requesting data via {@link Subscription#request(long)}
     */
    public void onSubscribe(Subscription s);

    public void onNext(T t);

    public void onError(Throwable t);

    public void onComplete();
}

Subscriber相比较Observer类,在onSubscribe传入的不是Disposable对象,而是Subscription对象,使用Subscription对象控制事件流。

3)、FlowableEmitter

FlowableEmitter类是发射器类,对应ObservableEmitter类。

public interface FlowableEmitter<T> extends Emitter<T> {

    /**
     * Sets a Disposable on this emitter; any previous Disposable
     * or Cancellation will be unsubscribed/cancelled.
     * @param s the disposable, null is allowed
     */
    void setDisposable(Disposable s);

    /**
     * Sets a Cancellable on this emitter; any previous Disposable
     * or Cancellation will be unsubscribed/cancelled.
     * @param c the cancellable resource, null is allowed
     */
    void setCancellable(Cancellable c);

    /**
     * The current outstanding request amount.
     * <p>This method is thread-safe.
     * @return the current outstanding request amount
     */
    long requested();

    /**
     * Returns true if the downstream cancelled the sequence.
     * <p>This method is thread-safe.
     * @return true if the downstream cancelled the sequence
     */
    boolean isCancelled();

    /**
     * Ensures that calls to onNext, onError and onComplete are properly serialized.
     * @return the serialized FlowableEmitter
     */
    FlowableEmitter<T> serialize();
}

FlowableEmitter类有他独特的方法,主要的方法是long requested();可以用该方法来感知当前的下游的情况,可以使下游和上游产生联系。

FlowableEmitter本身是继承Emitter的,它具有发射的功能。在发射的之前可以通过requestd方法判断下游还可以处理多少,这样就完成了响应式拉取的核心东西。

3)、Flowable被观察者

Flowable是整个观察者模式中的被观察者概念。作为被观察者它是有个订阅功能。

Flowable是继承自Publisher类,该类的作用就是定义一个订阅方法。其中Subscriber是观察者。

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

Flowable的结构和Observable是差不多的,现在就来看下一份精简的代码。

public abstract class Flowable<T> implements Publisher<T> {

    @BackpressureSupport(BackpressureKind.SPECIAL)
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
        ObjectHelper.requireNonNull(source, "source is null");
        ObjectHelper.requireNonNull(mode, "mode is null");
        return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
    }

    @BackpressureSupport(BackpressureKind.SPECIAL)
    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Subscriber<? super T> s) {
        ObjectHelper.requireNonNull(s, "s is null");
        try {
            s = RxJavaPlugins.onSubscribe(this, s);

            ObjectHelper.requireNonNull(s, "Plugin returned null Subscriber");

            subscribeActual(s);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Subscription has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

    protected abstract void subscribeActual(Subscriber<? super T> s);

}

源码一看和Observable没什么区别,通过Create或者其他类似方式创建一个Flowable对象。在Create中和其他操作符中保存上一个的Observable对象,在subscribe中用上一个Observable对象调用下游传入的Subscriber,这样就形成了一个链式结构。

三、响应式拉去原理

背压策略

白话描述响应式拉取原理:观察者中的onSubscribe方法被调用的时候,会传入一个Subscription对象,该对象的request(int n)方法就是发送一个命令,下游可以处理n个数据。保存的变量就会增加n。上游的发送器中也可以访问到这个保存的变量,通过判断这个变量是否0就知道下游现在的情况了,从而触发onNext()方法继续发送事件。

现在就开始从源码角度看了:

  • 1、注册的时候走到FlowableCreate中的subscribeActual方法
  @Override
    public void subscribeActual(Subscriber<? super T> t) {
        BaseEmitter<T> emitter;

        switch (backpressure) {
        case MISSING: {
            emitter = new MissingEmitter<T>(t);
            break;
        }
        case ERROR: {
            emitter = new ErrorAsyncEmitter<T>(t);
            break;
        }
        case DROP: {
            emitter = new DropAsyncEmitter<T>(t);
            break;
        }
        case LATEST: {
            emitter = new LatestAsyncEmitter<T>(t);
            break;
        }
        default: {
            emitter = new BufferAsyncEmitter<T>(t, bufferSize());
            break;
        }
        }

        t.onSubscribe(emitter);
        try {
            source.subscribe(emitter);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            emitter.onError(ex);
        }
    }

可以看出这里传入不同的策略,会生成对应的BaseEmitter策略子类。

  • 第一步还是执行相应的 t.onSubscribe(emitter);传入的Emitter就是Subscription的子类。

  • 第二步还是source.subscribe(emitter);通知用户开始发射事件流。

  • 2、第二步触发事件流

public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
      if (emitter.requested()>0){
            emitter.onNext(1);
      }
}

source.subscribe(emitter)执行中,通常会用emitter去触发emitter.onNext(int),这样会触发事件流,也就到了DropAsyncEmitter中的onNext方法。

DropAsyncEmitter#onNext源码:

        @Override
        public final void onNext(T t) {
            if (isCancelled()) {
                return;
            }

            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }

            if (get() != 0) {
                actual.onNext(t);
                BackpressureHelper.produced(this, 1);
            } else {
                onOverflow();
            }
        }

前面都是检测合法性,重要的是下面的判断,如果get()!=0,说明下游可以处理数据,那就发送一个数据,否则则丢弃不管了(该事件到这就停止了),也就是Drop策略。get()!=0则处理actual.onNext(t);并将记录的处理数据减一。

再看下生产者这边,也就是在BaseEmitter中的request(int)方法:

        @Override
        public final void request(long n) {
            if (SubscriptionHelper.validate(n)) {
                BackpressureHelper.add(this, n);
                onRequested();
            }
        }

先判断传入的n是否大于0是否合法,合法就更改可以处理的保存的记录数,生产者到这就完成了。

这里逻辑就基本清晰了,生产者(request(int))和消费者(onNext()),他们操作的是同一个内存数据。

生产者消费者模型

  • 触发onNext()是消费者,代表消费一个数据,前提是有数据可以消费。
  • request(int)是生产者,生产数据。

由于生产者和消费者可以在不同的线程操作,可能会带来线程不安全,所以采用了AtomicLong线程安全的Long来保存可消费的数据。

关于对数据内存的操作,被封装成了单独的类,下面是精简版本

public final class BackpressureHelper {
    /** Utility class. */
    private BackpressureHelper() {
        throw new IllegalStateException("No instances!");
    }

     // 判断是否超出了范围
    public static long addCap(long a, long b) {
        long u = a + b;
        if (u < 0L) {
            return Long.MAX_VALUE;
        }
        return u;
    }

    // 生产数据
    public static long add(AtomicLong requested, long n) {
        for (;;) {
            long r = requested.get();
            if (r == Long.MAX_VALUE) {
                return Long.MAX_VALUE;
            }
            long u = addCap(r, n);
            if (requested.compareAndSet(r, u)) {
                return r;
            }
        }
    }


    // 消费数据
    public static long produced(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            if (current == Long.MAX_VALUE) {
                return Long.MAX_VALUE;
            }
            long update = current - n;
            if (update < 0L) {
                RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + update));
                update = 0L;
            }
            if (requested.compareAndSet(current, update)) {
                return update;
            }
        }
    }
}

以add(AtomicLong requested, long n)生产者为例。传入的参数是requested代表原来的数据的索引,n为需要再生产个数。本质上就是将request+n,但这样是线程不安全的。

首先使用for (;;) 循环,这里没有采用锁机制,而是采用自旋锁,将生产的数据和理论的数据进行对比,如果不是则重新操作,一致就说明这些操作是线程安全的,采用原理的比较设置为最新的数即可。

同理消费者也是。

四、小结

关于响应式拉取,本质上就是生产者和消费者模型。AtomicLong代表着数据源,是个被操作的对象;FlowableEmitter为发射器是消费者;Subscription为控制器,内部有request(int)方法,为生产者。

而BaseEmitter则继承了AtomicLong、FlowableEmitter和Subscription这三个类,也就是说明在BaseEmitter是个集大成者,将生产者消费者模型中的元素都放在了一起,产生了联系,这样就可以操作了。

其中BackpressureHelper是个代理处理数据源的类,他采用自旋锁机制提高了CPU的利用率,也保证了安全。因为这里的线程冲突场景不可能特别多,加锁的话会占用上千的CPU时钟周期,而自旋锁一般状态也就消耗几个时钟周期。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,565评论 6 479
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,021评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,003评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,015评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,020评论 5 370
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,856评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,178评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,824评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,264评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,788评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,913评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,535评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,130评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,102评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,334评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,298评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,622评论 2 343

推荐阅读更多精彩内容