一概要
Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它可以基于Spring Boot 来创建独立的,可用于生产的Spring 应用程序。他通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。Spring cloud stream 应用模型的结构图如图所示:
从图中可以看出spring cloud stream 构建的应用程序与消息中间件之间是通过绑定器Binder相关联的,绑定器对于应用程序而言起到了隔离的作用,它使得不同消息中间件(Rabbitmq、kafka)的实现细节对于应用程序来说是透明的。我们不需要知道消息中间件的通信细节,只需要知道Binder对应用程序提供的抽象概念来使用消息中间件实现业务逻辑即可,这个抽象的概念就是消息通道:channel。如图中所示的两条输入通道和三条输出通道,绑定器就是作为这些通道和消息中间件之间的桥梁进行通信的。
在没有绑定器的情况下,Spring boot应用要直接与消息中间件进行信息交互的时候,由于各个消息中间件的构建初衷不同,所以它们在实现细节上也会有很大的差别,这就会使得我们的消息交互逻辑会很笨重。由于对具体的消息中间件依赖太重,导致消息中间件升级或者更换的时候,我们就需要付出很大的代价。而绑定器作为中间层,通过向应用程序暴露统一的Channel,恰好实现了隔离的作用。
二RabbitMQ基本概念
2.1 基本概念
Broker:即消息队列服务器实体
Exchange:消息到达代理服务器的第一站,根据分发规则进行转发。消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
Binding Key:队列需要通过绑定键(默认为空)绑定到交换器上,交换器将消息的路由键与所绑定队列的绑定键进行匹配,正确匹配的消息将发送到队列中。路由键是偏向生产的概念,而绑定键是偏向消费的概念。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,应用程序(生产与/或消费)和代理服务器之间TCP连接内的虚拟连接,解决TCP连接数量限制及降低TCP连接代价。每个信道有一个ID,其概念与“频分多路复用”类似在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
2.2 执行过程示意图
三Spring cloud stream 核心概念
3.1 绑定器 Binder
通过定义绑定器作为中间层,实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件的实现。当需要升级消息中间件,或者是更换其他消息中间件产品时,我们需要做的就是更换对应的Binder绑定器而不需要修改任何应用逻辑 。目前只提供了RabbitMQ和Kafka的Binder实现
3.2 发布-订阅模式
Spring cloud stream 中的消息通信方式采用发布订阅模式,当一条消息被投递到消息中间件之后,它会通过共享的Topic主题进行广播,消费者在订阅的主题中收到它并触发自身的业务处理逻辑。Topic主题是spring cloud stream中的一个抽象概念,用来代表发布共享消息给消费者的地方。在不同的消息中间件中对应不同的概念,如:Rabbitmq中对应的是Exchange,Kafka中对应的是Topic。绑定到同一个topic上的订阅者,都将会收到相同的消息并根据自身的需求来对这个消息进行处理。
Spring cloud stream采用的发布订阅模式,可以有效的降低生产者和消费者之间的耦合,当需要同一类消息增加一种处理方式的时候,只需要到已有的topic上绑定一个通道,就可以实现功能的扩展,而不需要改变原来已经实现了的任何内容。
3.3 消费组
在微服务中,我们的每一个微服务为了实现高可用和负载均衡,实际上都会部署多个实例。在多数情况下,当生产者发送消息给某个具体的微服务时,我们只希望它被消费一次,而不是被重复消费。消费组就是解决这一需求的。
3.4 消息分区
从上一节知道,消费组无法控制消息消息具体被哪个实例消费,只能保证是组内的一个实例消费。Spring Cloud Stream对给定应用的多个实例之间分隔数据予以支持。在分隔方案中,物理交流媒介(如:代理主题)被视为分隔成了多个片(partitions)。一个或者多个生产者应用实例给多个消费者应用实例发送消息并确保相同特征的数据被同一消费者实例处理。
比如一些监控服务,为了统计某一时间段内消息生产者发送的报告内容,监控服务需要自身聚合这些数据。这时候消息生产者可以为这些消息增加一个固定的特征来区分是,使得拥有这些特征的消息每次都能被发送发到一个特定的实例上,以达到统计效果。
四使用介绍
4.1 基本注解介绍
@Input注释标识输入通道,通过该输出通道接收到的消息进入应用程序;@Output注释标识输入通道,发布的消息将通过该通道离开应用程序。@Input和@Output注释可以使用频道名称作为参数; 如果未提供名称,将使用注释方法的名称。@EnableBinding注释添加到应用程序,以便立即连接到消息代理。@StreamListener添加到方法中,以使其接收流处理的事件。
4.2 基本用法
我们模拟这样一个场景来对spring cloud stream的使用进行说明。生产者A通过channel 发送一个消息,这个消息中包含了一个User对象(比如这个对象有两个属性,name,age),消费者通过spring cloud stream 的绑定器通过消息中间件(这里使用rabbitmq)将这个消息发送给了消费者B。消费者B的功能就是将收到的消息持久化到数据库中。
步骤一:加入spring cloud stream 依赖
步骤二:定义输入和输出通道
使用我们上文提到的@Input 和 @Output来定义。其中@Input对应的通道由消费者监听的时候使用,@Output由生产者发送消息的时候使用。我们只需要将他们定义在一个接口中,spring cloud stream 会自动帮我们实现这个接口。
步骤三:书写生产者发送消息的逻辑
注入我们在步骤二中定义的通道接口,直接使用output()中的send(Message payload) 方法将消息发送出去。
步骤四:书写消费者逻辑
将@EnableBinding写到我们的消费者所在的类上,使用@StreamListener写到我们消费者方法上面,对通道中的消息进行监听,把得到的消息处理后放入数据库,这里为了演示存入数据库就直接用LOG输出来模拟了,没有直接操作数据库。这里值得注意的是,由于我们的生产者和消费者都在同一工程中,所以我们只在消费者所在的类上加了@EnableBinding注解,而生产者所在的类上并没有加该注解。在实际的生产中,我们可能会遇到生产者和消费者不在同一工程中的情况,这个时候就要将生产者和消费者所在的类上都使用@EnableBinding注解。
步骤五:外部配置
外部配置我将它分为了两个个部分:spring cloud stream 配置、Rabbitmq配置。其中spring cloud stream的配置分为:应用级别的配置,inputChannel配置,outputChannel配置以及Binder配置四个部分。Rabbitmq分为:应用级别的配置,生产者配置,消费者配置,默认的rabbitmq配置(Rabbitmq默认使用了Spring boot的ConnectionFactory)四个部分。
Spring cloud stream 应用级别的配置以spring.cloud.stream.为前缀。
通道的通用配置以spring.cloud.stream.bindings..为前缀。
outputChannle以spring.cloud.stream.bindings..producer.为前缀。
inputChannel以spring.cloud.stream.bindings..consumer.为前缀。
绑定器的配置以spring.cloud.stream.binders..为前缀。
Rabbitmq应用级别的配置以spring.rabbitmq.为前缀。
生产者配置以
spring.cloud.stream.rabbit.bindings..producer.为前缀。
消费者配置以
Spring.cloud.stream.tabbit.bindings..consumer.为前缀
下图就是我们在这个例子中的配置:
步骤六:容错处理
当消费者中处理逻辑出现异常或者处理失败,我们希望消息重新被消费者消费时,我们就需要配置死信队列。如下图,当消费者中的消息被拒绝或者是在消费过程中出现异常的时候,消息就会进入这个死信队列。
从上面的逻辑我们可以看出,死信队列中为失败的消息设置了重试的次数,和每次重投递时的延时,即每一次重投递都会根据投递次数的增加而增加延时时长。
接上面的截图片段,要使得重投递时候的延时起效,我们需要定义一个延时交换机,如图中的DirectExchange类型的交换机,将原队列绑定到延时交换机上。
当重投递次数超过我们所设定的阈值的时候,我们就把消息投递到一个parking-lot中,以备排错。