RabbitMQ
存在的问题:比如客户端调用商品服务添加一条数据到数据库,数据库添加成功后商品服务还必须要等待搜索服务、缓存服务都完成后才能给客户端响应结果。RabbitMQ就是为了解决这种问题。商品服务完成添加数据到数据库之后可以将其余的消息交给RabbitMQ,然后商品服务就可以先向客户端响应结果,RabbitMQ再通知搜索服务和缓存服务添加数据。
一、RabbitMQ安装
用docker-compose安装即可
version: "3.1"
services:
rabbitmq:
image: daocloud.io/library/rabbitmq:management
restart: always
container_name: rabbitmq
ports:
- 5672:5672
- 15672:15672 # 这个端口是RabbitMQ自带的图形监控界面
volumes:
- ./mydata/rabbitmq:/var/lib/rabbitmq
二、RabbitMQ架构
2.1 架构图
由生产者发送消息到一个指定Exchange中,然后指定Exchange和某个Queue的路由关系,Exchange就会将消息发送到Queue中,消费者和Virtual Host建立连接,和Queue建立Channel后,就能拿到最新的消息。
注意:一个Queue中的消息只会被一个消费者消费一次(即消费完之后消息直接出队),另一个消费者消费不到同一个Queue中的同一条消息
2.2 查看图形化界面
直接访问http://ip:15672,默认用户/密码都是guest,是一个管理员用户。
在界面的Connections可以查看客户端与Virtual Host建立的连接;在Channels可以查看建立连接后建立的管道;在Exchange可以查看已有的Exchange,如果发送消息的时候没有指定Exchange,会默认用 “/”D的Exchange;在Queues可以查看队列,队列需要我们手动创建;在Admin,可以添加其他用户,可以添加Virtual Host,可以指定哪个用户可以管理哪几个Virtual Host。
三、RabbitMQ的使用
3.1 RabbitMQ的路由策略
- HelloWorld
- WorkQueues
- Publish/Subscribe
- Routing
- Topics
- Publisher confirms
3.2 Java连接RabbitMQ
Step1:创建Maven工程,导入RabbitMQ依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
Step2:创建一个用于建立Connection的工具类
public class RabbitMQUtils {
public static Connection getConnection {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.199.109"); // 指定ip
factory.setPort(5672); // 指定端口
factory.setUsername("test"); // 指定用户名密码
factory.setPassword("test");
factory.setVirtualHost("/test"); // 指定当前用户管理的Virtual Host
Connection connection = null;
connection = factory.newConnection();
return connection;
}
}
Step3:测试
@Test
public void testConnection() {
Connection connection = RabbitMQUtils.getConnection();
connection.close(); // 在这里打个断点, 就可以在图形化界面查看到建立的连接
}
3.3 HelloWorld路由
一个生产者,创建一个Channel,一个默认的Exchange(不用手动指定),一个Queue,一个消费者。
Step1:创建生产者,创建一个Channel,发布消息到Exchange,指定路由规则
public class Productor {
@Test
public void publish() {
// 1. 获取Connection
Connection connection = RabbitMQUtils.getConnection();
// 2. 创建Channel
Channel channel = connection.createChannel();
// 3. 发布消息到Exchange, 同时指定路由规则
String msg = "一条消息";
/**
* 四个参数:
(1) 指定Exchange, 如果要使用默认的就给一个空串
(2) 指定路由规则, 使用具体的队列名称(当使用默认Exchange的时候, 这个参数同时也是队列名)
(当使用自定义Exchange的时候, 这个参数就写Exchange名)
(3) 指定传递消息所携带的属性, 没有给就null
(4) 指定发布的具体消息, byte[]类型
*/
channel.basicPublish("", "HelloWorld", null, msg.getBytes());
System.in.read();
channel.close();
connection.close();
}
}
Step2:创建消费者,创建一个Channel,创建一个Queue,并且去消费当前队列
public class Costumer {
@Test
public void consume() {
// 1. 获取Connection
Connection connection = RabbitMQUtils.getConnection();
// 2. 创建Channel
Channel channel = connection.createChannel();
// 3. 声明队列
/**
* 五个参数:
(1) 指定队列名称
(2) 当前队列是否持久化(只有在Queue中才能持久化, Exchange中不行)
(3) 是否排外, 即当前队列是否只能被一个消费者消费
(4) 如果这个队列没有消费者在消费, 则队列自动删除
(5) 指定当前队列的其他信息
*/
channel.queueDeclare("HelloWorld", true, false, false, null);
// 4. 开启监听Queue
// 4.1 消息回调方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelopo,
AMQP.BasicProperties properties, byte[] body) throws IOException{
System.out.println("接受到消息" + new String(body, "UTF-8"));
channel.basicAck(envelope.getDeliveryTag(), false); // 手动ACK
}
}
// 4.2 开始监听
/**
* 三个参数:
(1) 指定消费哪个队列
(2) 指定是否自动ACK(即接收到消息以后会立即告诉RabbitMQ该消息已经被消费掉)
(3) 指定消息回调犯法(即消费者具体要做的事情)
*/
channel.basicConsume("HelloWorld", false, consumer);
System.in.read();
channel.close();
connection.close();
}
}
Step3:测试。注意:先启动消费者再启动生产者,不然无法发送消息到Exchange,因为发送消息到Exchange需要Queue
3.4 WorkQueues路由
一个生产者,一个默认的Exchange,一个Queue,多个消费者
Step1:创建生产者,同上
Step2:创建消费者,同上。不同的是要创建两个消费者
/**
* 二号消费者
*/
public class Costumer2 {
@Test
public void consume() {
// 1. 获取Connection
Connection connection = RabbitMQUtils.getConnection();
// 2. 创建Channel
Channel channel = connection.createChannel();
// 3. 声明队列
channel.queueDeclare("WorkQueues", true, false, false, null);
// 3.1 可以指定当前消费者一次能消费多少条消息, 如果不指定就是两个消费者均摊
channel.basicQos(2);
// 4. 开启监听Queue
// 4.1 消息回调方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelopo,
AMQP.BasicProperties properties, byte[] body) throws IOException{
System.out.println("消费者二号接受到消息" + new String(body, "UTF-8"));
channel.basicAck(envelope.getDeliveryTag(), false); // 手动ACK
}
}
// 4.2 开始监听
channel.basicConsume("Work", false, consumer);
System.in.read();
channel.close();
connection.close();
}
}
3.5 Publish/Subscribe路由
一个生产者,一个自定义的Exchange,多个Queue,多个消费者。此时两个Queue中的消息是一模一样的。
Step1:创建生产者,需要手动创建一个Exchange
public class Productor {
@Test
public void publish() {
// 1. 获取Connection
Connection connection = RabbitMQUtils.getConnection();
// 2. 创建Channel
Channel channel = connection.createChannel();
// 3. 创建Exchange
/**
* 参数1: 指定Exchange的名称; 参数2: 指定Exchange的类型
FANOUT - 对应Publish/Subscribe路由
DIRECT - 对应Routing路由
TOPIC - 对应Topics路由
*/
channel.exchangeDeclare("pubsub_exchange", BuiltinExchangeType.FANOUT);
// 4. 将自定义的Exchange和Queue绑定起来(也可以在消费者中绑定)
// 参数3是RouteKey, Publish/Subscribe路由的RouteKey给空串即可
channel.queueBind("pubsub_queue1", "pubsub_exchange", "");
channel.queueBind("pubsub_queue2", "pubsub_exchange", "");
// 5. 发布消息到Exchange, 同时指定路由规则
for (int i = 0; i < 10; i++) {
String msg = "一条消息" + i;
channel.basicPublish("pubsub_exchange", "Work", null, msg.getBytes());
}
System.in.read();
channel.close();
connection.close();
}
}
Step2:创建消费者,同时指定当前消费者消费的是哪个队列
/**
* 二号消费者
*/
public class Costumer2 {
@Test
public void consume() {
// 1. 获取Connection
Connection connection = RabbitMQUtils.getConnection();
// 2. 创建Channel
Channel channel = connection.createChannel();
// 3. 声明队列
channel.queueDeclare("pubsub_queue2", true, false, false, null);
// 4. 开启监听Queue
// 4.1 消息回调方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelopo,
AMQP.BasicProperties properties, byte[] body) throws IOException{
System.out.println("消费者二号接受到消息" + new String(body, "UTF-8"));
channel.basicAck(envelope.getDeliveryTag(), false); // 手动ACK
}
}
// 4.2 开始监听
channel.basicConsume("pubsub_queue2", false, consumer);
System.in.read();
channel.close();
connection.close();
}
}
3.6 Routing路由
一个生产者,一自定义的Exchange,多个Queue,多个消费者,Exchange和Queue之间不是直接绑定,而是通过RouteKey绑定,就是指定哪些类型的消息发送到哪个Queue中,此时两个Queue中的消息不一定相同。
Step1:创建生产者,绑定时指定Queue对应的RouteKey
public class Productor {
@Test
public void publish() {
// 1. 获取Connection
Connection connection = RabbitMQUtils.getConnection();
// 2. 创建Channel
Channel channel = connection.createChannel();
// 3. 创建Exchange
/**
* 参数1: 指定Exchange的名称; 参数2: 指定Exchange的类型
FANOUT - 对应Publish/Subscribe路由
DIRECT - 对应Routing路由
TOPIC - 对应Topics路由
*/
channel.exchangeDeclare("routing_exchange", BuiltinExchangeType.DIRECT);
// 4. 将自定义的Exchange和Queue绑定起来(也可以在消费者中绑定)
// 参数3指定每个队列对应的RouteKey
channel.queueBind("routing_queue1", "routing_exchange", "ERROR");
channel.queueBind("routing_queue2", "routing_exchange", "INFO");
// 5. 发布消息到Exchange, 同时指定RouteKey, Exchange会根据RouteKey把消息发送到对应的Queue
channel.basicPublish("pubsub_exchange", "ERROR", null, "error msg1".getBytes());
channel.basicPublish("pubsub_exchange", "INFO", null, "info msg1".getBytes());
channel.basicPublish("pubsub_exchange", "INFO", null, "info msg2".getBytes());
channel.basicPublish("pubsub_exchange", "INFO", null, "info msg3".getBytes());
System.in.read();
channel.close();
connection.close();
}
}
Step2:创建消费者,同3.5
3.7 Topics路由
同Routing路由方式,只是RouteKey的形式不同
Step1:创建生产者
public class Productor {
@Test
public void publish() {
// 1. 获取Connection
Connection connection = RabbitMQUtils.getConnection();
// 2. 创建Channel
Channel channel = connection.createChannel();
// 3. 创建Exchange
/**
* 参数1: 指定Exchange的名称; 参数2: 指定Exchange的类型
FANOUT - 对应Publish/Subscribe路由
DIRECT - 对应Routing路由
TOPIC - 对应Topics路由
*/
channel.exchangeDeclare("topics_exchange", BuiltinExchangeType.TOPIC);
// 4. 将自定义的Exchange和Queue绑定起来(也可以在消费者中绑定)
// 参数3指定每个队列对应的RouteKey
/**
* 关于此时的RouteKey:
* "*.red.*": 表示有三个条件, 只要中间的条件符合就可以将消息发送到该队列, *是占位符
* "fast.#": #是通配符, 效果等同于"fast.*.*"
*/
channel.queueBind("topics_queue1", "topics_exchange", "*.red.*");
channel.queueBind("topics_queue2", "topics_exchange", "fast.#");
// 5. 发布消息到Exchange, 同时指定RouteKey, Exchange会根据RouteKey把消息发送到对应的Queue
channel.basicPublish("topics_exchange", "fast.red.monkey", null, "快红猴".getBytes());
channel.basicPublish("topics_exchange", "fast.white.dog", null, "快白狗".getBytes());
channel.basicPublish("topics_exchange", "slow.red.cat", null, "快红猴".getBytes());
// 此时第1条、第3条消息会发送给topics_queue1
// 第1条、第2条消息会发送给topics_queue2
System.in.read();
channel.close();
connection.close();
}
}
四、SpringBoot整合RabbitMQ
4.1 快速入门
Step1:创建SpringBoot工程,导入SpringBoot整合RabbitMQ的依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Step2:编写配置
spring:
rabbtimq:
host: 192.168.199.109
port: 5672
username: test
password: test
virtual-host: /test
Step3:编写配置类,声明Exchange和Queue,并且绑定
@Configuration
public class RabbitMQConfig {
// 创建Topic路由的Exchange
@Bean("TOPIC_EXCHANGE")
public TopicExchange getTopicExchange() {
/**
* 参数1: 指定Exchange的名称
* 参数2: 是否持久化
* 参数3: 是否自动删除
*/
return new TopicExchange("boot_topic_exchange", true, false);
}
// 创建Queue1
@Bean("TOPIC_QUEUE1")
public Queue getQueue1() {
/**
* 参数1: 指定Queue名称
* 参数2: 是否持久化
* 参数3: 是否排外
* 参数4: 是否自动删除
* 参数5: 携带的参数
*/
return new Queue("boot_topic_queue1", true, false, false, null);
}
// 创建Queue2
@Bean("TOPIC_QUEUE2")
public Queue getQueue2() {
return new Queue("boot_topic_queue2", true, false, false, null);
}
// 绑定
@Bean
public Binding getBindingRed(@Qualifier("TOPIC_QUEUE1") Queue queue,
@Qualifier("TOPIC_EXCHANGE") TopicExchange topicExchange) {
// 指定Queue, 指定Exchange, 指定RouteKey
return BindingBuilder.bind(queue).to(topicExchange).with("*.red.*");
}
// 绑定
@Bean
public Binding getBindingSlow(@Qualifier("TOPIC_QUEUE2") Queue queue,
@Qualifier("TOPIC_EXCHANGE") TopicExchange topicExchange) {
// 指定Queue, 指定Exchange, 指定RouteKey
return BindingBuilder.bind(queue).to(topicExchange).with("slow.*.*");
}
}
Step4:准备生产者,发送消息
public class Productor {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void publish() {
/**
* 参数1: 指定发送到的Exchange名称
* 参数2: 指定此次发送的RouteKey
* 参数3: 具体消息
*/
rabbitTemplate.convertAndSend("boot_topic_exchange", "fast.red.dog", "快红狗");
rabbitTemplate.convertAndSend("boot_topic_exchange", "slow.white.pig", "慢白猪");
System.in.read();
}
}
Step5:准备消费者
@Component
public class Costumer {
// 指定当前消费者监听的队列, SpringBoot工程启动后就会一直监听
@RabbitListener(queues = "boot_topic_queue1")
public void consumer1(Object msg) {
// 该方法内是消费者接收到消息之后的具体动作
System.out.println("消费者1号接收到消息" + msg);
}
@RabbitListener(queues = "boot_topic_queue2")
public void consumer2(Object msg) {
System.out.println("消费者2号接收到消息" + msg);
}
}
Step6:测试。启动SpringBoot工程,可以在打印中查看消费者打印的消息,也可以在监控界面中查看消息的发送和被消费的情况
4.2 在SpringBoot中手动ACK
Step1:修改配置
spring:
rabbtimq:
host: 192.168.199.109
port: 5672
username: test
password: test
virtual-host: /test
listener:
simple:
ackonwledge-mode: manual # 手动ACK配置
Step2:修改消费者的接收行为
@RabbitListener(queues = "boot_topic_queue1")
public void consumer1(String msg, Channel channel, Message message) throws IOException{
System.out.println("消费者1号接收到消息" + msg);
// 手动ACK
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
五、RabbitMQ的其他操作
5.1 消息的可靠性
-
Q:如果消息已经到达了RabbitMQ,但是RabbitMQ宕机了,消息会丢失吗?
A:不会,RabbitMQ的Queue有持久化机制
-
Q:消费者在消费消息时,如果执行到一半,消费者宕机了怎么办?
A:消费者中最好使用手动ACK来避免消息没消息完却宕机的情况,这样消息就还会存在队列中
-
Q:生产者发送消息时,由于网络问题,导致消息没发送到RabbitMQ怎么办?
A:RabbitMQ提供了事务操作和Confirm机制,可以保证生产者把消息发送到Exchange(但是事务操作效率太低,主要用Confirm机制)
5.1.1 RabbitMQ的Confirm机制
可以保证生产者把消息发送到Exchange
5.1.1.1 普通Confirm方式
public class Productor {
@Test
public void publish() throws Exception{
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topics_exchange", BuiltinExchangeType.TOPIC);
channel.queueBind("topics_queue1", "topics_exchange", "*.red.*");
// Step1:在消息发送之前, 开启Confirm
channel.confirmSelect();
channel.basicPublish("topics_exchange", "fast.red.monkey", null, "快红猴".getBytes());
// Step2:在消息发送之后确认消息是否发送成功
if (channel.waitForConfirms()) {
/**
* 消息发送成功时的处理
*/
} else {
/**
* 消息发送失败时的处理
*/
}
System.in.read();
channel.close();
connection.close();
}
}
5.1.1.2 批量Confirm方式
主要用于发送多条消息时,只要有一条发送失败,则全部发送失败,抛出异常。
public class Productor {
@Test
public void publish() throws Exception{
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topics_exchange", BuiltinExchangeType.TOPIC);
channel.queueBind("topics_queue1", "topics_exchange", "*.red.*");
// Step1:在消息发送之前, 开启Confirm
channel.confirmSelect();
for (int i = 0 ; i < 100; i++) {
String msg = "快红猴" + i;
channel.basicPublish("topics_exchange", "fast.red.monkey", null, msg.getBytes());
}
// Step2:在消息发送之后确认消息是否发送成功
channel.waitForConfirmsOrDie(); // 该方法不会返回布尔值, 如果有一条发送失败就会直接抛异常
System.in.read();
channel.close();
connection.close();
}
}
5.1.1.3 异步Confirm方式
因为是异步的,所以不管生产者的消息是否发送成功,都会接着执行生产者后面的代码,单独有会有一个线程出来进行Confirm判断。效率最高,最常用。
public class Productor {
@Test
public void publish() throws Exception{
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topics_exchange", BuiltinExchangeType.TOPIC);
channel.queueBind("topics_queue1", "topics_exchange", "*.red.*");
// Step1:在消息发送之前, 开启Confirm
channel.confirmSelect();
for (int i = 0 ; i < 100; i++) {
String msg = "快红猴" + i;
channel.basicPublish("topics_exchange", "fast.red.monkey", null, msg.getBytes());
}
// Step2:在消息发送之后确认消息是否发送成功
channel.addConfirmListener(new ConfirmListener {
// 消息发送成功时的回调方法
@Override
public void handleAck(long deliveryTag, boolean mutiple) throws IOException {
System.out.println("消息发送成功。 标识为:" + deliveryTag + "是否是批量:" + mutiple);
}
// 消息发送失败时的回调方法
@Override
public void handleNack(long deliveryTag, boolean mutiple) throws IOException {
System.out.println("消息发送失败。 标识为:" + deliveryTag + "是否是批量:" + mutiple);
}
});
System.in.read();
channel.close();
connection.close();
}
}
5.1.2 RabbitMQ的Return机制
Confirm机制只能保证消息到达Exchange,无法保证消息可以被Exchange分发到Queue。而Exchange是不能持久化消息的,Queue才可以持久化消息。所以可以使用Return机制保证Exchange把消息送达到指定的Queue
public class Productor {
@Test
public void publish() throws Exception{
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topics_exchange", BuiltinExchangeType.TOPIC);
channel.queueBind("topics_queue1", "topics_exchange", "*.red.*");
// Step1:在开启Confirm之前先开启Return
channel.addReturnListener(new ReturnListener() {
// 消息没有到达Queue时执行的回调方法
@Override
public void handleReturn(int replyCode, String exchange,
String routingKey, AMQP.BasicProperties properties,
byte[] msg) throws Exception {
System.out.println(new String(msg, "UTF-8") + "没有送到到Queue中");
}
})
// Step2:在消息发送之前, 开启Confirm
channel.confirmSelect();
for (int i = 0 ; i < 100; i++) {
String msg = "快红猴" + i;
// 在发送消息时, 在RouteKey参数后面追加一个参数, 置为true表示开启Return
channel.basicPublish("topics_exchange", "fast.red.monkey", true, null, msg.getBytes());
}
// Step3:在消息发送之后确认消息是否发送成功
channel.addConfirmListener(new ConfirmListener {
@Override
public void handleAck(long deliveryTag, boolean mutiple) throws IOException {
System.out.println("消息发送成功。 标识为:" + deliveryTag + "是否是批量:" + mutiple);
}
@Override
public void handleNack(long deliveryTag, boolean mutiple) throws IOException {
System.out.println("消息发送失败。 标识为:" + deliveryTag + "是否是批量:" + mutiple);
}
});
System.in.read();
channel.close();
connection.close();
}
}
5.1.3 在SpringBoot中实现Confirm和Return
Step1:修改配置文件
spring:
rabbtimq:
host: 192.168.199.109
port: 5672
username: test
password: test
virtual-host: /test
listener:
simple:
ackonwledge-mode: manual # 手动ACK配置
publisher-confirm-type: simple
publisher-returns: true
Step2:编写配置类,指定RabbitTemplate对象,开启Confirm和Return,并编写回调方法
@Component // 这里不能用Configuration, 因为要实现接口
public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTempalte;
// 定义初始化方法, 这个方法在构建对象时会执行
@PostConstruct
public void initMethod() {
// 给RabbitTemplate指定Confirm和Return的回调方法
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
/**
* 消息发送成功时的处理
*/
} else {
/**
* 消息发送失败时的处理
*/
}
}
@Override
public void returnMessage(Message message, int replyCode, String replyText,
String exchange, String routingKey) {
/**
* Exchange中的消息没有送达到Queue时的处理
*/
}
}
Step3:生产者和消费者不变
5.2 消息重复消费
重复消费同一个Queue的消息,会对非幂等行操作(增删)造成问题;重复消费消息的原因是,消费者没有给RabbitMQ一个Ack。
为了解决重复消费消息的问题,可以采用Redis,在消费者消费消息之前,先将消息的id作为key放到Redis中(用setnx方法,key不存在就创建key,key存在则获取key的value),并把对应的value置0,表示正在执行任务,等任务执行完毕之后可以把value置1。如果消费者一号ACK失败,在RabbitMQ将消息交给消费者二号时,会先执行setnx方法判断key是否存在,如果key存在则获取key的value,如果value是0,则消费者二号就什么都不做,如果value是1,则表示消费者一号已经执行完了任务,但是ACK失败,消费者二号直接执行ACK帮消费者一号ACK即可。
极端情况:消费者一号出现了死锁,则会一直卡在key存在且value=0的情况。解决方法是:在setnx设置key的时候,给key指定上一个生存时间即可。
5.2.1 实现避免消息重复消费
Step1:在Docker中启动Redis,然后在项目中导入Redis的依赖
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.1.0</version>
</dependency>
Step2:生产者在发送消息前指定上消息的ID
public class Productor {
@Test
public void publish() throws Exception{
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topics_exchange", BuiltinExchangeType.TOPIC);
channel.queueBind("topics_queue1", "topics_exchange", "*.red.*");
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String exchange,
String routingKey, AMQP.BasicProperties properties,
byte[] msg) throws Exception {
System.out.println(new String(msg, "UTF-8") + "没有送到到Queue中");
}
})
channel.confirmSelect();
for (int i = 0 ; i < 100; i++) {
// 创建一个properties属性
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(1) // 指定消息是否需要持久化, 1表示需要持久化, 2不需要
.messageId(UUID.randomUUID().toString()) // 给每一条消息随机分配了一个UUID
.build();
String msg = "快红猴" + i;
// 将properties属性放到第4个参数就可以随着消息一起发出
channel.basicPublish("topics_exchange", "fast.red.monkey", true, properties, msg.getBytes());
}
channel.addConfirmListener(new ConfirmListener {
@Override
public void handleAck(long deliveryTag, boolean mutiple) throws IOException {
System.out.println("消息发送成功。 标识为:" + deliveryTag + "是否是批量:" + mutiple);
}
@Override
public void handleNack(long deliveryTag, boolean mutiple) throws IOException {
System.out.println("消息发送失败。 标识为:" + deliveryTag + "是否是批量:" + mutiple);
}
});
System.in.read();
channel.close();
connection.close();
}
}
Step3:消费者调用Redis进行操作
public class Costumer1 {
@Test
public void consume() {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("WorkQueues", true, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelopo,
AMQP.BasicProperties properties, byte[] body) throws IOException{
Jedis jedis = new Jedis("192.168.199.109", 6379); // 连接Redis
String messageId = properties.getMessageId(); // 取出properties中的messageId
// 1. setnx到Redis中, 默认指定Value为0, 指定生存时间为10秒
String result = jedis.set(messageId, "0", "NX", "EX", 10);
if (result != null && result.equalsIgnoreCase("OK")) {
System.out.println("消费者一号接受到消息" + new String(body, "UTF-8"));
// 2. 消费成功, set messageId的value为1
jedis.set(messageId, "1");
channel.basicAck(envelope.getDeliveryTag(), false); // 手动ACK
} else {
// 3. 如果1中的setnx失败, 获取key对应的value
// 如果是0, 则直接return, 如果是1则先ACK再return
String value = jedis.get(messageId);
if ("1".equalsIgnoreCase(s)) {
channel.basicAck(envelope.getDeliveryTag(), false); // 手动ACK
}
}
}
}
channel.basicConsume("topics_queue1", false, consumer);
System.in.read();
channel.close();
connection.close();
}
}
Step4:测试,启动生产者和消费者,然后到Redis中查看是否有新的key
5.2.2 在SpringBoot实现避免消息重复消费
Step1:导入SpringBoot整合Redis的依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
Step2:在配置文件中连接Redis
spring:
redis:
host: 192.168.199.109
port: 6379
Step3:生产者发送消息之前指定消息的ID
public class Productor {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void publish() {
// 给消息的属性分配一个随机的UUID
CorrelationData messageId = new CorrelationData(UUID.randomUUID.toString());
// 把messageId放入第4个参数
rabbitTemplate.convertAndSend("boot_topic_exchange", "fast.red.dog", "快红狗", messageId);
System.in.read();
}
}
Step4:消费者调用Redis进行操作
@Component
public class Costumer {
// 用StringRedisTemplate可以直接用String存入Redis(可以不用转为字节数组)
@Autowired
private StringRedisTemplate redisTemplate;
@RabbitListener(queues = "boot_topic_queue1")
public void consumer1(String msg, Channel channel, Message message) throws IOException{
// 先取出messageId
String messageId = message.getMessageProperties().getHeader("spring_returned_message_correlation");
// 1. 设置key到Redis
if (redisTemplate.opsForValue().setIfAbsent(messageId, "0", 10, TimeUnit.SECONDS)) {
System.out.println("消费者1号接收到消息" + msg);
// 2. 消费成功, set messageId的value为1
redisTemplate.opsForValue().set(messageId, "1");
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 手动ACK
} else {
// 3. 如果1中的setnx失败, 获取key对应的value
// 如果是0, 则直接return, 如果是1则先ACK再return
if ("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))) {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 手动ACK
}
}
}
}
Step5:测试,启动生产者和消费者,然后到Redis中查看是否有新的key
六、RabbitMQ的简单应用
例子:添加一条数据时,客户模块在MySQL添后会调用搜索模块在ES中也添加
客户模块:
Step1:在客户模块的SpringBoot工程中导入SpringBoot整合RabbitMQ的依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Step2:编写配置文件连接RabbitMQ
spring:
rabbitmq:
host: 192.168.199.109
port: 5672
username: test
password: test
virtual-host: /test
listener:
simple:
ackonwledge-mode: manual # 手动ACK配置
Step3:编写配置类
@Configuration
public class RabbitMQConfig {
// 创建Topic路由的Exchange
@Bean("TOPIC_EXCHANGE")
public TopicExchange getTopicExchange() {
return new TopicExchange("openapi_customer_exchange", true, false);
}
// 创建Queue1
@Bean("TOPIC_QUEUE1")
public Queue getQueue1() {
return new Queue("openapi_customer_queue", true, false, false, null);
}
// 绑定
@Bean
public Binding getBindingRed(@Qualifier("TOPIC_QUEUE1") Queue queue,
@Qualifier("TOPIC_EXCHANGE") TopicExchange topicExchange) {
// 指定Queue, 指定Exchange, 指定RouteKey
return BindingBuilder.bind(queue).to(topicExchange).with("openapi.customer.*");
}
}
Step4:在service中直接把消息发送给Exchange(生产者)(要先注入RabbitTemplate)
// 原来的写法: 直接把消息发送到
/**
String json = JSON.toJSON(customer);
HttpHeaders headers = new HttpHeaders();
headers.setContenType(MediaType.parseMediaType("application/json;charset=utf-8"));
HttpEntity<String> entity = new HttpEntity<>(json.headers);
restTemplate.postForObject("http://localhost:8080/search/customer/add", entity, String.class);
*/
// 新的写法: 将消息发送到Exchange
rabbitTemplate.converAndSend("openapi_customer_exchange", "openapi.customer.add", JSON.toJSON(customer));
搜索模块:
Step1:在客户模块的SpringBoot工程中导入SpringBoot整合RabbitMQ的依赖
<!-- RabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Step2:编写配置文件,连接RabbitMQ和Redis
spring:
rabbtimq:
host: 192.168.199.109
port: 5672
username: test
password: test
virtual-host: /test
listener:
simple:
ackonwledge-mode: manual # 手动ACK配置
Step3:编写配置类
@Configuration
public class RabbitMQConfig {
// 创建Topic路由的Exchange
@Bean("TOPIC_EXCHANGE")
public TopicExchange getTopicExchange() {
return new TopicExchange("openapi_customer_exchange", true, false);
}
// 创建Queue1
@Bean("TOPIC_QUEUE1")
public Queue getQueue1() {
return new Queue("openapi_customer_queue", true, false, false, null);
}
// 绑定
@Bean
public Binding getBindingRed(@Qualifier("TOPIC_QUEUE1") Queue queue,
@Qualifier("TOPIC_EXCHANGE") TopicExchange topicExchange) {
// 指定Queue, 指定Exchange, 指定RouteKey
return BindingBuilder.bind(queue).to(topicExchange).with("openapi.customer.*");
}
}
Step4:准备消费者
@Component
public class CostumerListener {
@Autowired
private CustomerService customerService;
// 指定当前消费者监听的队列, SpringBoot工程启动后就会一直监听
@RabbitListener(queues = "openapi_customer_queue")
public void consumer1(String msg, Channel channel, Message message) throws IOException{
// 1. 获取RoutingKey
String receivedRoutingKey = message.getMessageProperties().getReceivedRoutingKey();
// 2. 根据RoutingKey来选择调用的方法
switch (receivedRoutingKey) {
case "openapi.customer.add":
// 3. 调用Service完成添加数据到ES(先把json字符串转为Customer对象再存)
customerService.saveCustomer(JSON.parseJSON(msg, Customer.class));
// 4. 手动ACK
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
break;
/**
* ......
* 其他RoutingKey对应的处理
*/
}
}
}