spring cloud stream 是一个构建消息驱动微服务的框架。
应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中binder 交互,通过我们配置来 binding ,而 Spring Cloud Stream 的 binder 负责与消息中间件交互。官方文档的架构图如下:
为什么使用spring cloud stream
比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange,kafka有Topic,partitions分区,这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。
spring cloud stream 中消息的发布和消费,有4个组件:
- source
- channel
- binder
- sink
- source
当一个服务准备发布消息时,它将使用一个source发布消息,它接受普通的Java对象,该对象代表发布的消息,source将Java对象序列化并将消息发布到channel - channel
通道是队列的一种抽象,通道始终与目标队列名称关联,队列名称不会在代码中暴露,因此我们可以通过修改配置更改队列名称,而不是修改代码 - binder
如spring cloud stream架构图那样,binder允许开发人员处理消息而不必依赖特定平台的api来发送和消费消息 - sink
服务通过sink从队列中接受消息,将消息反序列化为pojo。
搭建消息生产和消费环境
- 搭建生产者
首先添加maven依赖,这里我是用的是rabbitMQ,所以添加了rabbitMQ依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
接着在启动类上添加@EnableBinding(Source.class)
注解,这个注解给我们绑定消息通道的,Source是Stream给我们提供的
@SpringBootApplication
@EnableDiscoveryClient
@EnableBinding(Source.class)
public class ProviderApplicatioin {
public static void main(String[] args) {
SpringApplication.run(ProviderApplicatioin.class, args);
}
}
然后向消息代理发布消息,创建一个Java类,模拟发送
@Component
public class SimpleSourceBean {
private final Logger LOGGER = Logger.getLogger(SimpleSourceBean.class);
private Source source;
@Autowired
public SimpleSourceBean(Source source) {
this.source = source;
}
public void publish() {
Map<String, Object> map = Maps.newHashMap();
List<String> list = Lists.newArrayList();
list.add("test");
map.put("A", list);
map.put("B","123");
source
.output()
.send(MessageBuilder.withPayload(map).build());
}
}
这里代码较简单,使用source发送消息。
下一步配置application配置文件
spring:
application:
name: provider-server
cloud:
stream:
bindings: # 服务的整合处理
output: # 这个名字是一个通道的名称,在分析具体源代码的时候会进行说明
destination: publish # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
# binder: defaultRabbit # 设置要绑定的消息服务的具体设置
rabbitmq:
addresses: 192.168.33.10:5672
username: guest
password: guest
server:
port: 8081
这里也是一些配置,配置binding和rabbit
最后配置一个controller,调用发送接口。
@RestController
public class ProviderController {
private final Logger LOGGER = Logger.getLogger(ProviderController.class);
@Autowired
private SimpleSourceBean simpleSourceBean;
/**
* get方式接口
* @param request 请求参数
*/
@RequestMapping(value = "/provider", method = RequestMethod.GET)
public String provider(@RequestParam String request) {
LOGGER.info("========================================");
LOGGER.info("provider service 被调用 ");
LOGGER.info("========================================");
LOGGER.info("========================================");
simpleSourceBean.publish();
LOGGER.info("发送消息");
LOGGER.info("========================================");
return "provider, " + request;
}
}
直接一个get请求调用发送接口。
- 搭建消费者
首先也是添加maven依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
然后配置sink,接受消息
@SpringBootApplication
@EnableDiscoveryClient
@EnableBinding(Sink.class)
public class ConsumerApplication {
private final Logger LOGGER = Logger.getLogger(ConsumerApplication.class);
@StreamListener(Sink.INPUT)
public void sink(Map<String, Object> map) {
LOGGER.info("=======================");
LOGGER.info(map);
LOGGER.info("=======================");
}
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}
这里每次从input通道接收到消息,都会调用@StreamListener
绑定的方法。
接着配置application配置文件
spring:
application:
name: consumer-server
cloud:
stream:
bindings: # 服务的整合处理
input: # 这个名字是一个通道的名称,在分析具体源代码的时候会进行说明
destination: publish # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
group: consumer-server
rabbitmq:
addresses: 192.168.33.10:5672
username: guest
password: guest
server:
port: 8111
配置完成后,分别启动服务。调用发送消息接口,就可以看到消费端打印的日志:
2018-11-26 21:25:56.997 INFO 42320 --- [nsumer-server-1] c.h.w.spring.cloud.ConsumerApplication : =======================
2018-11-26 21:25:56.997 INFO 42320 --- [nsumer-server-1] c.h.w.spring.cloud.ConsumerApplication : {A=[test], B=123}
2018-11-26 21:25:56.997 INFO 42320 --- [nsumer-server-1] c.h.w.spring.cloud.ConsumerApplication : =======================
这样使用spring cloud stream创建消息生成者和消费者就完成了。