Spring 5 中引入了Reactive理念,下文主要介绍Reactive模式的基础。
工程地址,分支为reactive-operations
。
Reactive概念
Reactive是函数式编程(Functional),管道流(pipeline, stream), 异步非阻塞的,事件驱动的。
org.reactivestreams
包中主要有4个接口
- 发布者
Publisher
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
- 订阅者
Subscriber
当接收到Publisher
的数据时,会调用响应的回调方法。注册完成时,首先会调用onSubscribe
方法,参数Subscription s
包含了注册信息。
public interface Subscriber<T> {
// 注册完成后,首先被调用
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
- 订阅
Subscription
- 通过订阅,订阅者
Subscriber
可以请求数据request
,或者取消订阅cancel
。 - 在请求数据时,参数
long n
表示希望接收的数据量,防止发布者Publisher
发送过多的数据。 - 一旦开始请求,数据就会在流
stream
中传输。每接收一个,就会调用onNext(T t)
;发生错误时,onError(Throwable t)
被调用;传输完成后,onComplete()
被调用。
public interface Subscription {
// 请求数据,参数n为请求的数据量,不是超时时间
public void request(long n);
// 取消订阅
public void cancel();
}
Processor
可以看出,Processor
接口继承了Subscriber
和Publisher
,是流的中间环节。
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
Reactive Stream中数据从Publisher开始,经过若干个Processor,最终到达Subcriber,即完整的Pipeline。
Project Reactor
依赖
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
Mono
和Flux
- 抽象类
Mono
和Flux
实现了Publisher
接口,他们是发布者。 -
Mono
表示少于等于1个数据(即0个, 或1个数据)或错误;Flux
表示一连串多个数据。
操作
- 创建
Flux
或Mono
,调用subscribe()
后,数据开始流动。
主要方法有:just
, fromArray
, fromStream
, fromIterable
, range
@Test
public void create() {
//just方法
String[] arr = new String[]{"hello", "world"};
Flux<String> flux1 = Flux.just(arr);
flux1.subscribe(System.out::println);
Mono<String> mono = Mono.just("hi world");
mono.subscribe(System.out::println);
//fromArray方法
List<String> list = Arrays.asList("hello", "world");
Flux<String> flux2 = Flux.fromIterable(list);
//fromIterable方法
List<String> fruitList = new ArrayList<>();
fruitList.add("Apple");
fruitList.add("Orange");
fruitList.add("Grape");
fruitList.add("Banana");
fruitList.add("Strawberry");
Flux<String> flux3 = Flux.fromIterable(fruitList);
//fromStream方法
Stream<String> stream = Stream.of("hi", "hello");
Flux<String> flux4 = Flux.fromStream(stream);
//range方法
Flux<Integer> range = Flux.range(0, 5);
//interval方法, take方法限制个数为5个
Flux<Long> longFlux = Flux.interval(Duration.ofSeconds(1)).take(5);
}
- 合并
mergeWith
@Test
public void mergeFlux() {
Flux<String> source1 = Flux.just("hello", "world");
Flux<String> source2 = Flux.just("hi", "ted");
Flux<String> merge = source1.mergeWith(source2);
merge.subscribe(System.out::println);
}
- 结合为
Tuple2
元组类型zipWith
@Test
public void zipFlux() {
Flux<String> source1 = Flux.just("hello", "world");
Flux<String> source2 = Flux.just("hi", "ted");
Flux<Tuple2<String, String>> zip = source1.zipWith(source2);
zip.subscribe(tuple -> {
System.out.println(tuple.getT1() + " -> " + tuple.getT2());
});
}
- 转换和过滤
skip
: 略过2个
@Test
public void skipFlux() {
Flux<String> source1 = Flux.just("hello", "world", "hi", "ted");
Flux<String> skip = source1.skip(2);
skip.subscribe(System.out::println);
}
take
:只取前2个
@Test
public void takeFlux() {
Flux<String> source1 = Flux.just("hello", "world", "hi", "ted");
Flux<String> skip = source1.take(2);
skip.subscribe(System.out::println);
}
filter
: 接收Predicate
@Test
public void filterFlux() {
Flux<String> source1 = Flux.just("hello", "world", "hi", "ted");
Flux<String> skip = source1.filter(s -> s.startsWith("h"));
skip.subscribe(System.out::println);
}
distinct
: 去重
@Test
public void distinctFlux() {
Flux<String> source1 = Flux.just("hello", "hello", "world", "hi", "ted");
Flux<String> skip = source1.filter(s -> s.startsWith("h")).distinct();
skip.subscribe(System.out::println);
}
map
: 接收Function
@Test
public void mapFlux() {
Flux<String> source1 = Flux.just("hello", "world", "hi", "ted");
Flux<String> skip = source1.map(s -> s + " is mapped");
skip.subscribe(System.out::println);
}
flatMap
: 根据Flux中的元素先生成Mono, 再对Mono中的元素进行map转换。
@Test
public void flatMapFlux() {
Flux<String> source1 = Flux.just("hello world", "hi ted");
Flux<String> flatMap = source1.flatMap(s -> Mono.just(s).map(s1 -> {
String[] strings = s1.split("\\s");
return new String(strings[0] + " - " + strings[1]);
}));
flatMap.subscribe(System.out::println);
}
buffer
: 将stream中的数据按照固定大小分配,新的Flux中的List的元素个数是2
@Test
public void bufferFlux() {
Flux<String> source1 = Flux.just("hello", "world", "hi", "ted");
Flux<List<String>> buffer = source1.buffer(2);
buffer.subscribe(strings -> System.out.println(strings.size()));
}
collectList
: 将Flux中的元素收集到一个List中
@Test
public void collectListFlux() {
Flux<String> source1 = Flux.just("hello", "world", "hi", "ted");
Mono<List<String>> mono = source1.collectList();
mono.subscribe(System.out::println)
}
collectMap
: 将Flux中的元素提取为一个Map,Map的key根据Function生成
@Test
public void collectMapFlux() {
Flux<String> source1 = Flux.just("hello", "world", "ted");
Mono<Map<Character, String>> map = source1.collectMap(s -> s.charAt(0));
map.subscribe(characterStringMap -> System.out.println(characterStringMap.get('t')));
}
- 逻辑运算
all
: 判断Flux中元素是否都满足Predicate条件
@Test
public void allFlux() {
Flux<String> source1 = Flux.just("hello", "world", "ted");
Mono<Boolean> mono = source1.all(s -> s.contains("e"));
mono.subscribe(System.out::println);
}
any
: 判断Flux中元素是否至少有1个满足Predicate条件
@Test
public void anyFlux() {
Flux<String> source1 = Flux.just("hello", "world", "ted");
Mono<Boolean> mono = source1.any(s -> s.contains("e"));
mono.subscribe(System.out::println);
}