RocketMq 死信队列

系列


开篇

  • 这篇文章的主要目的是分析RocketMq死信队列,包含死信队列的Topic的生成,死信队列消息的写入,死信队列消息的导出。

  • 死信队列消息的前提是基于consumer消费消息异常后经过多次重试队列的投递依赖消费失败最终会进入死信队列。

  • 理解死信队列的核心在于理解死信队列对应Topic的配置,包括topicName、readQueueNums、writeQueueNums、perm。

  • 死信队列对应Topic的权限为2,只有写权限,所以死信队列没有办法读取。

    // 读权限为4
    public static final int PERM_READ = 0x1 << 2;
    // 写权限为2
    public static final int PERM_WRITE = 0x1 << 1;
    // 继承权限为1
    public static final int PERM_INHERIT = 0x1 << 0;


死信队列Topic配置

{
    "topicConfigTable":{
        "%DLQ%quickstart_consumer_dlq":{
            "order":false,
            "perm":2,
            "readQueueNums":1,
            "topicFilterType":"SINGLE_TAG",
            "topicName":"%DLQ%quickstart_consumer_dlq",
            "topicSysFlag":0,
            "writeQueueNums":1
        }
    }
}
  • 死信队列 %DLQ%quickstart_consumer_dlq 的读写队列的个数为1。
  • 死信队列 %DLQ%quickstart_consumer_dlq 的读写队列的权限为2。
  • permission(2|4|6), intro[2:W; 4:R; 6:RW]


命令格式
usage: mqadmin updateTopicPerm [-b <arg>] [-c <arg>] [-h] [-n <arg>] -p <arg> -t <arg>
 -b,--brokerAddr <arg>    create topic to which broker
 -c,--clusterName <arg>   create topic to which cluster
 -h,--help                Print help
 -n,--namesrvAddr <arg>   Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
 -p,--perm <arg>          set topic's permission(2|4|6), intro[2:W; 4:R; 6:RW]
 -t,--topic <arg>         topic name

命令执行
./mqadmin updateTopicPerm -c DefaultCluster  -n localhost:9876 -p 6 -t %DLQ%quickstart_consumer_dlq

命令执行过程
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
update topic perm from 2 to 6 in 192.168.0.10:10911 success.
  • 通过updateTopicPerm的命令将死信队列的读写权限改为6,保证读写权限。
  • permission(2|4|6), intro[2:W; 4:R; 6:RW]

{
    "topicConfigTable":{
        "%RETRY%quickstart_consumer_dlq":{
            "order":false,
            "perm":6,
            "readQueueNums":1,
            "topicFilterType":"SINGLE_TAG",
            "topicName":"%RETRY%quickstart_consumer_dlq",
            "topicSysFlag":0,
            "writeQueueNums":1
        }
    }
}
  • 死信队列 %DLQ%quickstart_consumer_dlq 更改权限后的读写队列的权限为6。
  • permission(2|4|6), intro[2:W; 4:R; 6:RW]


死信队列消息投递过程

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {

    private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
        throws RemotingCommandException {

        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final ConsumerSendMsgBackRequestHeader requestHeader =
            (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);

        String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getGroup());
        
        // 重试队列的Topic,RETRY_GROUP_TOPIC_PREFIX + consumerGroup;
        String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
        int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();

        int topicSysFlag = 0;
        if (requestHeader.isUnitMode()) {
            topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
        }

        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
            newTopic,
            subscriptionGroupConfig.getRetryQueueNums(),
            PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);

        // 省略相关代码

        // 首先判断最大的重新消费次数
        int delayLevel = requestHeader.getDelayLevel();
        int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
        if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
            // 当前版本由consumer端的MaxReconsumeTimes指定
            maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
        }
        // 当超过最大的重新消费次数后选择死信队列DLQTopic
        if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
            || delayLevel < 0) {

            // DLQ_GROUP_TOPIC_PREFIX + consumerGroup;
            newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
            // 创建消费分组对应的死信队列的Topic
            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                DLQ_NUMS_PER_GROUP,
                PermName.PERM_WRITE,// 死信队列只有写权限
                0
            );

        } else {
            if (0 == delayLevel) {
                delayLevel = 3 + msgExt.getReconsumeTimes();
            }

            msgExt.setDelayTimeLevel(delayLevel);
        }

        // 重新生成消息体写入到新的topic当中,如果死信队列就重新写入commitLog和consumeQueue
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(newTopic);
        msgInner.setBody(msgExt.getBody());
        msgInner.setFlag(msgExt.getFlag());
        MessageAccessor.setProperties(msgInner, msgExt.getProperties());
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));

        msgInner.setQueueId(queueIdInt);
        msgInner.setSysFlag(msgExt.getSysFlag());
        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
        msgInner.setBornHost(msgExt.getBornHost());
        msgInner.setStoreHost(this.getStoreHost());
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);

        String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
        MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);

        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        if (putMessageResult != null) {
            switch (putMessageResult.getPutMessageStatus()) {
                case PUT_OK:
                    String backTopic = msgExt.getTopic();
                    String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
                    if (correctTopic != null) {
                        backTopic = correctTopic;
                    }

                    this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);

                    response.setCode(ResponseCode.SUCCESS);
                    response.setRemark(null);

                    return response;
                default:
                    break;
            }

            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(putMessageResult.getPutMessageStatus().name());
            return response;
        }

        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("putMessageResult is null");
        return response;
    }
}
  • consumerSendMsgBack用于处理consumer在消费异常后重新投递到broker的消息。
  • 重投处理逻辑先将消息投递到重试Topic的%RETRY%consumerGroup队列,超过最大重试次数后将消息投递到死信Topic的%DLQ%consumerGroup队列。


查询死信队列状态

./mqadmin topicStatus -n localhost:9876 -t %DLQ%quickstart_consumer_dlq
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
#Broker Name                      #QID  #Min Offset           #Max Offset             #Last Updated
broker-a                          0     0                     1999                    2020-06-25 08:23:28,277


根据offset查询消息内容

./mqadmin queryMsgByOffset -b broker-a -n localhost:9876 -i 0 -o 1 -t %DLQ%quickstart_consumer_dlq
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
OffsetID:            C0A8000A00002A9F00000000007E6157
Topic:               %DLQ%quickstart_consumer_dlq
Tags:                [TagA]
Keys:                [null]
Queue ID:            0
Queue Offset:        1
CommitLog Offset:    8282455
Reconsume Times:     2
Born Timestamp:      2020-05-16 22:28:54,301
Store Timestamp:     2020-06-25 08:23:26,974
Born Host:           192.168.0.8:57604
Store Host:          192.168.0.10:10911
System Flag:         0
Properties:          {MIN_OFFSET=0, REAL_TOPIC=%DLQ%quickstart_consumer_dlq, ORIGIN_MESSAGE_ID=C0A8000800002A9F0000000000000000, RETRY_TOPIC=TopicTest, MAX_OFFSET=1999, UNIQ_KEY=C0A80008723518B4AAC25212599B0000, CLUSTER=DefaultCluster, WAIT=false, DELAY=3, TAGS=TagA, REAL_QID=0}
Message Body Path:   /tmp/rocketmq/msgbodys/C0A80008723518B4AAC25212599B0000

实际查看内容
cat /tmp/rocketmq/msgbodys/C0A80008723518B4AAC25212599B0000
Hello RocketMQ 1


consumer消费死信队列

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstart_consumer_dlq");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 订阅死信队列
        consumer.subscribe("%DLQ%quickstart_consumer_dlq", "*");
        consumer.setMaxReconsumeTimes(1);
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}
  • 通过将死信队列的权限设置为读写权限,然后直接通过订阅对应的死信队列即可。
  • consumer.subscribe("%DLQ%quickstart_consumer_dlq", "*");
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 195,898评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,401评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,058评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,539评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,382评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,319评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,706评论 3 386
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,370评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,664评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,715评论 2 312
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,476评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,326评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,730评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,003评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,275评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,683评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,877评论 2 335