一、简介
前面几章都是介绍Observable,而Observable类是实现无背压方式的。而有背压的方式就不能使用Observable而是Flowable。其实这两个类没有太大的区别,尤其是操作符的处理这块。
关于Flowable和Observable有几个相似作用的类。
- Flowable对应Observable
- FlowableEmitter对应ObservableEmitter
- Subscriber对应Observer
- Subscription对于Disposable
关于这几个相同概念的类,背压方式的增加了一些额外的功能。
先来看下背压方式的简单实现:
// 1、创建Flowable对象
Flowable<Integer> flowable = Flowable
.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.e(TAG, "subscribe: " + emitter.requested());
for (int i = 0; i < 128; i++) {
Log.e(TAG, "subscribe: "+i );
emitter.onNext(i);
}
}
}, BackpressureStrategy.DROP);
// 2、创建Subscriber观察者对象
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
Subscription mSubscription = null;
@Override
public void onSubscribe(Subscription s) {
Log.e(TAG, "onSubscribe");
mSubscription = s;
s.request(1);
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.e(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
};
// 3、发生订阅关系
flowable.subscribe(subscriber);
从使用角度来说,Flowable的整体框架和Observable没有区别,仅仅是换了不同的类,但是实现的功能大体一致。
- 1、创建Flowable对象
- 2、创建Subscriber观察者对象
- 3、发生订阅关系
关于出现了压力后,也有不同的策略处理。Flowable提供了以下几种策略:
public enum BackpressureStrategy {
/**
* OnNext events are written without any buffering or dropping.
* Downstream has to deal with any overflow.
* <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
*/
MISSING,
/**
* Signals a MissingBackpressureException in case the downstream can't keep up.
*/
ERROR,
/**
* Buffers <em>all</em> onNext values until the downstream consumes it.
*/
BUFFER,
/**
* Drops the most recent onNext value if the downstream can't keep up.
*/
DROP,
/**
* Keeps only the latest onNext value, overwriting any previous value if the
* downstream can't keep up.
*/
LATEST
}
- BUFFER 当发送的事件有来不及处理的时候,会放在缓冲区里面,这个缓冲区会无限的增加,直到发生OOM
- ERROR 当FlowableEmitter发射器在emitter.requested() == 0的时候发送就会抛出异常
- DROP Rxjava默认的缓冲区为128,如果有来不及处理的事件,就会放到缓冲区,128个放满后,接下来的事件就会抛弃。
- LATEST 与DROP策略类似,他会抛弃最开始的数据,缓冲最后的数据。
二、基本类的介绍
1)、Subscription
/**
* A {@link Subscription} represents a one-to-one lifecycle of a {@link Subscriber} subscribing to a {@link Publisher}.
* <p>
* It can only be used once by a single {@link Subscriber}.
* <p>
* It is used to both signal desire for data and cancel demand (and allow resource cleanup).
*
*/
public interface Subscription {
public void request(long n);
public void cancel();
}
对于Subscription的解释是,它是和Subscriber对象是一对一的关系的,以及他是个控制类,控制事件流的流向。用户可以用该对象去拉取相应的数据。
Subscription类是对应Disposable类的,Disposable类原来的作用就是取消事件流的,Subscription保留了该方法。但同时增加了拉取方法request。该对象会在调用观察者Subscriber的时候传入。
2)、Subscriber
Subscriber是观察者,对应着Observable中的Observer类。
public interface Subscriber<T> {
/**
* Invoked after calling {@link Publisher#subscribe(Subscriber)}.
* <p>
* No data will start flowing until {@link Subscription#request(long)} is invoked.
* <p>
* It is the responsibility of this {@link Subscriber} instance to call {@link Subscription#request(long)} whenever more data is wanted.
* <p>
* The {@link Publisher} will send notifications only in response to {@link Subscription#request(long)}.
*
* @param s
* {@link Subscription} that allows requesting data via {@link Subscription#request(long)}
*/
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
Subscriber相比较Observer类,在onSubscribe传入的不是Disposable对象,而是Subscription对象,使用Subscription对象控制事件流。
3)、FlowableEmitter
FlowableEmitter类是发射器类,对应ObservableEmitter类。
public interface FlowableEmitter<T> extends Emitter<T> {
/**
* Sets a Disposable on this emitter; any previous Disposable
* or Cancellation will be unsubscribed/cancelled.
* @param s the disposable, null is allowed
*/
void setDisposable(Disposable s);
/**
* Sets a Cancellable on this emitter; any previous Disposable
* or Cancellation will be unsubscribed/cancelled.
* @param c the cancellable resource, null is allowed
*/
void setCancellable(Cancellable c);
/**
* The current outstanding request amount.
* <p>This method is thread-safe.
* @return the current outstanding request amount
*/
long requested();
/**
* Returns true if the downstream cancelled the sequence.
* <p>This method is thread-safe.
* @return true if the downstream cancelled the sequence
*/
boolean isCancelled();
/**
* Ensures that calls to onNext, onError and onComplete are properly serialized.
* @return the serialized FlowableEmitter
*/
FlowableEmitter<T> serialize();
}
FlowableEmitter类有他独特的方法,主要的方法是long requested();可以用该方法来感知当前的下游的情况,可以使下游和上游产生联系。
FlowableEmitter本身是继承Emitter的,它具有发射的功能。在发射的之前可以通过requestd方法判断下游还可以处理多少,这样就完成了响应式拉取的核心东西。
3)、Flowable被观察者
Flowable是整个观察者模式中的被观察者概念。作为被观察者它是有个订阅功能。
Flowable是继承自Publisher类,该类的作用就是定义一个订阅方法。其中Subscriber是观察者。
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
Flowable的结构和Observable是差不多的,现在就来看下一份精简的代码。
public abstract class Flowable<T> implements Publisher<T> {
@BackpressureSupport(BackpressureKind.SPECIAL)
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
ObjectHelper.requireNonNull(source, "source is null");
ObjectHelper.requireNonNull(mode, "mode is null");
return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
}
@BackpressureSupport(BackpressureKind.SPECIAL)
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Subscriber<? super T> s) {
ObjectHelper.requireNonNull(s, "s is null");
try {
s = RxJavaPlugins.onSubscribe(this, s);
ObjectHelper.requireNonNull(s, "Plugin returned null Subscriber");
subscribeActual(s);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Subscription has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
protected abstract void subscribeActual(Subscriber<? super T> s);
}
源码一看和Observable没什么区别,通过Create或者其他类似方式创建一个Flowable对象。在Create中和其他操作符中保存上一个的Observable对象,在subscribe中用上一个Observable对象调用下游传入的Subscriber,这样就形成了一个链式结构。
三、响应式拉去原理
白话描述响应式拉取原理:观察者中的onSubscribe方法被调用的时候,会传入一个Subscription对象,该对象的request(int n)方法就是发送一个命令,下游可以处理n个数据。保存的变量就会增加n。上游的发送器中也可以访问到这个保存的变量,通过判断这个变量是否0就知道下游现在的情况了,从而触发onNext()方法继续发送事件。
现在就开始从源码角度看了:
- 1、注册的时候走到FlowableCreate中的subscribeActual方法
@Override
public void subscribeActual(Subscriber<? super T> t) {
BaseEmitter<T> emitter;
switch (backpressure) {
case MISSING: {
emitter = new MissingEmitter<T>(t);
break;
}
case ERROR: {
emitter = new ErrorAsyncEmitter<T>(t);
break;
}
case DROP: {
emitter = new DropAsyncEmitter<T>(t);
break;
}
case LATEST: {
emitter = new LatestAsyncEmitter<T>(t);
break;
}
default: {
emitter = new BufferAsyncEmitter<T>(t, bufferSize());
break;
}
}
t.onSubscribe(emitter);
try {
source.subscribe(emitter);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
emitter.onError(ex);
}
}
可以看出这里传入不同的策略,会生成对应的BaseEmitter策略子类。
第一步还是执行相应的 t.onSubscribe(emitter);传入的Emitter就是Subscription的子类。
第二步还是source.subscribe(emitter);通知用户开始发射事件流。
2、第二步触发事件流
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
if (emitter.requested()>0){
emitter.onNext(1);
}
}
source.subscribe(emitter)执行中,通常会用emitter去触发emitter.onNext(int),这样会触发事件流,也就到了DropAsyncEmitter中的onNext方法。
DropAsyncEmitter#onNext源码:
@Override
public final void onNext(T t) {
if (isCancelled()) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (get() != 0) {
actual.onNext(t);
BackpressureHelper.produced(this, 1);
} else {
onOverflow();
}
}
前面都是检测合法性,重要的是下面的判断,如果get()!=0,说明下游可以处理数据,那就发送一个数据,否则则丢弃不管了(该事件到这就停止了),也就是Drop策略。get()!=0则处理actual.onNext(t);并将记录的处理数据减一。
再看下生产者这边,也就是在BaseEmitter中的request(int)方法:
@Override
public final void request(long n) {
if (SubscriptionHelper.validate(n)) {
BackpressureHelper.add(this, n);
onRequested();
}
}
先判断传入的n是否大于0是否合法,合法就更改可以处理的保存的记录数,生产者到这就完成了。
这里逻辑就基本清晰了,生产者(request(int))和消费者(onNext()),他们操作的是同一个内存数据。
生产者消费者模型
- 触发onNext()是消费者,代表消费一个数据,前提是有数据可以消费。
- request(int)是生产者,生产数据。
由于生产者和消费者可以在不同的线程操作,可能会带来线程不安全,所以采用了AtomicLong线程安全的Long来保存可消费的数据。
关于对数据内存的操作,被封装成了单独的类,下面是精简版本
public final class BackpressureHelper {
/** Utility class. */
private BackpressureHelper() {
throw new IllegalStateException("No instances!");
}
// 判断是否超出了范围
public static long addCap(long a, long b) {
long u = a + b;
if (u < 0L) {
return Long.MAX_VALUE;
}
return u;
}
// 生产数据
public static long add(AtomicLong requested, long n) {
for (;;) {
long r = requested.get();
if (r == Long.MAX_VALUE) {
return Long.MAX_VALUE;
}
long u = addCap(r, n);
if (requested.compareAndSet(r, u)) {
return r;
}
}
}
// 消费数据
public static long produced(AtomicLong requested, long n) {
for (;;) {
long current = requested.get();
if (current == Long.MAX_VALUE) {
return Long.MAX_VALUE;
}
long update = current - n;
if (update < 0L) {
RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + update));
update = 0L;
}
if (requested.compareAndSet(current, update)) {
return update;
}
}
}
}
以add(AtomicLong requested, long n)生产者为例。传入的参数是requested代表原来的数据的索引,n为需要再生产个数。本质上就是将request+n,但这样是线程不安全的。
首先使用for (;;) 循环,这里没有采用锁机制,而是采用自旋锁,将生产的数据和理论的数据进行对比,如果不是则重新操作,一致就说明这些操作是线程安全的,采用原理的比较设置为最新的数即可。
同理消费者也是。
四、小结
关于响应式拉取,本质上就是生产者和消费者模型。AtomicLong代表着数据源,是个被操作的对象;FlowableEmitter为发射器是消费者;Subscription为控制器,内部有request(int)方法,为生产者。
而BaseEmitter则继承了AtomicLong、FlowableEmitter和Subscription这三个类,也就是说明在BaseEmitter是个集大成者,将生产者消费者模型中的元素都放在了一起,产生了联系,这样就可以操作了。
其中BackpressureHelper是个代理处理数据源的类,他采用自旋锁机制提高了CPU的利用率,也保证了安全。因为这里的线程冲突场景不可能特别多,加锁的话会占用上千的CPU时钟周期,而自旋锁一般状态也就消耗几个时钟周期。