Spring Cloud Stream 进阶配置——使用延迟队列实现“定时关闭超时未支付订单”

ps: 本文所有代码可在 这里 查看。

延迟队列

延迟队列 操作的对象是延迟消息,所谓 “延迟消息” 是指当消息被发送以后,并不想让消费者立刻消费消息,而是等待特定时间后,消费者才能拿到消息进行消费。

延迟队列比较经典的使用场景有:

  1. 在订单系统中,用户下单后,如果未在规定时间内(比如30分钟)支付,那么该订单会被关闭,即自动取消订单。
  2. 用户希望通过手机远程控制家里的智能设备在指定的时间进行工作。这时候可以将用户指令发送到延迟队列,当指令时间到了,再将指令推送到智能设备。

基于 RabbitMQ 的延迟队列

使用死信队列实现延迟队列

AMQP 协议中,或者 v3.5.8 之前的 RabbitMQ 本身并没有直接支持延迟队列功能,要想实现类似延迟队列的功能,可以通过死信队列的配合。即定义一组 ttl 为特定时长的队列,比如:5秒,10秒,30秒,1分钟等,然后再对这些队列,分别定义死信队列,当消息过期时,就会转存到相应的死信队列(即延迟队列)中,这样消费者根据业务自身的情况,分别选择不同延迟等级的延迟队列进行消费。

使用延迟消息交换机插件实现延迟队列

上面介绍的延迟队列实现方式,其实是比较繁琐的,好在,在版本 v3.5.8之后,RabbitMQ 推出了一个延迟消息交换机插件:rabbitmq_delayed_message_exchange,当启用该插件后,如果有一个队列声明为延迟交换机,那么当有消息发送到该交换机后,会根据延迟时长来决定投递的顺序,而如果延迟时长小于零,那么会立刻投递到相应的队列。

第一种实现方式,不在本文的讨论范围,就不细说,下面将对第二种实现方式进行介绍。

ps:RabbitMQ 的版本最好是 3.6.x 及以上,Erlang/OTP 的版本要在 18.0 以上。

使用延迟消息交换机插件

下载插件

因为该插件默认是没有在 RabbitMQ 的软件包的 plugins 目录下,需要自己下载然后放到 plugins 目录下,下载地址如下:

下载下来后,解压,然后拷贝到 plugins 目录下,如果是通过 rpm 是方式安装,目录应该是:/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.15/plugins;如果是 Mac 用户,且使用 brew 安装,目录则在:/usr/local/Cellar/rabbitmq/3.7.7/plugins

启用插件

# 启用插件 rabbitmq-delayed-message-exchange
rabbitmq-plugins enable rabbitmq-delayed-message-exchange

配合 Spring Cloud Stream 使用延迟交换机

首先来看一下延迟交换机如何配置:

spring:
  cloud:
    stream:
      bindings:
        delayedQueueOutput:
          destination: delayedQueueTopic
          content-type: application/json
          binder: rabbit

        delayedQueueInput:
          destination: delayedQueueTopic
          content-type: application/json
          group: ${spring.application.name}
          binder: rabbit

      rabbit:
        bindings:
          delayedQueueOutput:
            producer:
              delayedExchange: true # 是否将目标exchange声明为一个延迟消息交换机,默认false。即消息productor发布消息到延迟exchange后,延迟n长时间后才将消息推送到指定的queue中。 -RabbitMQ需要安装/启用插件: rabbitmq-delayed-message-exchange

          delayedQueueInput:
            consumer:
              delayedExchange: true # 是否将目标exchange声明为一个延迟消息交换机,默认false。即消息productor发布消息到延迟exchange后,延迟n长时间后才将消息推送到指定的queue中。 -RabbitMQ需要安装/启用插件: rabbitmq-delayed-message-exchange

重点关注2个配置:spring.cloud.stream.rabbit.bindings.ChannelName.producer.delayedExchangespring.cloud.stream.rabbit.bindings.ChannelName.consumer.delayedExchange

这2个配置分别属于生产者和消费者的配置,但都是用于告诉 Spring Cloud Stream 是否将交换机声明为一个延迟消息交换机。这2个是成对出现,如果少配置了一个,服务启动时会报一个警告,下文会说明。

延迟消息交换机的相关配置就这么简单,接下来通过测试用例来看一下效果。

ScasDelayedTest

@Data
@NoArgsConstructor
@AllArgsConstructor
public class DelayModel {

    /**
     * 延迟投递的时长. 单位: ms
     */
    private long delay;

}
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("delayed")
@EnableBinding({ScasDelayedTest.MessageSink.class, ScasDelayedTest.MessageSource.class})
public class ScasDelayedTest {

    @Autowired
    private DelayedQueueProducer delayedQueueProducer;

    @Test
    public void test() throws InterruptedException {
        for (int i = 0; i < 5; i++) {
            // 随机延迟 3-8 秒
            long delay = RandomUtil.randomLong(3, 8) * 1000;
            delayedQueueProducer.publish(new DelayModel(delay));
        }

        Thread.sleep(1000000);
    }

    @Component
    public static class DelayedQueueProducer {

        @Autowired
        private MessageSource messageSource;

        public void publish(DelayModel model) {
            long delay = model.getDelay();
            Message<DelayModel> message = MessageBuilder.withPayload(model).setHeader("x-delay", delay).build();
            messageSource.delayedQueueOutput().send(message);
            log.info("发布延迟队列消息: {}", model);
        }

    }

    @Component
    public static class DelayedQueueHandler {

        @StreamListener("delayedQueueInput")
        public void handle(DelayModel model) throws InterruptedException {
            log.info("消费延迟队列的消息. model: [{}].", model);
        }

    }

    public interface MessageSink {

        @Input("delayedQueueInput")
        SubscribableChannel delayedQueueInput();

    }

    public interface MessageSource {

        @Output("delayedQueueOutput")
        MessageChannel delayedQueueOutput();

    }

}

上面的代码很简单,重点是在构建消息时,比平常多了一个步骤,即 .setHeader("x-delay", delay),其中变量 delay 为该消息需要延迟多久才被消费。

很好理解,通过 setHeader 方法,对 Message 添加一个名为 x-delay 的头部,对应的值则为延迟时长,单位为 ms。当该消息被投递到延迟交换机后,获取头部 x-delay 的值,如果小于0,那么立即将消息路由到相应的队列被消费,如果大于0,则延迟对应时间。

启动 ScasDelayedTest

启动测试用例后,控制台会出现类似如下图的输出:


ScasDelayedTest

查看延迟投递的消息数量

怎么查看延迟投递的消息数量?可以在 RabbitMQ Management 的对应交换机页面查看,
http://localhost:15672/#/exchanges/%2F/delayedQueueTopic

延迟投递的消息数量

ps: 为达到查看效果,可以适当增加延迟时长。

使用延迟队列实现“定时关闭超时未支付订单”

上面简单介绍了延迟交换机的使用方法,现在回到正题,如何使用延迟队列来实现 “定时关闭超时未支付订单” 呢?

针对上面的场景,一般的思路是:定义一个定时任务,比如每分钟查询一下订单表,找出接下来1、2钟内需要关闭的订单,然后再对每一笔订单执行 关闭订单 操作,当然在关闭之前需要再次确认订单是否 “已支付”。

为了简单,再通过一个测试用例来模拟一下具体场景。

ScasCloseUnpaidOrderTest

@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderModel {

    /**
     * 订单id
     */
    private Long id;

    /**
     * 订单失效时间
     */
    private Long expireTime;

    @Override
    public String toString() {
        return "OrderModel{" +
                "id=" + id +
                ", expireTime=" + TimeUtil.format(TimeUtil.toLocalDateTime(expireTime)) +
                '}';
    }
}
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("delayed")
@EnableBinding({ScasCloseUnpaidOrderTest.MessageSink.class, ScasCloseUnpaidOrderTest.MessageSource.class})
public class ScasCloseUnpaidOrderTest {

    @Autowired
    private CloseUnpaidOrderProducer closeUnpaidOrderProducer;

    @Test
    public void test() throws InterruptedException {

        // 模拟每分钟的0秒执行定时任务
        long toSleep = 60000 - System.currentTimeMillis() % 60000;
        Thread.sleep(toSleep);

        List<OrderModel> models = buildUnpaidOrderModel();
        for (OrderModel model : models) {
            closeUnpaidOrderProducer.publish(model);
        }

        Thread.sleep(1000000);

    }

    private List<OrderModel> buildUnpaidOrderModel() {

        long now = System.currentTimeMillis();

        List<OrderModel> models = new ArrayList<>(5);
        for (int i = 0; i < 5; i++) {

            long id = RandomUtil.randomLong(10000, 100000);
            // 模拟 订单将在小于60s内过期
            long expireTime = now + RandomUtil.randomLong(0, 60) * 1000;

            OrderModel model = new OrderModel();
            model.setId(id);
            model.setExpireTime(expireTime);
            models.add(model);
        }
        return models;
    }

    @Component
    public static class CloseUnpaidOrderProducer {

        @Autowired
        private MessageSource messageSource;

        public void publish(OrderModel model) {
            long now = System.currentTimeMillis();
            long delay = model.getExpireTime() - now;
            Message<OrderModel> message = MessageBuilder.withPayload(model).setHeader("x-delay", delay).build();
            messageSource.closeUnpaidOrderOutput().send(message);
            log.info("发布 [关闭超时未支付订单] 消息. delay: {}, model: {}", delay, model);
        }

    }

    @Component
    public static class CloseUnpaidOrderHandler {

        private Random random = new Random();

        @StreamListener("closeUnpaidOrderInput")
        public void handle(OrderModel model) throws InterruptedException {

            log.info("检查订单状态, 关闭支付超时订单. model: {}", model);

            if (isPaySuccess()) {
                log.info("订单 [{}] 支付超时. 关闭订单.", model.getId());
            } else {
                log.info("订单 [{}] 支付完成.", model.getId());
            }
        }

        private boolean isPaySuccess() {
            // 模拟从支付系统查询支付状态.
            return random.nextInt(10) % 3 == 0;
        }

    }

    public interface MessageSource {

        @Output("closeUnpaidOrderOutput")
        MessageChannel closeUnpaidOrderOutput();

    }

    public interface MessageSink {

        @Input("closeUnpaidOrderInput")
        SubscribableChannel closeUnpaidOrderInput();

    }

}

配置文件跟上一个测试用例基本一样:

spring:
  cloud:
    stream:
      bindings:
        closeUnpaidOrderOutput:
          destination: closeUnpaidOrderTopic
          content-type: application/json
          binder: rabbit

        closeUnpaidOrderInput:
          destination: closeUnpaidOrderTopic
          content-type: application/json
          group: ${spring.application.name}
          binder: rabbit

      rabbit:
        bindings:
          closeUnpaidOrderOutput:
            producer:
              delayedExchange: true # 是否将目标exchange声明为一个延迟消息交换机,默认false。即消息productor发布消息到延迟exchange后,延迟n长时间后才将消息推送到指定的queue中。 -RabbitMQ需要安装/启用插件: rabbitmq-delayed-message-exchange

          closeUnpaidOrderInput:
            consumer:
              delayedExchange: true # 是否将目标exchange声明为一个延迟消息交换机,默认false。即消息productor发布消息到延迟exchange后,延迟n长时间后才将消息推送到指定的queue中。 -RabbitMQ需要安装/启用插件: rabbitmq-delayed-message-exchange

启动 ScasCloseUnpaidOrderTest

启动后,可以看到控制台有类似输出:


ScasCloseUnpaidOrderTest

相信上面的代码对应各位看官来说,理解起来肯定是毫无压力的,这里就不在赘述。

相关链接

https://www.rabbitmq.com/community-plugins.html
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

推荐阅读

Spring Cloud Stream 进阶配置——高可用(二)——死信队列
Spring Cloud Stream 进阶配置——高可用(一)——失败重试
Spring Cloud Stream 进阶配置——高吞吐量(三)——批量预取消息(prefetch)
Spring Cloud Stream 进阶配置——高吞吐量(二)——弹性消费者数量
Spring Cloud Stream 进阶配置——高吞吐量(一)——多消费者

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 210,835评论 6 490
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 89,900评论 2 383
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,481评论 0 345
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,303评论 1 282
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,375评论 5 384
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,729评论 1 289
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,877评论 3 404
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,633评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,088评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,443评论 2 326
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,563评论 1 339
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,251评论 4 328
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,827评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,712评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,943评论 1 264
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,240评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,435评论 2 348

推荐阅读更多精彩内容