响应式编程
Flow(java.util.concurrent.Flow,自 JDK 9 起提供)是支持响应式编程的一套规范,其中定义了 Publisher、Subscriber、Subscription、Processor 四个需要实现的接口。
调用关系图如下:
使用实例:
- 定义Subscriber
package com.luo.service;
import java.util.concurrent.Flow;
/**
 * Demo subscriber that prints every signal it receives and paces its own
 * consumption by sleeping after each item.
 *
 * <p>Five items are requested when the subscription is established and two
 * more are requested after each item has been handled.
 */
public class DemoSubscriber implements Flow.Subscriber<Object> {
    /** Subscription received in {@link #onSubscribe}; used to request items or cancel. */
    private Flow.Subscription subscription;

    /**
     * Stores the subscription and requests the first five items.
     *
     * @param subscription the link to the publisher
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println("sub-建立订阅关系第一次调用");
        this.subscription = subscription;
        this.subscription.request(5);
    }

    /**
     * Prints the item, sleeps two seconds to simulate slow processing, then
     * requests two more items.
     *
     * <p>If the sleep is interrupted, no further items are requested and the
     * thread's interrupt flag is restored so the caller can observe it.
     *
     * @param item the published item
     */
    @Override
    public void onNext(Object item) {
        System.out.println("sub-接受数据:" + item);
        try {
            Thread.sleep(2 * 1000);
            this.subscription.request(2);
        } catch (InterruptedException e) {
            System.out.println(e.getMessage());
            // Catching InterruptedException clears the flag; set it again
            // instead of silently losing the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Prints the failure and cancels the subscription.
     *
     * @param throwable the error that was signalled
     */
    @Override
    public void onError(Throwable throwable) {
        System.out.println(throwable);
        this.subscription.cancel();
    }

    /** Prints a notice that all data has been received. */
    @Override
    public void onComplete() {
        System.out.println("sub-数据接收完成");
    }
}
- 测试
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
/**
 * Demo entry point: one {@link SubmissionPublisher} feeding one
 * {@code DemoSubscriber}.
 */
public class ReactiveMain {
    /**
     * Subscribes a {@code DemoSubscriber}, idles for five seconds, submits the
     * strings "0" to "9" at 500 ms intervals, closes the publisher and then
     * keeps the main thread alive.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        // Publisher; try-with-resources closes it even when sending fails or is interrupted.
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            // Subscriber
            DemoSubscriber subscriber = new DemoSubscriber();
            publisher.subscribe(subscriber);

            // Nothing has been published yet; the printed duration matches the actual sleep.
            Thread.sleep(5 * 1000);
            System.out.println("p没有发送过消息,已经等待5秒");

            for (int i = 0; i < 10; i++) {
                publisher.submit("" + i);
                Thread.sleep(500);
            }
            System.out.println("所有消息发送完成");
        }
        System.out.println("p 关闭发送");

        Thread.sleep(10 * 1000);
        // Joining the current thread on itself never returns, so the demo keeps
        // running until the process is terminated externally.
        Thread.currentThread().join();
    }
}
Processor 使用测试
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
/**
 * Demo processor: subscribes to an upstream publisher of strings, appends a
 * marker to every item and republishes it to its own subscribers.
 *
 * <p>Items are pulled from upstream one at a time. Upstream completion and
 * failure are forwarded downstream by closing this processor's publisher side.
 */
public class DemoProcessor extends SubmissionPublisher<String> implements Flow.Processor<String, String> {
    /** Upstream subscription received in {@link #onSubscribe}. */
    private Flow.Subscription subscription;

    /**
     * Stores the upstream subscription and requests the first item.
     *
     * @param subscription the link to the upstream publisher
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println("process-第一次建立订阅关系");
        this.subscription = subscription;
        this.subscription.request(1);
    }

    /**
     * Appends the processing marker to the item, submits the result to this
     * processor's subscribers and requests the next upstream item.
     *
     * @param item the item received from upstream
     */
    @Override
    public void onNext(String item) {
        System.out.println("process-接受到数据" + item);
        String processed = item + "-process 处理后的消息";
        this.submit(processed);
        this.subscription.request(1);
    }

    /**
     * Prints the failure, cancels the upstream subscription and propagates the
     * error to this processor's own subscribers.
     *
     * @param throwable the error signalled by upstream
     */
    @Override
    public void onError(Throwable throwable) {
        System.out.println(throwable);
        this.subscription.cancel();
        // Without this, downstream subscribers would never learn about the failure.
        this.closeExceptionally(throwable);
    }

    /**
     * Prints a completion notice and closes this processor so that its own
     * subscribers receive {@code onComplete} as well.
     */
    @Override
    public void onComplete() {
        System.out.println("process-数据处理完成");
        // Without this, downstream subscribers would wait forever for completion.
        this.close();
    }
}
使用测试
import java.util.concurrent.SubmissionPublisher;
/**
 * Demo entry point: publisher -> {@code DemoProcessor} -> {@code DemoSubscriber}.
 */
public class ReactiveProcessorMain {
    /**
     * Chains a {@code DemoProcessor} between a publisher and a
     * {@code DemoSubscriber}, submits the strings "0" to "9" at 500 ms
     * intervals, closes the publisher and then keeps the main thread alive.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        // Publisher; try-with-resources closes it even when sending fails or is interrupted.
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            // The processor subscribes to the publisher ...
            DemoProcessor processor = new DemoProcessor();
            publisher.subscribe(processor);

            // ... and the final subscriber subscribes to the processor.
            // The processor is not closed here: items may still be in flight
            // when the source publisher is closed.
            DemoSubscriber subscriber = new DemoSubscriber();
            processor.subscribe(subscriber);

            for (int i = 0; i < 10; i++) {
                publisher.submit("" + i);
                Thread.sleep(500);
            }
            System.out.println("所有消息发送完成");
        }
        System.out.println("p 关闭发送");

        Thread.sleep(10 * 1000);
        // Joining the current thread on itself never returns, so the demo keeps
        // running until the process is terminated externally.
        Thread.currentThread().join();
    }
}