目录
- [前言]
- [一、接口]
- [1、Publisher]
- [2、Subscriber]
- [3、Subscription]
- [4、Processor]
- [二、使用示例]
- [三、总结]
前言
jdk 9 中新增了 Flow 类,是Reactive Stream ([响应式]流/反应流) 的实现,Reactive Stream是一套基于发布/订阅模式的数据处理规范,能够以非阻塞背压方式实现数据的异步流。
一、接口
1、Publisher
发布者,进行数据发布
@FunctionalInterface
public static interface Publisher<T> {
//发布者与订阅者建立订阅关系
public void subscribe(Subscriber<? super T> subscriber);
}
2、Subscriber
订阅者,订阅数据
public static interface Subscriber<T> {
//发布者通知订阅者开始发送数据
public void onSubscribe(Subscription subscription);
//订阅者接收数据
public void onNext(T item);
//出现异常
public void onError(Throwable throwable);
//数据处理完成
public void onComplete();
}
3、Subscription
数据订阅,请求数据、取消订阅
public static interface Subscription {
public void request(long n);
public void cancel();
}
4、Processor
数据的中间操作,既是Subscriber也是Publisher
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}
二、使用示例
1、数据订阅者
public class FlowSubscriber implements Flow.Subscriber<String> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("Subscriber 建立订阅关系");
//发布订阅关系
this.subscription = subscription;
//请求一个数据
this.subscription.request(1);
}
@Override
public void onNext(String item) {
System.out.println("订阅者接收消息: " + item);
//接收数据后 再请求一个数据
this.subscription.request(1);
//不再接收数据,调用cancel
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
System.out.println("订阅者数据接收出现异常,error :" + throwable.getMessage());
this.subscription.cancel();
}
@Override
public void onComplete() {
System.out.println("订阅者数据处理完成");
}
}
2、数据中间处理
public class FlowProcessor extends SubmissionPublisher<String> implements Flow.Processor<String,String> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("Processor 建立订阅关系");
this.subscription = subscription;
this.subscription.request(1);
}
@Override
public void onNext(String item) {
System.out.println("Processor 接收数据: " + item);
item += " Processor 处理后的消息";
this.submit(item);
this.subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.out.println("Processor 数据接收出现异常,error :" + throwable.getMessage());
this.subscription.cancel();
}
@Override
public void onComplete() {
System.out.println("Processor 数据处理完成");
}
3、测试
public class FlowTest {
public static void main(String[] args) {
//数据发布者
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
//中间处理者
FlowProcessor processor = new FlowProcessor();
//数据订阅者
Subscriber<String> subscriber = new FlowSubscriber();
//发布者与中间处理者建立关系
publisher.subscribe(processor);
//中间处理者与订阅者建立关系
processor.subscribe(subscriber);
//发布者开始发布数据
for (int i = 0; i < 10; i++) {
String msg = "hello flow: " + i;
System.out.println("发布者发送数据" + i);
publisher.submit(msg);
}
//关闭发布者
publisher.close();
//休眠等待,防止主线程退出
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
结果:
Subscriber 建立订阅关系
Processor 建立订阅关系
发布者发送数据0
发布者发送数据1
发布者发送数据2
发布者发送数据3
发布者发送数据4
发布者发送数据5
发布者发送数据6
发布者发送数据7
发布者发送数据8
发布者发送数据9
Processor 接收数据: hello flow: 0
Processor 接收数据: hello flow: 1
Processor 接收数据: hello flow: 2
Processor 接收数据: hello flow: 3
Processor 接收数据: hello flow: 4
Processor 接收数据: hello flow: 5
Processor 接收数据: hello flow: 6
订阅者接收消息: hello flow: 0 Processor 处理后的消息
Processor 接收数据: hello flow: 7
订阅者接收消息: hello flow: 1 Processor 处理后的消息
订阅者接收消息: hello flow: 2 Processor 处理后的消息
Processor 接收数据: hello flow: 8
订阅者接收消息: hello flow: 3 Processor 处理后的消息
Processor 接收数据: hello flow: 9
订阅者接收消息: hello flow: 4 Processor 处理后的消息
Processor 数据处理完成
订阅者接收消息: hello flow: 5 Processor 处理后的消息
订阅者接收消息: hello flow: 6 Processor 处理后的消息
订阅者接收消息: hello flow: 7 Processor 处理后的消息
订阅者接收消息: hello flow: 8 Processor 处理后的消息
订阅者接收消息: hello flow: 9 Processor 处理后的消息
三、总结
1、发布者 Publisher 将消息发给订阅关系 Subscription;
2、订阅关系 Subscription 将消息发给订阅者 Subscriber;
3、中间操作 Processor 作为 发布者 Publisher 的 订阅者,又作为订阅者 Subscriber 的发布者