一、概述
Reactive Streams 规范描述了开发一个响应式编程框架需要遵循的接口规范和相关原则。reactive-streams-jvm 项目中包含了 Reactive Streams 规范设计的四个主要接口和一些接口实现的 example。为了更好的理解响应式编程的核心理念,故对 reactive-streams-jvm 源码下的 examples 实现代码进行分析。
这里重点分析 examples 下的 Publisher、Subscriber、Subscription、Processor 接口实现之间的交互流程,不会对背压、异步以及响应式等核心的概念进行深入分析。
reactive-streams-jvm 源码:Github 地址
发布 - 订阅模型
Reactive Streams 核心接口类图
二、AsyncIterablePublisher
简单描述一下
AsyncIterablePublisher 是 exmples 中的其中一个实现,正如其名,它是一个使用了 Iterable 的异步序列的发布者对象,其构造方法接收以下三个参数:
- Iterable 作为数据元素的数据源。
- Executor 用作异步执行线程的调度器。
- BatchSize 可选参数,标明数据元素的的上限,默认值是1024。
// AsyncIterablePublisher.java
public AsyncIterablePublisher(final Iterable<T> elements, final Executor executor) {
this(elements, DEFAULT_BATCHSIZE, executor);
}
public AsyncIterablePublisher(final Iterable<T> elements, final int batchSize, final Executor executor) {
...
}
当 AsyncIterablePublisher 的 subscribe 方法被调用的时候,AsyncIterablePublisher 的内部类 SubscriptionImpl 的对象会被创建并初始化。SubscriptionImpl 对象初始化的方法中,会回调 Subscriber 的 onSubscribe 方法。
// AsyncIterablePublisher.java
@Override
public void subscribe(final Subscriber<? super T> s) {
new SubscriptionImpl(s).init();
}
这里涉及到例外一个关键的地方是 Signal 接口,Signal 接口的对象标识异步信号,SubscriptionImpl 类中有一个 inboundSignals 字段,用于接受 Signal 对象,当 run 方法被驱动执行的时候,会从 inboundSignals 队列中取出 Signal 对象并处理。
// AsyncIterablePublisher.java
static interface Signal {};
enum Cancel implements Signal { Instance; };
enum Subscribe implements Signal { Instance; };
enum Send implements Signal { Instance; };
static final class Request implements Signal {
final long n;
Request(final long n) {
this.n = n;
}
};
核心类图
主要流程
SubscriptionImpl 事件轮询处理逻辑
从上面的类图可以看到,SubscriptionImple 类同时实现了 Runnable 对象,是可以被 Executor 调度执行的,因此我们可以猜测核心的事件处理机制就在这里。
// SubscriptionImpl 的 run 方法实现
@Override public final void run() {
if(on.get()) {
try {
final Signal s = inboundSignals.poll();
if (!cancelled) {
// 根据类型处理消息
...
}
} finally {
on.set(false);
if(!inboundSignals.isEmpty())
tryScheduleToExecute();
}
}
}
private final void tryScheduleToExecute() {
if(on.compareAndSet(false, true)) {
try {
executor.execute(this);
} catch(Throwable t) {
if (!cancelled) {
doCancel();
try {
terminateDueTo(new IllegalStateException("Publisher terminated due to unavailable Executor.", t));
} finally {
inboundSignals.clear();
on.set(false);
}
}
}
}
}
上述代码省略了处理具体信号的逻辑
- on 是一个 AtomicBoolean 类型的对象,初始值为 flase,在这里做同步锁的作用,保证同时只有一个线程可以执行 run 方法。
- tryScheduleToExecute 判断是否正在执行信号处理任务,如果没有,则使用 Executor 调度 this 对象启动信号处理。
- tryScheduleToExecute 是一个关键方法,每当在任何地方收到信号后,都会调用此方法尝试进行信号处理操作。
具体的业务流程图:
subscribe 事件处理
Subscriber 调用 AsyncIterablePublisher 的 subscribe 方法的时候,SubscriptionImpl 对象就会被初始化并进入 SubscriptionImpl 对象的 init 方法。init 方法会将 subscribe 信号添加到信号队列,并调用 tryScheduleToExecute 方法请求调度。
当 subscribe 信号被处理的时候,会调用 doSubscribe 方法:
private void doSubscribe() {
// 前面是处理 iterator 对象的逻辑
...
if (!cancelled) {
try {
subscriber.onSubscribe(this);
} catch(final Throwable t) {
terminateDueTo(new IllegalStateException("XXX", t));
}
// Deal with already complete iterators promptly
boolean hasElements = false;
try {
hasElements = iterator.hasNext();
} catch(final Throwable t) {
terminateDueTo(t);
}
if (!hasElements) {
try {
doCancel();
subscriber.onComplete();
} catch(final Throwable t) {
(new IllegalStateException(subscriber + " xxx",t))
.printStackTrace(System.err);
}
}
}
}
- 省略前面处理 iterator 对象的逻辑,当进入此方法的时候,首先回调 subscriber 的 onSubscribe 方法。
- 如果数据源中没有数据,会取消并通知 subscriber 数据已发送完毕。
request 事件处理
当 SubscriptionImpl 的 request 方法被调用的时候,request 方法会发送 Request 信号到信号队列,并调用 tryScheduleToExecute 尝试请求调度器处理信号。
当 request 信号被调度的时候,doRequest 方法会被调用,此方法的主要作用主要是用于更新当前的需求变量(demand),然后调用 doSend() 方法发送数据给 Subscriber。
private void doRequest(final long n) {
if (n < 1)
terminateDueTo(new IllegalArgumentException(subscriber + "..."));
else if (demand + n < 1) {
demand = Long.MAX_VALUE;
doSend();
} else {
demand += n;
doSend();
}
}
doSend 方法主要是用来给 Subscriber 发送数据的。其主要逻辑是执行背压需求判断,如果符合背压需求,并且还有数据就调用 Subscriber 的 onNext 方法将数据发送给下游。
这里如果不满足背压需求,会发出 Send 信号,等待下次被调度。
如果数据已经发送完成,则调用 Subscriber 的 onComplete 方法通知下游。
private void doSend() {
try {
int leftInBatch = batchSize;
do {
T next;
boolean hasNext;
try {
next = iterator.next();
hasNext = iterator.hasNext();
} catch (final Throwable t) {
terminateDueTo(t);
return;
}
subscriber.onNext(next);
if (!hasNext) {
doCancel();
subscriber.onComplete();
}
} while (!cancelled
&& --leftInBatch > 0
&& --demand > 0);
if (!cancelled && demand > 0)
signal(Send.Instance);
} catch(final Throwable t) {
doCancel();
(new IllegalStateException(subscriber + " ...", t))
.printStackTrace(System.err);
}
}
cancel 事件处理
当 SubscriptionImpl 的 cancel 方法被调用的时候,会添加 cancel 信号到信号队列,并舱室请求处理信号消息,当 cancel 信号被调度到的时候 doCancel 方法会被调用。
private void doCancel() {
cancelled = true;
}