问题描述
在订单系统,当用户下单后需要在10分钟内完成支付,否则取消订单。
解决方案
- 如果我们使用定时任务来做,那这个失效时间对不准确,当时可以提高定时任务的执行频率来减小这个误差。
- 使用延迟队列,我们这里主要将这种方式。
基本概念
所谓的‘延迟队列“就是消息被发送以后,不直接被消费者消费,而是等到特定时间后消费者才能拿到消息消费。
延迟队列模型
延迟队列模型.png
详细说明
RabbitMQ本身不支持延迟队列,但是我们可以使用死信队列(DLX)和设置有效时间(TTL)两个特性来实现延迟队列。
先新建队列order_query并设置消息有效时间是10分钟,然后绑定一个死信队列order_dead_query,消费者消费order_dead_query队列的消息。生成订单的时候往队列order_query发一条消息,当10分钟后这条消息会进入死信队列order_dead_query里面并被我们消费者消费,这时我们去查询一下该订单的支付状态,如果是已支付不做任何操作,如果是未支付就取消订单。消息的发送端参考Spring Boot RabbitMQ实践 。
声明队列 RabbitConfig
/**
* RabbitMQ 配置类
*
* @author yuhao.wang
*/
@Configuration
public class RabbitConfig {
/**
* 方法rabbitAdmin的功能描述:动态声明queue、exchange、routing
*
* @param connectionFactory
* @return
* @author : yuhao.wang
*/
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
//声明死信队列(Fanout类型的exchange)
Queue deadQueue = new Queue(RabbitConstants.QUEUE_NAME_DEAD_QUEUE);
// 死信队列交换机
FanoutExchange deadExchange = new FanoutExchange(RabbitConstants.MQ_EXCHANGE_DEAD_QUEUE);
rabbitAdmin.declareQueue(deadQueue);
rabbitAdmin.declareExchange(deadExchange);
rabbitAdmin.declareBinding(BindingBuilder.bind(deadQueue).to(deadExchange));
// 发放奖励队列交换机
DirectExchange exchange = new DirectExchange(RabbitConstants.MQ_EXCHANGE_SEND_AWARD);
//声明发送优惠券的消息队列(Direct类型的exchange)
Queue couponQueue = queue(RabbitConstants.QUEUE_NAME_SEND_COUPON);
rabbitAdmin.declareQueue(couponQueue);
rabbitAdmin.declareExchange(exchange);
rabbitAdmin.declareBinding(BindingBuilder.bind(couponQueue).to(exchange).with(RabbitConstants.MQ_ROUTING_KEY_SEND_COUPON));
return rabbitAdmin;
}
public Queue queue(String name) {
Map<String, Object> args = new HashMap<>();
// 设置死信队列
args.put("x-dead-letter-exchange", RabbitConstants.MQ_EXCHANGE_DEAD_QUEUE);
args.put("x-dead-letter-routing-key", RabbitConstants.MQ_ROUTING_KEY_DEAD_QUEUE);
// 设置消息的过期时间, 单位是毫秒
args.put("x-message-ttl", 5000);
// 是否持久化
boolean durable = true;
// 仅创建者可以使用的私有队列,断开后自动删除
boolean exclusive = false;
// 当所有消费客户端连接断开后,是否自动删除队列
boolean autoDelete = false;
return new Queue(name, durable, exclusive, autoDelete, args);
}
}
设置消息的过期时间, 单位是毫秒
args.put("x-message-ttl", 5000);
消费者消费死信队列 DeadMessageListener
/**
* 延迟队列消费
*
* @author yuhao.wang
*/
@Service
public class DeadMessageListener {
private final Logger logger = LoggerFactory.getLogger(DeadMessageListener.class);
@RabbitListener(queues = RabbitConstants.QUEUE_NAME_DEAD_QUEUE)
public void process(SendMessage sendMessage, Channel channel, Message message) throws Exception {
logger.info("[{}]处理延迟队列消息队列接收数据,消息体:{}", RabbitConstants.QUEUE_NAME_SEND_COUPON, JSON.toJSONString(sendMessage));
System.out.println(message.getMessageProperties().getDeliveryTag());
try {
// 参数校验
Assert.notNull(sendMessage, "sendMessage 消息体不能为NULL");
// TODO 处理消息
// 确认消息已经消费成功
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
logger.error("MQ消息处理异常,消息体:{}", message.getMessageProperties().getCorrelationIdString(), JSON.toJSONString(sendMessage), e);
try {
// TODO 保存消息到数据库
// 确认消息已经消费成功
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception dbe) {
logger.error("保存异常MQ消息到数据库异常,放到死性队列,消息体:{}", JSON.toJSONString(sendMessage), dbe);
// 确认消息将消息放到死信队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
}
}
消费端直接监听死信队列,达到延迟消费消息的效果
源码
https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases
spring-boot-student-rabbitmq 工程