微服务的确不是一个新事物。它出现于19世纪70的研究,近来变得注目,因为微服务是更快迁移、传递值更容易、提升更敏捷的途径,然而,微服务源于Actor模型的系统和服务设计,动态的、自治的系统,领域驱动设计,以及分布式系统。有条理的微服务模块设计必然引领开发者创建分布式系统。我相信您注意到了,分布式系统是难做。它易出错,它慢,它受约束于CAP和FLP理论。另一方面,它构建和维护都很复杂。响应式起源于此。
但是什么是响应式? 目前响应式是一个极度被使用的名词。牛津词典定义响应式是”对刺激作出响应”,因此,响应式软件基于它接受到刺激作出响应、调节它的行为。然而,这个定义刺激出的响应是编程方面的挑战,因为计算流不是被程序员控制而是被刺激。在这个章节中,我们将看看Vert.X怎样在响应式上帮助你:
. 响应式编程:一个聚焦于订阅数据流、响应变化并传播的开发模型
. 响应式系统:一种基于异步消息传递的构建响应式、健壮的分布式系统的构架风格
响应式微服务是响应式微服务系统的构成块,然而,因为异步的特征,这些微服务的实现是有挑战性的。响应式编程减少了这种复杂性。它怎样做到的?让我们现在回答这个问题。
响应式编程是一个面向数据流和数据传播的开发模型。在响应式编程里,刺激是在流中传输的数据,这被称为流(streams)。有许多方式来实现响应式编程。在这本书中,我们将采用
Reactive Extensions(http://reactivex.io/),在这里,流(streams)被称为observables,消费者订阅这些observables并作出响应。
为了让这些概念没那么抽象,让我们看一个RxJava (https://github.com/ReactiveX/RxJava)的例子,RxJava是一个用Java实现了Reactive Extensions的库包。这些例子被放在代码仓库的reactive-programming目录下:
observable.subscribe(
data-> { // onNext
System.out.println(data);
},
error -> { // onError
error.printStackTrace();
},
() -> { // onComplete
System.out.println("No more data");
}
);
在这一小段里,代码是订阅一个Observables,当值在流中传输时被通知。订阅者能够收到3种类型的事件:当一个新值出现时onNext被call,当流中发生一个错误或者一个步骤抛出一个异常时onError被call,当流抵达末尾时onComplete回调被请求,对于一个无限的流,onComplete将不被触发。RxJava包含了产生、转换、协同Observables,比如map转换一个值为另一个值,flatMap产生一个Observables或者另一个异常操作链:
// sensor is an unbound observablepublishing values.
sensor
// Groups values 10 by 10, and produces anobservable
// with these values.
.window(10)
// Compute the average on each group
.flatMap(MathObservable::averageInteger)
// Produce a json representation of theaverage
.map(average -> "{'average': "+ average + "}")
.subscribe(
data -> {
System.out.println(data);
},
error -> {
error.printStackTrace();
}
);
RxJava v1.x定义了下面的流类型:
Observable: 预期包含一个值序列的有界的或无界的流
Single:是包含单一值的流,通常预期一个操作的结果,类似于future或promise
Completable:是不包含值但包含一个指示是否一个操作成功或失败的流
RxJava2
RxJava2最近被release了,这本书仍然使用RxJava1.x。 RxJava2.x提供了相似的概念。RxJava2
增加了两个新的流类型。旧的Observable不支持背压(back-pressure),而Flowable则是支持背压的Observable。RxJava2也引进了Maybe类型,一个包含0或1个item、或者是1个错误的流类型。
用RxJava我们能做什么?举个例子,我们能够描述异步操作序列并编排它们。让我们假设你想下载一个文档,处理然后上传它。下载和上传操作是异步的。开发这个序列,你使用这样的代码:
// Asynchronous task downloading a document
Future downloadTask = download();
// Create a single completed when thedocument is downloaded.
Single.from(downloadTask)
// Process the content
.map(content -> process(content))
// Upload the document, this asynchronous operation
// just indicates its successful completion or failure.
.flatMapCompletable(payload -> upload(payload))
.subscribe(
() -> System.out.println("Documentdownloaded, updated and uploaded"),
t -> t.printStackTrace()
);
你也能够编排异步任务。举个例子,合并两个异步操作的结果,你用zip操作合并两个不同流的值:
// Download two documents
Single downloadTask1 =downloadFirstDocument();
Single downloadTask2 =downloadSecondDocument();
// When both documents are downloaded,combine them
Single.zip(downloadTask1, downloadTask2,
(doc1, doc2) -> doc1 + "\n" +doc2)
.subscribe(
(doc) -> System.out.println("Documentcombined: " + doc),
t ->t.printStackTrace()
);
这些操作的运行给你超强能力:你能够以一种声明式的优雅的方式来协同异步任务和数据流。这与响应式微服务有何关系?为了回答这个问题,让我们看看响应式系统。
响应流(Reactive Streams)
你可能听说过响应流(http://www.reactive-streams.org/)。响应流是一个倡议,为支持背压的异步流处理提供一个标准。它提供了一个最小的接口和协议集合,来描述实现支持非阻塞背压的异步流数据的操作和实体。它不定义操纵流的操作,主要用在互操作层。这个倡议被Netflix, Lightbend, and Red Hat以及许多其他公司所支持。
响应式系统(http://www.reactivemanifesto.org/)
响应式编程成为一种开发模式后,响应式系统是一种用来构建分布式系统的架构风格。,为了达成响应度,构建即使是失败或负载下能对请求作出及时响应的系统,这有一些准则。
为了构建这样一个系统,响应式系统拥抱了消息驱动的途径。所有的组件交互用消息异步地发送和接收。为了解耦发送方和接收方,组件发送消息到虚拟地址、注册到虚拟地址来接收消息。一个地址是一个目标地标识,比如一个串或者一个URL。多个接收者能注册到同一个地址---传送语义依赖于底层技术。发送方不阻塞等待一个响应。发送方可以后面接收一个响应,但是同时,它能够接收和发送其它消息。异步特征是非常重要的、影响到你的应用如何开发。
异步的消息交互提供了响应式系统两个重要的特性:
. 弹性(Elasticity):水平扩展的能力
. 可恢复性(Resilience):处理失败和恢复的能力
弹性(Elasticity)来自于消息交互提供的解耦。送到一个地址的消息能被一组采用负载均衡策略的消费者消费。当一个响应式系统面临一个负载高峰时,它扩充新的消费者实例、随后杀掉它们。
可恢复性(Resilience)是被非阻塞处理失败、以及复制组件的能力所提供的。首先,消息交互允许组件处理本地失败。感谢异步特性,组件不等候响应,因此一个组件发生失败时将不影响到其它组件。复制也是一个恢复处理的关键能力。当一个节点处理消息失败时,消息能够被注册在这个地址的其它节点处理。
感谢这两个特性,系统变得可响应。它能够适应高的或者低的负载,在出现高负载或失败的情况下继续提供服务。这些是构建分布式微服务系统最基本的准则。这是必需的,运行多个服务实例以均衡负载、处理失败、不间断可用性。在下一章节,我们将看到Vert.X怎样实现这些topic。
响应式微服务
构建一个微服务系统,每个服务能够变化、演化、失败、变慢或者是在某个时刻处于离线。这要求不影响到整个系统的表现。你的系统必须拥抱变化、能够处理失败。你可以在降级模式下运行,但是你的系统应该仍然能够处理请求。
为了确保系统有如此表现,响应式微服务系统由响应式微服务构成。这些微服务有4个特征:
. 自治(Autonomy)
. 异步(Asynchronisity)
. 可恢复(Resilience)
. 弹性(Elasticity)
响应式微服务是自治的。他们能够适应周边的服务可用或不可用。然而,自治与隔离成对出现。响应式微服务能够处理本地失败、独立行动、按需要与其它微服务进行协作。一个响应式微服务用异步的消息与其它微服务进行交互。它也接收消息,有对消息作出响应的能力。
感谢异步消息传递,响应式微服务能够应付失败,相应地调节它的行为。失败不应当被扩散,而是应该在靠近根源处被处理掉。当一个微服务跨掉时,微服务的消费者必须处理失败、而不是扩散它。隔离准则是一个关键特征,阻止失败放大导致整个系统瘫痪。可恢复性不仅是失败管理,它也是自愈。一个微服务应该实现当失败发生的时候的恢复或者补偿策略。
最后,一个响应式微服务必须是弹性的,这样系统才能够调节微服务的实例数以管理负载。这意味着一些约束,比如避免内存状态,多个微服务实例共享状态(如果需要),或者对于有状态服务,能够路由消息到同样的微服务实例。
Vert.X是什么?
Vert.X是一个用异步的非阻塞的开发模式构建响应式的、分布式的系统的软件包。因为它是一个软件包而不是一个框架,你可以象别的库包一样使用Vert.X。它不约束你怎样去构建或组织你的系统,你按你所想的方式使用它。Vert.X是很灵活的,你能够用它作为一个独立的应用,或者是将它嵌入到一个大应用里面去。
站在开发者的立场上看,Vert.X是一个JAR库文件的集合。每一个Vert.X模块是一个JAR库文件,你加它们至CLASSPATH。从HTTP 服务端和客户端,到消息,到低水平协议比如TCP或UDP,Vert.X提供了大量模块来构建你所想要的应用。你可以选择任何模块附加到Vert.X核心库(Vert.x Core)来构建你的系统。图2-2展示了Vert.X生态系统的一个节选视图:
Vert.X也提供了一个大的技术栈来帮助你构建微服务系统。Vert.X推动了微服务的发展使得它变得流行。它被设计和构建以提供直观的、强有力的方式构建微服务系统。而且这不是全部。用Vert.X你能够构建微服务系统。当用Vert.X构建一个微服务时,它贯彻了它的核心特性到微服务中:它完全是异步的。
异步开发模式
所有用Vert.X构建的应用都是异步的。Vert.X应用是事件驱动的、非阻塞的。当它关注的事件发生时,你的应用被通知。让我们看一个具体例子。Vert.X提供了一个容易的方式创建一个HTTP Server,这个HTTP Server每次收到http请求时被通知:
vertx.createHttpServer()
.requestHandler(request -> {
// This handler will be called every time an HTTP
// request is received at the server
request.response().end("hello Vert.x");
})
.listen(8080);
在这个例子中,我们设置了一个请求处理器来接收http请求(事件)、返回hello Vert.x (响应)。一个处理器(Handler)是一个回调函数,当一个事件发生时。在我们的例子中,每一个请求进入时,handler的代码被执行。注意,一个handler不返回一个结果,可是,一个handler能够提供一个结果。结果怎样被提供,依赖于交互的类型。在最后的一小段里,它仅仅输出结果到http response。这个handler被链到一个socket请求监听器,向这个http endpoint发出请求,生成一个简单的http响应:
HTTP/1.1 200 OK
Content-Length: 12
hello Vert.x
几乎没有例外,在Vert.X里,没有API会阻塞发起调用的线程。如果一个结果能够被立刻提供,它就被返回,否则,一个处理器(handler)被用来接收稍后的事件。当一个被处理事件就绪或者当一个异步操作完成运算后,处理器被通知。
在传统的急切编程中,你将像这样写代码:
int res =compute(1, 2);
在这个代码里,你等待方法的结果。当切换至异步的非阻塞的开发模式时,你传递一个处理器(handler),当结果就绪时它被请求:
compute(1, 2, res -> {
// Called with the result
});
在最后一小段里,compute不再返回结果,因此你不用等到结果被计算并返回,你传递了一个处理器,当结果就绪时它被回调。
感谢非阻塞的开发模式,你能够用很少的线程来处理高并发的负载。在大多数情况下,Vert.X用一个称之为事件轮询(event loop)的线程来调用你的处理器(handler)。事件轮询在图2-3中描绘,它消费一个事件队列,分发每个事件给相关的处理器(handler):
事件轮询所提倡的线程模型有巨大的好处:它使并发简单化。因为这只有一个线程,你总是被同一个线程调用、决不会同时被调用。然而,它有一个很重要的法则,你必须遵从:
不要阻塞事件轮询 --- Vert.X黄金法则
因为没有阻塞,一个事件轮询能够在很短的时间里传送大量事件,这被称为响应模式(https://en.wikipedia.org/wiki/Reactor_pattern)。
让我们想像一下,你打破了这个法则。在前面的代码片段里,请求处理器总是被同一个事件轮询调用,因此,如果http请求处理阻塞、而不是立即返回,其它请求将不能被及时处理,被放至队列中、等候事件轮询线程被释放。你就失去了伸缩性以及Vert.X的效率优势。那么什么会阻塞?第一个显而易见的例子是JDBC数据库访问,它们自然是阻塞的。长时间的运算也是阻塞的,例如,计算Pi到200,000位小数点的计算肯定是阻塞的。不用担心,Vert.X也提供了处理阻塞的组件。
在一个标准的响应式实现里,只有单一的事件轮询线程,在一个环里面传送事件给事件处理器。单一事件轮询线程的问题是:它只能运行在一个CPU核中。Vert.X的机制有所不同,替代单一事件轮询,每一个Vert.X实例维护几个事件轮询,这被称为多响应模型,正如图2-4所展示:
事件是被不同的事件轮询器分发,然而,一旦一个处理器(handler)被一个事件轮询器执行,它就总是被这个事件轮询器调用,坚守响应模式的并发优势。像图2-4,你有多个事件轮询器,它能够均衡负载到不同的CPU核。在我们的http例子里,它怎样工作?一旦Vert.X注册了socket监听器,分发请求到不同的事件轮询器。
Verticles --- 构建块
Vert.X在怎样塑造应用和编码上给了你大量自由。但是,它也提供了积木,让你更容易开始写Vert.X应用,伴随着一个简单的、可伸缩的、类actor开发以及开箱即用的并发模型。Verticles是Vert.X部署和运行的代码块。一个应用,比如微服务,典型地是同一时间运行在同一个Vert.X实例里面的多个Verticles实例构成。典型地,一个Verticle创建服务器或客户端,注册一组处理器,封装部分系统的业务逻辑。
规范的Verticles是被Vert.X事件轮询器执行、决不阻塞。Vert.X确保每个verticle总是被同一个线程执行且决不并发,因此避免了同步块。在Java里,一个verticle是一个扩展Abstract
Verticle的类:
importio.vertx.core.AbstractVerticle;
public classMyVerticle extends AbstractVerticle {
@Override
public void start() throws Exception {
// Executedwhen the verticle is deployed
}
@Override
public void stop() throws Exception {
// Executedwhen the verticle is un-deployed
}
}
工作者Verticle
不像规范的verticle,工作者verticles不是在事件轮询器上执行,这意味着它们能够执行阻塞代码,然而,这限制了可伸缩性。
Verticles访问vertx成员(由AbstractVerticle类提供)来创建服务器和客户端,与其它verticles交互。Verticles也可以部署其它Verticles,配置它们,设置创建的实例数量。实例是被关联到不同的事件轮询器(实现多响应模型),Vert.X在这些实例中均衡负载。
从回调(callback)到订阅(Observables)
正如前一章节所看到的,Vert.X开发采用了回调模式。当编排多个异步操作时,基于回调的开发模式导致产生复杂的代码。例如,让我们看看怎样从数据库查询数据。首先,我们需要连接,然后我们给数据库发一个查询,处理结果,释放数据库连接,所有这些操作是异步的。用回调,你将用Vert.X JDBC客户端写下面的代码:
client.getConnection(conn -> {
if(conn.failed()) {/* failure handling */}
else{
SQLConnectionconnection = conn.result();
connection.query("SELECT* from PRODUCTS", rs -> {
if(rs.failed()) {/* failure handling */}
else{
Listlines =
rs.result().getResults();
for(JsonArray l : lines) {
System.out.println(newProduct(l));
}
connection.close(done-> {
if(done.failed()) {/* failure handling */}
});
}
});
}
});
这个例子展示了回调会很快导致代码难以阅读。你也可以用Vert.X Future来处理异步操作,不象Java
Future,Vert.X是非阻塞的。Future提供了高层次的操作组合:构建顺序操作或是并行地执行动作。典型地,正如下面的代码片段所展示的,我们组合future以构建顺序执行的异步操作:
Future future =getConnection();
future.compose(conn -> {
connection.set(conn);
//Return a future of ResultSet
returnselectProduct(conn);
})
// Return a collection of products bymapping
// each row to a Product
.map(result ->toProducts(result.getResults()))
.setHandler(ar -> {
if(ar.failed()) { /* failure handling */ }
else{
ar.result().forEach(System.out::println);
}
connection.get().close(done-> {
if(done.failed()) { /* failure handling */ }
});
});
Futures使得代码更明了,我们在一个批次里查询所有行并处理它们。结果是巨大的,检索耗费了许多时间。同时,开始处理结果,你并不需要全部结果。我们能够只要一获得每一行数据就处理它。幸运地,Vert.X提供了这种开发模式了应对之策,提供你一种用响应式编程开发模型实现响应式微服务的途径。Vert.X提供了RxJava API:
. 联合和协同异步任务
. 作为一个输入流对输入消息来响应
让我们用RxJava API来重写前面的代码:
// We retrieve a connection and cache it,
// so we can retrieve the value later.
Single connection =client.rxGetConnection();
connection
.flatMapObservable(conn ->
conn
//Execute the query
.rxQueryStream("SELECT* from PRODUCTS")
//Publish the rows one by one in a new Observable
.flatMapObservable(SQLRowStream::toObservable)
//Don't forget to close the connection
.doAfterTerminate(conn::close)
)
// Map every row to a Product
.map(Product::new)
// Display the result one by one
.subscribe(System.out::println);
除提升了可读性外,响应式编程允许你订阅一个结果流、一获得就马上处理。用Vert.X你能够选择你喜欢的开发模式,在这本书里,回调和RxJava二者我们都会使用。
让我们开始编码
是时候让你动动手了。我们打算用Apache Maven和Vert.X Maven插件来开发我们的第一个Vert.X应用。然而,你可以用任何你想用的工具:Gradle, Apache Maven加其它的插件包,或者Ant。你可以在代码仓库中找到不同的例子,在这一章节展示的代码放在hello-vertx目录下。
创建工程
创建一个my-first-vertx-app目录并切换到这个目录:
mkdir my-first-vertx-app
cd my-first-vertx-app
然后,执行下面的命令:
mvn io.fabric8:vertx-maven-plugin:1.0.5:setup \
-DprojectGroupId=io.vertx.sample \
-DprojectArtifactId=my-first-vertx-app \
-Dverticle=io.vertx.sample.MyFirstVerticle
这个命令生成了Maven工程的结构,配置vertx-maven-plugin插件,创建一个verticle类(io.vertx.sample.MyFirstVerticle),没有做其它的。
写第一个Verticle
是时候写你的第一个verticle的代码了。用下面的内容修改src/main/java/io/vertx/sample/MyFirstVerticle.java文件:
package io.vertx.sample;
import io.vertx.core.AbstractVerticle;
/**
* A verticle extends the AbstractVerticleclass.
*/
public class MyFirstVerticle extendsAbstractVerticle {
@Override
publicvoid start() throws Exception {
//We create a HTTP server object
vertx.createHttpServer()
//The requestHandler is called for each incoming
//HTTP request, we print the name of the thread
.requestHandler(req-> {
req.response().end("Hellofrom " + Thread.currentThread().getName());
})
.listen(8080);// start the server on port 8080
}
}
执行下面命令,启动这个应用:
mvn compile vertx:run
如果一切顺利,你将能够看到你的应用在浏览器中被打开(http://localhost:8080)。vertx:run目标启动Vert.x应用,并且监视代码变化。因此,如果你修改源代码,应用将被自动地重新编译并重启。
让我们现在看看应用的输出:
Hello from vert.x-eventloop-thread-0
请求被事件轮询器0所处理。你可以试试发出更多请求,请求将总是被同一个事件轮询器处理,强制的并发模型。按CTRL+C组合键停止执行。
使用RxJava
让我们看一下Vert.X支持的RxJava,更好地理解它怎样工作。在你的pom.xml文件加上下面的依赖:
io.vertx
vertx-rx-java
接下来,修改属性为io.vertx.sample.MyFirstRXVerticle,这个属性告诉Vert.X
Maven插件哪个verticle是应用的起点。用下面的内容创建新的verticle类(io.vertx.sample.MyFirstRXVerticle):
package io.vertx.sample;
// We use the .rxjava. package containingthe RX-ified APIs
importio.vertx.rxjava.core.AbstractVerticle;
importio.vertx.rxjava.core.http.HttpServer;
public class MyFirstRXVerticle extendsAbstractVerticle {
@Override
publicvoid start() {
HttpServerserver = vertx.createHttpServer();
//We get the stream of request as Observable
server.requestStream().toObservable()
.subscribe(req->
//for each HTTP request, this method is called
req.response().end("Hellofrom " + Thread.currentThread().getName())
);
//We start the server using rxListen returning a
//Single of HTTP server. We need to subscribe to
//trigger the operation
server
.rxListen(8080)
.subscribe();
}
}
Vert.X的RxJava API在包名上用rxjava,方法名以rx打头,比如rxListen。另外,API被增强,方法提供Observable对象,你可以订阅、以接收传输数据。
打包你的应用成一个Fat Jar
Vert.X Maven插件打包应用成一个Fat Jar。一旦打完包,你可以很容易启动应用,用java –jar .jar命令:
mvn clean package
cd target
java -jar my-first-vertx-app-1.0-SNAPSHOT.jar
应用将启起来,监听指定端口的http请求,键CTRL+C停止它。
作为一个不固步自封的软件包,Vert.X不推从一种打包方式超过另一种---你可以自由地选择你喜欢的打包方式。例如,你可以用fat jar, 或者把应用嵌入到一个war文件。
在这本书中,我们采用fat jar,自包含的jar,嵌入应用代码、资源、以及它所依赖的一切,这包括Vert.X,你所用的Vert.X组件和他们的依赖。这个包模型采用扁平的类加载机制,这使得更容易理解应用的启动、依赖顺序、以及日志。更重要的,它有助于减少需要安装进产品的可移动块数量。你不需要部署应用到一个存在的应用服务器。一旦它被打包成fat jar,应用可以用一个简单的java –jar 命令开始运行。
Vert.X Maven插件为你构建fat jar,但是你也可以用别比如maven-shader-plugin。
日志,监控,以及其它的产品要素
因为部署和启动简单,对于微服务和其它类型的应用,fat jar是一个很好的打包方式。但是应用服务器提供了哪些特性让你的应用作为产品而就绪?典型地,我们希望能够写和收集日志、监控应用、推送外部的配置、健康检查等等。
不用担心,Vert.X提供了所有这些特性。因为Vert.X是中立的,它提供多种选择,让你挑选或是实现你自己的。例如,对于日志,Vert.X不推行一个指定的日志框架而是允许你使用任何你想用的日志框架,比如Apache Log4J 1或者2,SLF4J,或者甚至是JUL(JDK的日志API)。如果你对Vert.X它自己的日志有兴趣,Vert.X内部的日志能够配置成这些日志框架中的任何一个。监控Vert.X应用通常是用JMX实现。Vert.x Dropwizard Metric模块提供了基于JMX的Vert.X度量。你也可以选择发布这些度量到一个监控服务,比如Prometheus(https://prometheus.io/)或者是CloudForms(https://www.redhat.com/en/technologies/management/cloudforms)。
总结
在这一章节里,我们学习了响应式微服务以及Vert.X。你也创建了你的第一个Vert.X应用。这个章节并不是一个全面的指导,仅仅提供了对主要概念的一个快捷介绍。如果你想更深地了解这些topic,查看下面的资源:
. 响应式编程和响应式系统
https://www.oreilly.com/ideas/reactive-programming-vs-reactive-systems
. 响应式宣言
http://www.reactivemanifesto.org/
. RxJava网站
https://github.com/ReactiveX/RxJava/wiki
. 用RxJava做响应式编程
http://shop.oreilly.com/product/0636920042228.do
. Vert.X网站
https://vertx.io/