响应式编程是一种编程模式,关注数据流,对数据的改变做出响应和分发。响应式编程模式中,激励元素是输送进来的可能无边界的数据,称为数据流(Stream)。
符合响应式编程的一个流行的模型是“响应式扩展”(Reactive Extension),它有多种语言实现的库,例如支持java语言的RxJava,支持javascript的RxJs等,他们用来基于响应式扩展模型在不同语言下做开发。
在“响应式扩展”模型里,数据流叫被观察者(Observable),观察者(Observer)从被观察者那里订阅(subscribe)数据并在数据到来时做出响应。Observable会产生0到N个数据,数据发布完成后有一个完成事件,如果中间出错会有一个出错事件。订阅者要求有相应的逻辑来处理这些数据或事件。
下面以以RxJava代码为例子进一步了解 “响应式扩展” 模型,订阅者订阅数据流后需要编写对数据流的处理逻辑:
observable.subscribe(
data -> { // onNext 处理数据
System.out.println(data);
},
error -> { // onError 处理出错事件
error.printStackTrace();
},
() -> { // onComplete 处理完成事件
System.out.println("No more data");
}
);
RxJava 提供了对下列类型数据流的支持:
- Observables 产生一系列数据,数据可以无边界
- Singles 产生确定的一个数据
- Completables 没有数据,只有完成或出错事件
RxJava 1.x 和 RxJava 2.x
RxJava 2.x 除了提供Observable,还提供了增强型的Observables,叫Flowable,用来支持背压机制,符合响应流标准。(详见背压和响应流标准)
RxJava 2.x 还引入了新的Maybe数据流,可以是0个或1个数据,以及出错事件。
如果你想亲手试试,那就利用maven下载RxJava 2.x 的依赖包试一试:
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.1.6</version>
<scope/>
</dependency>
写几行简单的Java代码试试:
import io.reactivex.*;
public class Main {
public static void main(String[] args) {
Observable.range(0,10).subscribe(
data-> {System.out.println(data);},
error ->{error.printStackTrace();},
()->{System.out.println("no more data");}
);
//支持背压的响应流标准方式
Flowable.just("RxJava 2.x Flowable").subscribe(System.out::println);
}
}
参考资料:
http://reactivex.io/
https://github.com/ReactiveX/RxJava
<<building reactive microservices in Java>> by Clement Escoffier