rocket源码 顺序消息和事务消息

顺序消息的实现

顺序消息进行消费时,若是第一次消费失败,可以返回SUSPEND_CURRENT_QUEUE_A_MOMENT,下一次会继续消费此消息。

顺序消息的消费失败时的重试逻辑,具体代码在ProccessQueue中,顺序消费时手动从processQueue中取消息,内部是从msgTreeMap中取出消息后,将消息添加到consumingMsgOrderlyTreeMap中,若是消费成功,将该消息从consumingMsgOrderlyTreeMap中删除即可。若是消费失败,执行makeMessageToConsumeAgain方法,将这些消息再放回msgTreeMap。

顺序消费时有回滚和重试的逻辑,但是新版本不建议使用。回滚和重试的逻辑和上面相同,回滚时将消息重新放回treeMap,提交时不用操作treeMap,但是需要根据consumingMsgOrderlyTreeMap找到当前消费的offset,从下一个继续消费。

顺序消息消费时使用同一个线程,可以看一下ConsumeMessageOrderlyService

this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
this.consumeExecutor = new ThreadPoolExecutor(
            this.defaultMQPushConsumer.getConsumeThreadMin(),
            this.defaultMQPushConsumer.getConsumeThreadMax(), // 迷惑性代码...
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.consumeRequestQueue,
            new ThreadFactoryImpl("ConsumeMessageThread_"));

因为queue的长度是Integer.MAX_VALUE,因此在进行消费时使用的是一个线程,并且有序执行。

顺序消息的消费使用同一个线程是在ConsumeMessageOrderlyService.ConsumeRequest和ProcessQueue中实现的。

// ProcessQueue

private volatile boolean consuming = false;


    public boolean putMessage(final List<MessageExt> msgs) {
        boolean dispatchToConsume = false;
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();
            try {
                int validMsgCnt = 0;
                for (MessageExt msg : msgs) {
                    MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
                    if (null == old) {
                        validMsgCnt++;
                        this.queueOffsetMax = msg.getQueueOffset();
                        msgSize.addAndGet(msg.getBody().length);
                    }
                }
                msgCount.addAndGet(validMsgCnt);
                // 如果有消息可以进行消费,并且当前queue没有消费,则将dispatchToConsume和consuming置为true
                if (!msgTreeMap.isEmpty() && !this.consuming) {
                    dispatchToConsume = true;
                    this.consuming = true;
                }

                if (!msgs.isEmpty()) {
                    MessageExt messageExt = msgs.get(msgs.size() - 1);
                    String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
                    if (property != null) {
                        long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();
                        if (accTotal > 0) {
                            this.msgAccCnt = accTotal;
                        }
                    }
                }
            } finally {
                this.lockTreeMap.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("putMessage exception", e);
        }

        return dispatchToConsume;
    }
// ConsumeMessageOrderlyService

    public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispathToConsume) {
        if (dispathToConsume) { // putMessage返回true时,才将request提交到线程池
        // 如果已经开始对该queue进行消费了,就不会再次提交任务
            ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
            this.consumeExecutor.submit(consumeRequest);
        }
    }
// 提交给线程池的任务
// 主要代码
    class ConsumeRequest implements Runnable {

        @Override
        public void run() {
            final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
            synchronized (objLock) {
                if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
                    // 如果可以继续消费,直接在当前线程中轮询消费该ProcessQueue即可
                    for (boolean continueConsume = true; continueConsume; ) {
                        // 在consumerImpl中的pullMessage方法中持续给ProcessQueue添加消息
                        // 手动从ProcessQueue中取消息
                        List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
                        if (!msgs.isEmpty()) {
                            try {
                                this.processQueue.getLockConsume().lock();
                                //消费消息
                                status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                            } catch (Throwable e) {
                               
                            } finally {
                                this.processQueue.getLockConsume().unlock();
                            }
                            // 处理消费结果,若是成功继续消费
                            continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                        } else {
                            continueConsume = false;
                        }
                    }
                } 
        }

看代码可以发现,如果顺序消息消费失败的话,即消费返回SUSPEND_CURRENT_QUEUE_A_MONENT时,当前线程会停止消费,在processConsumeResult时,会提交新的任务到线程池,在新的线程中继续消费该消息。

核心逻辑是保证一个ProcessQueue只在一个线程中轮询消费消息。

发送顺序消息时会添加一个队列选择器,将需要有序的消息发送到同一个队列。消费端拉取特定queue的数据时天生有序,在消费时使用同一个线程进行消费,因此就实现了顺序消息。

事务消息

二阶段提交加补偿机制

第一阶段提交消息到broker,broker将topic修改为RMQ_SYS_TRANS_HALF_TOPIC,存入对consumer不可见的topic/queue。如果此阶段写入成功,执行transactionListener.executeLocalTransaction()

第二阶段,根据本地事务的执行结果提交或者回滚第一阶段提交至broker的消息,这里使用的是OneWay方法,可靠性低,可能出现失败或者超时的情况。

broker端处理RequestCode.END_TRANSACTION的请求,如果是commit,则将原来的消息取出,更改为正确的topic/queue,并进行落盘,然后添加Op状态。如果是rollback,则直接添加Op状态即可。

添加Op状态是将消息添加到Op队列中,Op队列是为了补偿逻辑时减少判断。

补偿逻辑:

BrokerController启动时会启动TransactionMessageCheckService,默认每隔60s检查一次HALF_TOPIC下所有的queue中的消息,检查步骤如下

  • 先判断当前queue和对应的opQueue是否添加过消息,如果没有,遍历下一个queue,若有,进行下一步判断
  • 获取对应的opQueue中的消息,若是没有消息,遍历下一个queue,若有,进行下一步判断
  • 遍历当前queue
  • 如果当前偏移量已经添加了oP状态,直接遍历至下一个偏移量,否则进行下一步判断
  • 获取当前消息,若为null,遍历下一个偏移量,若不为null,进行下一步判断
  • 若当前消息需要舍弃或者跳过,遍历下一个偏移量,否则进行下一步判断
  • 判断当前消息是否需要check,若暂时不需要,重新走判断流程
  • 若是需要check,broker端给producer发送CHECK_TRANSACTION_STATE消息,producer端接收到消息后,执行TransactionListener.checkLocalTransaction,将check结果回发给broker。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,651评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,468评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,931评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,218评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,234评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,198评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,084评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,926评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,341评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,563评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,731评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,430评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,036评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,676评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,829评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,743评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,629评论 2 354