本篇主要介绍一下spring boot 整合 rabbit mq 的使用。
项目介绍
本篇文章的例子分别写在两个项目中:
- spring-boot-rabbitmq-producer:存放消息生产端 producer 相关类,也就是消息发送端;
- spring-boot-rabbitmq-consumer:存放消息消费者 consumer 相关类,也就是消息接收端;
项目已经上传到github上:https://github.com/xsg1995/spring-boot-rabbitmq
pom.xml
pom文件引入依赖:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.xsg</groupId>
<artifactId>spring-boot-rabbitmq-producer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>spring-boot-rabbitmq-producer</name>
<url>http://maven.apache.org</url>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.0.RELEASE</version>
<relativePath/>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- spring-boot-starter-amqp -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--spring boot test-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yml
配置rabbit mq相关信息
spring:
rabbitmq:
host: localhost #rabbitmq服务地址
port: 5672 #rabbitmq通信端口
username: guest #rabbitmq用户名
password: guest #rabbitmq密码
以fanout分发策略分发消息的使用
- fanout:当指定fanout分发策略时,交换机不会处理路由key,交换机会将消息发送到所有绑定了在该交换机的队列上。如下图所示:
以下代码实现功能流程如下:
- 声明一个名为 AFanoutExchange 的 FanoutExChange 类型的交换机;
- 声明一个名称为 AFanoutQueue 的 Queue 队列,并绑定到 AFanoutExchange 交换机;
- 生产者发送消息到 AFanoutExchange 交换机,AFanoutExchange 交换机将消息路由发送与其绑定的 Queue 上面;
- 消费端从 Queue 上面拉取到消息进行消费。
producer生产消息端配置exchange
在 RabbitMQExchangeConfig 类中配置exchange信息:
/**
* 以Fanout方式发送消息
* 定义一个Exchange交换机,发送的消息将通过该交换机转发
* @return
*/
@Bean
public FanoutExchange AFanoutExchange() {
//传入exchange交换机的名称 AFanoutExchange
return new FanoutExchange(RabbitMQExchangeConstant.A_FANOUT_EXCHANGE);
}
在 RabbitMQExchangeConstant 中配置exchange 名称:
/**
* 以fanout方法发送A信息的Exchange名称
*/
public static final String A_FANOUT_EXCHANGE = "AFanoutExchange";
producer生产端配置发送消息类
在 ASender 类中主要定义消息发送的逻辑,也就是消息发送者:
/**
* 用于发送A消息的sender
*/
@Component
public class ASender {
@Autowired
private AmqpTemplate rabbitTemplate;
/**
* 发送消息到 AFanoutExchange 交换机
*/
public void sendToAFanoutExchange(String msg) {
//要发送的信息拼上当前时间戳
String content = msg + "\t" + DateUtils.getDateTime();
//第一个参数表示Exchange交换机的名称
//第二个参数表示路由Key,Fanout方式路由消息不会处理路由key
//第三个参数为要发送的消息
this.rabbitTemplate.convertAndSend(
RabbitMQExchangeConstant.A_FANOUT_EXCHANGE,
"",
content);
}
}
comsumer消费端配置exchange
在 RabbitMQExchangeConfig 类中配置exchange:
/**
* 以Fanout方式发送消息
* 定义一个Exchange交换机,从该交换机接收消息
* @return
*/
@Bean
public FanoutExchange AFanoutExchange() {
//传入exchange交换机的名称 AFanoutExchange
return new FanoutExchange(RabbitMQExchangeConstant.A_FANOUT_EXCHANGE);
}
RabbitMQExchangeConstant 中配置 exchange 的名称:
/**
* 以fanout方法接收A信息的Exchange名称
*/
public static final String A_FANOUT_EXCHANGE = "AFanoutExchange";
consumer消费端配置Queue
在RabbitMQQueueConfig 配置 queue:
/**
* 创建绑定到 AFanoutExchange 交换机的队列
* @return
*/
@Bean
public Queue AFanoutQueue() {
//传入队列名称
return new Queue(RabbitMQQueueConstant.A_FANOUT_QUEUE);
}
在 RabbitMQQueueConstant 中指定队列名称:
/**
* 指定绑定到 AFanoutExchange 交换机的队列名称,用于接收 A 类型的信息
*/
public static final String A_FANOUT_QUEUE = "AFanoutQueue";
consumer消费端配置bind信息
在 RabbitMQBindConfig 中配置bind信息:
/**
* 将 AFanoutQueue 队列绑定到 AFanoutExchange 交换机上
* 用 AFanoutQueue 队列接收 AFanoutExchange 发送过来的消息
* @param AFanoutQueue
* @param AFanoutExchange
* @return
*/
@Bean
public Binding bindAFanoutExchangeToAFanoutQueue(Queue AFanoutQueue, FanoutExchange AFanoutExchange) {
return BindingBuilder.bind(AFanoutQueue).to(AFanoutExchange);
}
consumer消费端接收消息类
在 AFanoutConsumer 类中主要定义接收消息逻辑,也就是消息消费者:
/**
* 用于接收A消息的消费者consumer
*/
@Component
//表示监听名称为 AFanoutQueue 的消息队列
@RabbitListener(queues = {RabbitMQQueueConstant.A_FANOUT_QUEUE})
public class AFanoutConsumer {
/**
* 定义接收消息处理逻辑
* @param content
*/
@RabbitHandler
public void handler(String content) {
System.out.println("AFanoutConsumer 接收到消息: " + content);
}
}
运行测试:
首先运行 consumer 端也就是 spring-boot-rabbitmq-consumer 项目中的启动类启动消费者服务,然后运行 producer 端也就是 spring-boot-rabbitmq-producer 项目中的 ASenderTest 类,用来发送消息,内容如下:
/**
* ASender的测试类
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class ASenderTest {
@Autowired
private ASender aSender;
/**
* 以 Fanout 方式发送消息测试用例
*/
@Test
public void sendToAFanoutExchangeTest() {
String msg = "Hello, I am A msg. — sendToAFanoutExchange";
this.aSender.sendToAFanoutExchange(msg);
}
}
运行结果,consumer接收到消息如下:
可以定义多个队列绑定到同一个exchange,如果以fanout方式转发消息,那么监听对应队列的多个消费端都会收到消息,如下所示:
以direct分发策略分发消息的使用
-
direct:当指定direct分发策略时,如果消息的路由key与队列绑定的路由key相同时,交换器就会将消息发送到该队列中。例如发送消息是指定路由key为 rk1 ,那么如果队列绑定的路由key也是 rk1,那么交换机会将消息发送到该队列;
以下代码实现功能流程如下:
- 声明一个名为 ADirectExchange 的 DirectExchange类型的交换机;
- 声明一个名称为 ADirectQueue的 Queue 队列,并绑定到 ADirectExchange 交换机上,指定交换机与队列之间的路由key为 A.rk1;
- 生产者发送消息到 ADirectExchange 交换机并指定发送的路由key为 A.rk1 ,ADirectExchange 交换机将消息路由发送与其绑定的 Queue 上面;
- 消费端从 Queue 上面拉取到消息进行消费。
producer生产消息端配置exchange
在 RabbitMQExchangeConfig 类中配置exchange信息:
/**
* 以direct方式发送消息
* 定义一个Exchange交换机,发送的消息将通过该交换机转发
* @return
*/
@Bean
public DirectExchange ADirectExchange() {
//传入exchange交换机的名称 ADirectExchange
return new DirectExchange(RabbitMQExchangeConstant.A_DIRECT_EXCHANGE);
}
在 RabbitMQExchangeConstant 中配置exchange 名称:
/**
* 以direct方法发送A信息的Exchange名称
*/
public static final String A_DIRECT_EXCHANGE = "ADirectExchange";
producer生产端编写发送消息逻辑
在 ASender 类中编写消息发送的逻辑:
/**
* 发送消息到 ADirectExchange 交换机
*/
public void sendToADirectExchange(String msg, String routeKey) {
//要发送的信息拼上当前时间戳
String content = msg + "\t" + DateUtils.getDateTime();
//第一个参数表示Exchange交换机的名称
//第二个参数表示路由Key,direct方式路由消息时,会将消息发送到绑定该路由key的队列上
//第三个参数为要发送的消息
this.rabbitTemplate.convertAndSend(
RabbitMQExchangeConstant.A_FANOUT_EXCHANGE,
routeKey,
content);
}
comsumer消费端配置exchange
在 RabbitMQExchangeConfig 类中配置exchange:
/**
* 以direct方式发送消息
* 定义一个Exchange交换机,从该交换机接收消息
* @return
*/
@Bean
public DirectExchange ADirectExchange() {
//传入exchange交换机的名称 ADirectExchange
return new DirectExchange(RabbitMQExchangeConstant.A_DIRECT_EXCHANGE);
}
RabbitMQExchangeConstant 中配置 exchange 的名称:
/**
* 以direct方法发送A信息的Exchange名称
*/
public static final String A_DIRECT_EXCHANGE = "ADirectExchange";
consumer消费端配置Queue
在RabbitMQQueueConfig 配置 queue:
/**
* 创建绑定到 ADirectExchange 交换机的队列
* @return
*/
@Bean
public Queue ADirectQueue() {
//传入队列名称
return new Queue(RabbitMQQueueConstant.A_DIRECT_QUEUE);
}
在 RabbitMQQueueConstant 中指定队列名称:
/**
* 指定绑定到 ADirectExchange 交换机的队列名称,用于接收 A 类型的信息
*/
public static final String A_DIRECT_QUEUE = "ADirectQueue";
consumer消费端配置bind信息
在 RabbitMQBindConfig 中配置bind信息:
/**
* 将 ADirectQueue 队列绑定到 ADirectExchange交换机上
* 用 ADirectQueue 队列接收 ADirectExchange 交换机发送过来的消息
* 指定路由key 为 A.rk1
* @param ADirectQueue
* @param ADirectExchange
* @return
*/
@Bean
public Binding bindADirectExchangeToADirectQueue(Queue ADirectQueue, DirectExchange ADirectExchange) {
//以 direct 方式接收消息需要指定路由key,也就是with传入的参数
return BindingBuilder.bind(ADirectQueue).to(ADirectExchange).with(RabbitMQRoutKeyConstant.A_RK1);
}
在 RabbitMQRoutKeyConstant 中配置路由key的名称:
/**
* 指定路由key 为 A.rk1, 表示接收路由key为 A.rk1 的消息
*/
public static final String A_RK1 = "A.rk1";
consumer消费端接收消息类
在 ADirectConsumer 类中主要定义接收消息逻辑,也就是消息消费者:
/**
* 用于接收A消息的消费者consumer
*/
@Component
//表示监听名称为 ADirectQueue 的消息队列
@RabbitListener(queues = {RabbitMQQueueConstant.A_DIRECT_QUEUE})
public class ADirectConsumer {
/**
* 定义接收消息处理逻辑
* @param content
*/
@RabbitHandler
public void handler(String content) {
System.out.println("ADirectConsumer 接收到消息: " + content);
}
}
运行测试:
在 ASenderTest 中编写测试用例:
/**
* 以 Direct 方式发送消息测试用例
*/
@Test
public void sendToADirectExchangeTest() {
String msg = "Hello, I am A msg. — sendToADirectExchange ";
//第一个参数为送的消息
//第二个参数为发送消息的路由key
this.aSender.sendToADirectExchange(msg, "A.rk1");
}
启动 consumer 端也就是 spring-boot-rabbitmq-consumer 项目中的启动类启动消费者服务,然后运行 producer 端也就是 spring-boot-rabbitmq-producer 项目中的 ASenderTest 类的 sendToADirectExchangeTest 方法,用来发送消息,运行结果如下图显示:
拷贝 ADirectConsumer 类命名为 ADirectQueue2,也是接收 ADirectQueue 队列的消息,内容如下:
/**
* 用于接收A消息的消费者consumer
*/
@Component
//表示监听名称为 ADirectQueue 的消息队列
@RabbitListener(queues = {RabbitMQQueueConstant.A_DIRECT_QUEUE})
public class ADirectConsumer2 {
/**
* 定义接收消息处理逻辑
* @param content
*/
@RabbitHandler
public void handler(String content) {
System.out.println("ADirectConsumer2 接收到消息: " + content);
}
}
修改测试用例方法为发送10条消息:
/**
* 以 Direct 方式发送消息测试用例
*/
@Test
public void sendToADirectExchangeTest() {
for (int i = 0; i < 10; i++) {
String msg = "Hello, I am A msg. — sendToADirectExchange ";
//第一个参数为送的消息
//第二个参数为发送消息的路由key
this.aSender.sendToADirectExchange(msg, "A.rk1");
}
}
运行结果如下:
从上面结果可以看出,如果以 direct 方式分发数据,并且有多个消费者同时消费同一个 queue 中的数据,那么那么消息发送会以轮询的发送平均的发送到多个消费者端。
以topic分发策略发送消息的使用
- topic:当指定topic分发策略时,交换器会通过模式匹配分发消息,如果路由key与某个模式匹配时,交换机就会将消息发送到与该模式匹配的队列中。例如某个队列 queue 绑定的路由key的模式为 a.# ,当 publisher 发送消息时,如果指定发送的路由key为 a.b 或者是 a.c 时,该队列将会收到路由器发送的消息。
以下代码实现功能流程如下:
- 声明一个名为 ATopicExchange的 TopicExchange 类型的交换机;
- 声明一个名称为 A_TopicQueue 的 Queue 队列,并绑定到 ADirectExchange 交换机上,指定交换机与队列之间的路由key的匹配模式为 A.#;
- 生产者发送消息到 ATopicExchange 交换机并指定发送的路由key为 A.b ,ATopicExchange 交换机将消息路由发送与其绑定的 Queue 上面;
- 消费端从 Queue 上面拉取到消息进行消费。
producer生产消息端配置exchange
在 RabbitMQExchangeConfig 类中配置exchange信息:
/**
* 以topic方式发送消息
* 定义一个Exchange交换机,发送的消息将通过该交换机转发
* @return
*/
@Bean
public TopicExchange ATopicExchange() {
//传入exchange交换机的名称 ATopicExchange
return new TopicExchange(RabbitMQExchangeConstant.A_TOPIC_EXCHANGE);
}
在 RabbitMQExchangeConstant 中配置exchange 名称:
/**
* 以topic方法发送A信息的Exchange名称
*/
public static final String A_TOPIC_EXCHANGE = "ATopicExchange";
producer生产端编写发送消息逻辑
在 ASender 类中编写消息发送的逻辑:
/**
* 发送消息到 ATopicExchange 交换机
*/
public void sendToATopicExchange(String msg, String routeKey) {
//要发送的信息拼上当前时间戳
String content = msg + "\t" + DateUtils.getDateTime();
//第一个参数表示Exchange交换机的名称
//第二个参数表示路由Key,topic方式路由消息时,会将消息发送到匹配该路由key的队列上
//第三个参数为要发送的消息
this.rabbitTemplate.convertAndSend(
RabbitMQExchangeConstant.A_TOPIC_EXCHANGE,
routeKey,
content);
}
comsumer消费端配置exchange
在 RabbitMQExchangeConfig 类中配置exchange:
/**
* 以topic方式发送消息
* 定义一个Exchange交换机,发送的消息将通过该交换机转发
* @return
*/
@Bean
public TopicExchange ATopicExchange() {
//传入exchange交换机的名称 ATopicExchange
return new TopicExchange(RabbitMQExchangeConstant.A_TOPIC_EXCHANGE);
}
RabbitMQExchangeConstant 中配置 exchange 的名称:
/**
* 以topic方法发送A信息的Exchange名称
*/
public static final String A_TOPIC_EXCHANGE = "ATopicExchange";
consumer消费端配置Queue
在RabbitMQQueueConfig 配置 queue:
/**
* 创建绑定到 ATopicExchange 交换机的队列
*
* @return
*/
@Bean
public Queue ATopicQueue() {
//传入队列名称
return new Queue(RabbitMQQueueConstant.A_TOPIC_QUEUE);
}
在 RabbitMQQueueConstant 中指定队列名称:
/**
* 指定绑定到 ATopicExchange 交换机的队列名称,用于接收 A 类型的信息
*/
public static final String A_TOPIC_QUEUE = "A_TopicQueue";
consumer消费端配置bind信息
在 RabbitMQBindConfig 中配置bind信息:
/**
* 将 ATopicQueue 队列绑定到 ATopicExchange
* 用 ATopicQueue 队列接收 ATopicExchange 交换机发送过来的消息
* 指定路由key 为 A.#
* @param ATopicQueue
* @param ATopicExchange
* @return
*/
@Bean
public Binding bindATopicExchangeToA_BTopicQueue(Queue ATopicQueue, TopicExchange ATopicExchange) {
//以 direct 方式接收消息需要指定路由key,也就是with传入的参数
return BindingBuilder.bind(ATopicQueue).to(ATopicExchange).with(RabbitMQRoutKeyConstant.A_ALL);
}
在 RabbitMQRoutKeyConstant 中配置路由key的名称:
/**
* 指定路由key 为 A.#, 表示接收路由key为 A. 开头的消息
*/
public static final String A_ALL = "A.#";
consumer消费端接收消息类
在 ATopicConsumer 类中主要定义接收消息逻辑,接收 A_BTopicQueue 队列中的消息:
/**
* 用于接收A消息的消费者consumer
*/
@Component
//表示监听名称为 A_TopicQueue 的消息队列
@RabbitListener(queues = {RabbitMQQueueConstant.A_TOPIC_QUEUE})
public class ATopicConsumer {
/**
* 定义接收消息处理逻辑
* @param content
*/
@RabbitHandler
public void handler(String content) {
System.out.println("ATopicConsumer 接收到消息: " + content);
}
}
运行测试:
在 ASenderTest 中编写测试用例:
/**
* 以 Topic 方式发送消息测试用例
*/
@Test
public void sendToATopicExchangeTest() {
String msg = "Hello, I am A.b msg. — sendToATopicExchange ";
//第一个参数为送的消息
//第二个参数为发送消息的路由key
this.aSender.sendToATopicExchange(msg, "A.b");
}
启动 consumer 端也就是 spring-boot-rabbitmq-consumer 项目中的启动类启动消费者服务,然后运行 producer 端也就是 spring-boot-rabbitmq-producer 项目中的 ASenderTest 类的 sendToATopicExchangeTest方法,用来发送消息,运行结果如下图显示:
至此,以 fanout、direct、topic 方式发送与消费消息的例子都已经介绍完毕。
关于 Exchange、Queue 参数详解可以参考:Spring boot集成RabbitMQ中Exchange与Queue参数详解