之前的文章我们已经介绍了 RabbitMQ 的基本使用,但是在默认情况下 RabbitMQ 并不能保证消息是否发送成功、以及是否被成功消费掉。消息在传递过程中存在丢失的可能。基于这样的现状,就有了消息的确认机制,来提高消息传递过程中的可靠性。
RabbitMQ 中,消息的确认机制包含以下两个方面:
-
消息发送确认,生产者发送消息的确认包含两部分:
1、生产者发送的消息是否成功到达交换机
2、消息是否成功的从交换机投放到目标队列 -
消息接收确认,消费者接收消息有三种不同的确认模式:
1、AcknowledgeMode.NONE:不确认,这是默认的模式,默认所有消息都被成功消费了,直接从队列删除消息。存在消息被消费过程中由于异常未被成功消费而掉丢失的风险。
2、AcknowledgeMode.AUTO:自动确认,根据消息被消费过程中是否发生异常来发送确认收到消息
或拒绝消息
的指令到 RabbitMQ 服务。这个确认时机开发人员是不可控的,同样存在消息丢失的风险。
3、AcknowledgeMode.MANUAL:手动确认,开发人员可以根据实际的业务,在合适的时机手动发送确认收到消息
或拒绝消息
指令到 RabbitMQ 服务,整个过程开发人是可控的。这种模式也是我们要重点介绍的。
一、准备环境
创建 SpringBoot 项目,添加 RabbitMQ 依赖。
这里将生产者和消费者放在一个项目。
在application.properties
中添加连接 RabbitMQ 服务的配置,以及开启消息确认机制需要的配置:
server.port=8080
# rabbitmq 相关配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/
# 开启消息是否已经发送到交换机的确认机制
spring.rabbitmq.publisher-confirm-type=correlated
# 开启消息未成功投递到目标队列时将消息返回
spring.rabbitmq.publisher-returns=true
# 设置消费者需要手动确认消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.direct.acknowledge-mode=manual
创建交换机、队列,并完成绑定:
@Configuration
public class AckRabbitMQConfig {
// Fanout交换机
@Bean
FanoutExchange ackExchange() {
return new FanoutExchange("ack.exchange", true, false);
}
// 消息队列
@Bean
Queue ackQueue() {
return new Queue("ack.queue", true);
}
// 绑定队列和交换机
@Bean
Binding ackBinding() {
return BindingBuilder.bind(ackQueue()).to(ackExchange());
}
}
二、消息发送确认
消息发送确认的第一部分,是确认消息是否已经成功发送到交换机,我们需要实现RabbitTemplate.ConfirmCallback
接口:
@Service
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
/**
* @param correlationData
* @param ack true 表示消息成功发送到交换机,false 则发送失败
* @param cause 消息发送失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息已经发送到交换机!");
} else {
System.out.println("消息发送到交换机失败:" + cause);
}
}
}
消息无论是否成功到达交换机都会调用confirm
方法。
消息发送确认的第二部分,就是消息是否成功的从交换机投放到目标队列,需要实现RabbitTemplate.ReturnsCallback
接口:
@Service
public class ReturnCallbackService implements RabbitTemplate.ReturnsCallback {
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.println("未成功投递到队列的消息:"+ returned.toString());
}
}
returnedMessage
方法只会在消息未成功投递到目标队列时被调用
,ReturnedMessage
就是投递失败的消息基本信息。
定义好了两种消息发送确认服务,接下来就是配置消息发送确认服务,可以放在 RabbitMQ 配置类里进行全局配置:
@Configuration
public class AckRabbitMQConfig {
@Autowired
RabbitTemplate rabbitTemplate;
@Autowired
ConfirmCallbackService confirmCallbackService;
@Autowired
ReturnCallbackService returnCallbackService;
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(confirmCallbackService);
rabbitTemplate.setReturnsCallback(returnCallbackService);
}
......
......
}
也可以在发送消息时单独配置:
@Service
public class SendMessageService {
@Autowired
RabbitTemplate rabbitTemplate;
@Autowired
ConfirmCallbackService confirmCallbackService;
@Autowired
ReturnCallbackService returnCallbackService;
public void send(String message) {
rabbitTemplate.setConfirmCallback(confirmCallbackService);
rabbitTemplate.setReturnsCallback(returnCallbackService);
rabbitTemplate.convertAndSend("ack.exchange", "", message);
System.out.println("生产者发送的消息:" + message);
}
}
三、消息接收确认
消息接收确认的实现就相对简单一些:
@Service
public class ReceiveMessageService {
@RabbitListener(queues = "ack.queue")
public void receive(String msg, Channel channel, Message message) {
try {
// int i = 1/0;
// 确认收到消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
System.out.println("消费者确认收到消息:" + msg);
} catch (Exception e) {
try {
// 拒绝消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
System.out.println("消费者拒绝消息:" + msg);
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}
}
使用消息接收的手动确认模式时,接收消息的方法需要额外添加Channel
、Message
两个类型的参数。
Channel
就是信道,在学习 Java client 操作 RabbitMQ 时,就是用它来发送接收消息的,不了解的可以复习一下。Message
是 RabbitMQ 封装的消息类,里边包含了消息体、消息序号、以及交换机、队列等一些相关的信息。
这样我们就可以根据实际的业务需求,在适当的时机告诉 RabbitMQ 服务,消息已经成功消费,或者被拒绝消费。
这就涉及如下几个方法了:
-
basicAck,确认收到消息,即消息消费成功,执行该方法后,消息会被从队列删除。该方法的参数含义如下:
1、deliveryTag
:消息投递的序号,就是1、2、3、4这样的递增整数。
2、multiple
:是否批量确认消息,false 表示只确认当前 deliveryTag 对应的消息,true 表示会确认小于当前 deliveryTag 但还未被确认的消息。 -
basicNack,拒绝消息,由于发生异常等原因,消息没有被成功消费。和 basicAck 方法相比多了一个参数:
1、requeue
:true 表示被拒绝的消息会重新进入队列头部。 - basicReject,和 basicNack 方法的作用类似,但是少了 multiple 参数。
这里有两个问题需要注意:
1、
如果拒绝消息时,设置requeue
为true
,由于消息会重新进入队列头部,接下来又会被消费者处理,这样很可能陷入死循环,耗尽服务器资源,很危险的。所以在设置requeue
为true
时,需要慎重考虑。
拒绝消息时一般都是由于发生异常、或者业务上的错误,导致消费流程不能正常进行下去,可以考虑将此时的消息发送到死信队列,后续再单独处理。具体怎么实现,后期会有专门的文章介绍,目前先了解即可。
2、
如果开启了消息接收的手动确认模式,但是消费消息时却没有做任何消息确认成功或拒绝的应答操作,则对应的消息会变成Unacked
状态:
如果消费者客户端不重启,则Unacked
状态的消息会一直堆积,不会被删除,也不会被重新消费。
如果消费者客户端重启,则消息会自动变为Ready
状态,这样又会被重新消费一次。
三、效果测试
可以通过如下接口来发送消息:
@RestController
public class SendMessageController {
@Autowired
private SendMessageService sendMessageService;
@GetMapping("/send/{msg}")
public void send(@PathVariable("msg") String msg) {
sendMessageService.send(msg);
}
}
要测试消息不能成功发送到交换机的情况,只需要发送消息时指定一个不存在的交换机即可。
由于RabbitTemplate.ReturnsCallback
的returnedMessage
方法只会在消息未成功投递到目标队列时被调用,所以要测试消息是否成功的从交换机投放到目标队列,可以注释掉AckRabbitMQConfig
中交换机和队列绑定的代码,或者在后台进行交换机和队列的解绑:
这样消息自然不能成功的从交换机投放到队列。
至于消息接收确认,可以自行模拟不同的业务场景测试。
本文完!