RocketMQ源码-ConsumeQueue的构建


1 概述
2 入口方法
3 ConsumeQueue索引结构
4 索引构建

1 概述

RocketMQ一个Broker中可以建立多个Topic,每个Topic又可以有多个queue,Broker在接收生产者发来的消息时,是按照消息到来的顺序追加到同一个文件中的,当然文件默认大小为1G,如果超过文件最大大小,则会接着前一个文件写入的数据继续写入。

所有Topic所有queue的数据放在一起就造成了查询数据或者消费数据时面临着大量的随机读,也造成查询数据需要从头到尾读取所有的数据。为了避免每次查询或者消费者拉去数据时从头到尾遍历,RocketMQ在消息数据上构建了两种索引,一个是笔者文章RocketMQ源码-Index索引介绍介绍的全局索引Index索引,另一个就是本文介绍的根据queue划分后的每个队列的ConsumeQueue索引。

Index索引和ConsumeQueue的区别主要有三个,第一是Index基于消息的MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX属性构建,而ConsumeQueue基于消息标签的hash码构建;第二是Index为全局索引,不区分主题队列,所有消息索引在一个文件中,而ConsumeQueue对应一个主题的一个队列,每个主题的每个队列都会有一个ConsumeQueue索引;第三是Index主要用于消息查询,而ConsumeQueue主要用于消息消费时,消费者拉取消息使用,这里也能说明为什么Index设计为全局索引而ConsumeQueue为单个队列的索引,因为消息查询时一般为查询所有消息中的满足指定条件的消息,而消息消费时,消费者一般只会拉取自己订阅(或者是订阅之后负载均衡的被分配)的某个主题下某个队列的消息。

2 入口方法

和笔者文章RocketMQ源码-Index索引介绍一样,ConsumeQueue构建的入口也是在ReputMessageService服务的run方法中进行reput操作触发的,用于构建ConsumeQueue的类为CommitLogDispatcherBuildConsumeQueue,其也是DefaultMessageStore的内部类,源码如下:

class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

    @Override
    public void dispatch(DispatchRequest request) {
        final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
        switch (tranType) {
            //只会为普通的非事务消息和已提交的事务消息
            //做索引
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                DefaultMessageStore.this.putMessagePositionInfo(request);
                break;
            //为提交的事务消息或者已经回滚的事务消息
            //则不索引
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                break;
        }
    }
}

在介绍具体如何构建ConsumeQueue之前,我们先介绍下ConsumeQueue索引的结构。

3 ConsumeQueue索引结构

ConsumeQueue的结构比较简单,如下:

ConsumeQueue索引结构.jpg

如上图所示,每个索引项在文件中占20个字节,各字段分别为:

  • CommitLog Offset:该消息在CommitLog的起始物理偏移,long类型,8字节;
  • Size:该消息的大小,int类型,4字节;
  • Message Tags HashCode:消息标签对应的hashCode,long类型,8字节。

这里要注意一下,每个ConsumeQueue还有一个用于记录扩展索引信息的ConsumeQueueExt类实例,如果配置启动了ConsumeQueue扩展类型,则ConsumeQueue中的Message Tags HashCode记录的并不是消息标签对应的hashCode,记录的是该消息索引在扩展信息ConsumeQueueExt文件中的物理偏移,真正的Message Tags HashCode则记录在ConsumeQueueExt文件中。那么在读取ConsumeQueue如何区分Message Tags HashCode记录的是消息标签的hashCode,还是扩展信息偏移呢?ConsumeQueue中有个方法isExtAddr(long tagsCode)则用于实现这个判断:

//ConsumeQueue
/**
* Check {@code tagsCode} is address of extend file or tags code.
*/
public boolean isExtAddr(long tagsCode) {
    return ConsumeQueueExt.isExtAddr(tagsCode);
}

//ConsumeQueueExt
/**
* Check whether {@code address} point to extend file.
* <p>
* Just test {@code address} is less than 0.
* </p>
*/
public static boolean isExtAddr(final long address) {
    //MAX_ADDR = Integer.MIN_VALUE - 1L;
    //也即如果消息tagsCode小于Integer.Min_VALUE-1,
    //则为偏移地址,而不是tags的hashCode
    return address <= MAX_ADDR;
}

扩展索引ConsumeQueueExt除了记录消息的标签code,还记录了消息bitMap信息和存储时间。消息bitMap主要用于消息过了,暂不介绍。ConsumeQueueExt的基本存储结构为ConsumeQueueExt.CqExtUnit

4 索引构建

我们现在接着第2节的入口方法介绍,入口方法是调用DefaultMessageStore.this.putMessagePositionInfo(request);进行索引构建的,DefaultMessageStore.this.putMessagePositionInfo(request);实现如下:

//DefaultMessageStore
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
    //先根据
    ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
    cq.putMessagePositionInfoWrapper(dispatchRequest);
}

ConsumeQueue中具体实现如下:

//ConsumeQueue
public void putMessagePositionInfoWrapper(DispatchRequest request) {
    final int maxRetries = 30;
    boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
    //写入失败则会连续尝试30次
    for (int i = 0; i < maxRetries && canWrite; i++) {
        long tagsCode = request.getTagsCode();
        //如果启用了扩展索引,则先构造扩展索引保存单元
        //CqExtUnit,写入bitMap、保存时间、实际的消息
        //标签hashCode
        if (isExtWriteEnable()) {
            ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
            cqExtUnit.setFilterBitMap(request.getBitMap());
            cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
            cqExtUnit.setTagsCode(request.getTagsCode());
            //写入之后则返回扩展索引刚写入的偏移地址
            long extAddr = this.consumeQueueExt.put(cqExtUnit);
            if (isExtAddr(extAddr)) {
                //tagsCode重置为扩展索引偏移地址
                tagsCode = extAddr;
            } else {
                log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,
                    topic, queueId, request.getCommitLogOffset());
            }
        }
        //进行实际写入
        boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
            request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
        if (result) {
            this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
            return;
        } else {
            // XXX: warn and notify me
            log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
                + " failed, retry " + i + " times");

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                log.warn("", e);
            }
        }
    }

    // XXX: warn and notify me
    log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
    this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
}


private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
    final long cqOffset) {

    if (offset + size <= this.maxPhysicOffset) {
        log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
        return true;
    }
    //写入消息物理偏移、消息大小和tagsCode
    //tagsCode可能为扩展索引偏移或者实际标签code
    this.byteBufferIndex.flip();
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
    this.byteBufferIndex.putLong(offset);
    this.byteBufferIndex.putInt(size);
    this.byteBufferIndex.putLong(tagsCode);

    final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;

    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
    if (mappedFile != null) {

        if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
            this.minLogicOffset = expectLogicOffset;
            this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
            this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
            this.fillPreBlank(mappedFile, expectLogicOffset);
            log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                + mappedFile.getWrotePosition());
        }

        if (cqOffset != 0) {
            long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();

            if (expectLogicOffset < currentLogicOffset) {
                log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                    expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
                return true;
            }

            if (expectLogicOffset != currentLogicOffset) {
                LOG_ERROR.warn(
                    "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                    expectLogicOffset,
                    currentLogicOffset,
                    this.topic,
                    this.queueId,
                    expectLogicOffset - currentLogicOffset
                );
            }
        }
        this.maxPhysicOffset = offset + size;
        return mappedFile.appendMessage(this.byteBufferIndex.array());
    }
    return false;
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,376评论 6 491
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,126评论 2 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,966评论 0 347
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,432评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,519评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,792评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,933评论 3 406
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,701评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,143评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,488评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,626评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,292评论 4 329
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,896评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,742评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,977评论 1 265
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,324评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,494评论 2 348

推荐阅读更多精彩内容

  • RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点: 能够保证严格的消息顺序 提供丰富的消息拉取模式...
    AI乔治阅读 2,062评论 2 5
  • 架构图 基本概念 Producer 消息生产者,负责产生消息,一般由业务系统负责产生消息 Consumer 消息消...
    知止9528阅读 1,124评论 0 1
  • 楊孜 我善於把戀愛談成懷念 也善於把欲望化為信仰 如果妳立於近前 我的目光會躲躲闪闪 如果妳身處异鄉 我的牵掛會跟...
    楊孜阅读 390评论 2 2
  • 似兰斯馨
    王中海阅读 131评论 0 1
  • 这个时候,是准备吃饭的,人不多,吃得香。 人走得远,远得像是久远的梦。我们看了彼此,她走远了。走...
    Lan_9e0f阅读 156评论 0 1