消息索引

前文我们梳理了消息在Commit Log文件的存储过程,讨论了消息的落盘策略,然而仅仅通过Commit Log存储消息是远远不够的,例如当我们需要消费某个topic的消息时,通过对Commit Log整体遍历寻找消息的方式无疑非常的低效。所以本文将引出2个很重要的概念:消费队列、IndexFile

消费队列

什么是消费队列呢?其实在上一章的消息协议格式中,就有消息队列的体现,简单回顾一下协议的前6个字段:

msg total len:消息总长度
msg magic :魔法值,标记当前数据是一条消息
msg CRC :消息内容的CRC值
queue id :队列id
msg flag :消息标记位
queue offset :队列的偏移量,从0开始累加

其中第4个字段为消费队列的id,第6个字段为当前队列的偏移量;所以在消息产生的时候,消息所属的队列就已经确定

那么究竟该如何理解“消费队列”的概念呢?我们举例来说:假定某个RocketMQ集群部署了3个broker(brokerA、brokerB、brokerC),主题topicTest的消息分别存储在这3个broker中,而每个broker又对应一个commit log文件。我们把broker中的主题topicTest中的消息划分为多个队列,每个队列便是这个topic在当前broker的消费队列

数据结构

当然消费队列不会将消息体进行冗余存储,数据结构如下:

image.png

即在消费队列的文件中,需要存储20个字节的索引内容。RocketMQ中默认指定每个消费队列的文件存储30万条消息的索引,而一个索引占用20个字节,这样每个文件的大小便是固定值300000*20/1024/1024≈5.72M,而文件命名采用与commit log相似的方式,即总长度20位,高位补0

store/consumequeue/topicXX/0/00000000000000000000 第一个文件
store/consumequeue/topicXX/1/00000000000006000000 第二个文件
store/consumequeue/topicXX/2/00000000000012000000 第三个文件
store/consumequeue/topicXX/3/00000000000018000000 第四个文件

与commitLog只有一个文件不同,consumeQueue是每个topic的每个消费队列都会生产多个文件

为什么消费队列文件存储消息的个数要设置成30万呢?一个文件还不到6M,为何不能像commit log那样设置为1G呢?鄙人没有在源码及网上找到相关资料,猜测可能是个经验值。首先该值不宜设置的过大,因为消息总是有失效期的,例如3天失效,如果消费队列的文件设置过大的话,有可能一个文件中包含了过去一个月的消息,时间跨度过大,这样不利于及时删除已经过期的消息;其次该值也不宜过小,太小的话会产生大量的小文件,在管理维护上制造负担。最理想情况是一个消费队列文件对应一个commit log,这样commit log过期时,消费队列文件也跟着及时失效

消费队列之commit log视角

image.png

某个commit log会存储多个topic消息,而每个topic有可能会将消息划分至多个队列中;如上图所示,commit log按顺序依次存储消息,而某个topic的消息在commit log中大概率也是不连续的,而consume queue的作用便是将某个topic下同一个队列的消息依次标识,便于消费时顺序消费

消费队列之topic视角

image.png

上图描述了Topic A的消费队列分配情况,所有的消息相对均匀的分散在3个broker中,每个broker的消息又分为队列0及队列1,所以一共有6个消费队列,所以Topic A的consumer端也是建议开辟6个进程去消费数据

这里简单提一下消费端,我们知道一个消费队列同时只能被一个消费实例消费,所以消费实例的数量建议值为 <= consumeQueue 数量,理想情况是消费实例的个数完全等于consumeQueue个数,这样吞吐能达到最佳

consumerNum < consumeQueue 消费实例小于消费队列个数。例如某个topic的消费队列一共有6个,但是只有3个消费实例,RocketMQ会尽量均衡每个消费实例分配到的消费队列,所以每个消费实例实际会消费2个队列的内容。这种情况可能增加消费实例可以提高整体吞吐
consumerNum > consumeQueue 消费实例大于消费队列个数。比如某个topic的消费队列有6个,但是有8个消费实例注册,因为一个消费实例只能对应一个消费队列,所以势必导致有2个消费实例处于空闲状态,不会拿到任何数据
consumerNum == consumeQueue 消费实例等于消费队列个数。这比较理想的状态,不会有过载或饥饿产生
基于同一个消费队列只能被一个消费实例消费的特性,我们可以将某类消息均发送给一个队列,这样消费的时候能够严格保序。例如我们希望订单的流程是保序的,可以通过orderId % consumeQueue来决定当前订单的消费发送给哪个队列,从而达到保序的目的

写入流程

消费队列的写入跟commit log的写入是同步进行的吗?答案是否定的,RocketMQ会启动一个独立的线程来异步构建消费队列(构建索引文件也是这个线程)

image.png

简单描述下流程:构建索引的线程为ReputMessageService,跟写入commitLog的线程是异步关系,该线程会不断地将没有构建索引的消息从commit log中取出,将物理偏移量、消息长度、tag写入文件。值得一提的是,消息队列文件的写入跟commit log不同,commit log的写入有很多刷盘策略,而consumeQueue每条消息解析完毕都会刷盘,而且采用的是FileChannel。

借此,我们抛出几个问题

问题1:为什么消费队列写入文件要用FileChannel?批次多,数据量小的场景用Mapp不香吗?

关于这个问题,我咨询了RocketMQ开源社区比较有影响力的大佬,给出的答复是:的确是这样,RocketMQ这样设计考虑欠佳,写文件这块应该向kafka学习,即消息写入用FileChannel,索引写入用Mapp

问题2:为什么RocketMQ中很少有用到堆外内存?文件写入的话,使用堆外内存少一次内存拷贝,不是可以提高性能吗?

是这样的,类似这样的场景首选还是堆外内存;RocketMQ的确还有很多可优化的空间,在将来的某个版本,我们一定可以看到针对此处的优化

问题3:如果消息已经写入commit log,但还未写入消费队列,consumer端能正常消费到这条消息吗?

抛出这个问题,大家可以思考一下,在消息产生、消费的章节再回答

IndexFile

ReputMessageService线程除了构建消费队列的索引外,还同时根据消息key构建了索引

image.png

除了正常的生产、消费消息外,RocketMQ还提供了根据msg key进行查询的功能,将消息key相同的消息一并查出;我们当然可以通过扫描全量的commit log将相同msg key类型的消息过滤出来,但性能堪忧,而且涉及大量的IO运算;IndexFile便是为了实现快速查找目标消息而衍生的索引文件

IndexFile的命名规范也有别于消费队列,IndexFile是按照创建时间来命名的,因为根据消息key进行匹配查询的时候,都要带上时间参数,文件名起到了快速定位索引数据位置的作用,下面列举一组IndexFile的文件名

rocketMQ/store/20211204094647480
rocketMQ/store/20211205094647480
rocketMQ/store/20211206094647480
rocketMQ/store/20211207094647480

IndexFile结构

我们具体看一下此文件的结构,与消费队列文件相同,IndexFile是定长的

image.png

由三部分组成:

文件头,占用 40 byte
slot,hash槽儿,占用500w4= 20000000 byte
索引内容 占用2000w
20= 400000000 byte
所以文件总大小为: 40+50000004+2000000020=420000040byte ≈ 400M

存储原理

简单剖析一下各部分的字段

文件头 共 20 byte

开始时间(8 byte)存储前索引文件内,所有消息的最小时间
结束时间(8 byte)存储前索引文件内,所有消息的最大时间,因为根据key查询的时候 ,时间是必填选项 ,开始与结束时间用来快速定位消息是否命中
最小物理偏移量(8 byte)存储前索引文件内,所有消息的最小物理偏移量
最大物理偏移量(8 byte)存储前索引文件内,所有消息的最大物理偏移量;框定最小、最大物理偏移量,是为了给通过物理地址查询时快速索引
有效hash slot数量(4 byte)因为存储hash冲突的情况,所以最坏情况是,hash slot只有1个,最理想情况是有500万个
index索引数量(4 byte)如果当前索引文件已经构建完毕,那么该值是固定值2000万

slot 4 byte

当前槽儿内的最近一次index的位置(4 byte)
索引内容 20 byte

hash值(4 byte)
消息的物理地址(8 byte)
时间差(4 byte)当前消息与最早消息的时间差
索引(4 byte)当前槽儿内,上一条索引的位置

存储方式如下

image.png

当一条新的消息索引进来时,首先定位当前消息命中的slot,该slot存储着最近一条消息的存储位置。将消息的索引数据append至文件尾部的同时,将最新索引数据的next指向上一条索引位置,这样便形成了一条当前slot按照时间存入的倒序的链表

消息查询

根据前文的铺垫,同一个槽儿内的数据,已经被一个隐式的链儿串连在了一起,当我们根据topic+key进行数据查询时,直接通过topic + # + key的hash值,定位到某个槽儿,进而依次寻找消息即可;当然同一个槽儿内的数据可能出现hash冲突,我们需要将不符合条件的数据过滤掉

当我阅读这部分源码的时候,发现了其内部存在的一个bug,其做消息过滤时,仅仅判断消息的hash字段是否相等,如果相等的话,继而认定为要寻找的数据从而返回

class : org.apache.rocketmq.store.index.IndexFile

if (keyHash == keyHashRead && timeMatched) {
    phyOffsets.add(phyOffsetRead);
}

进而带来的一些问题,例如:

新建topic AaTopic,并向topic中发送一条消息,message key为Aa
新建topic BBTopic,并向topic中发送一条消息,message key为BB

当我们通过 topic=AaTopic && key=BB查询时,预期应该返回空数据,但实际却返回了一条数据

image.png

其主要是因为Aa与BB拥有相同的HashCode2080

message id
消息id,在RocketMQ中又定义为msg unique id,组成形式是“ip+物理偏移量”(ip非定长字段,会因ipv4与ipv6的不同而有所区别),其中ip及物理偏移量在消息的协议格式中均有体现;当我们拿到消息所属的broker地址,以及该消息的物理存储偏移量时,也就唯一定位了该条消息,所以使用“ip+物理偏移量”的方式作为消息id

在某些场景下,msg unique id也会存储在indexFile中,

索引查询

查询这块,我们将放在消息发送、消费的章节来阐述,此处仅仅讨论索引结构设计中page cache所承担的角色

其实在整个流程中,RocketMQ是极度依赖page cache的,尤其是消费队列,数据查询是通过如下流程来查询消息的:

1、broker接受请求 -> 2、查询ConsumeQueue文件(20byte) -> 3、拿到消息的physicOffset -> 4、查询commitLog文件(msg size)

我们发现第二步及第四步都只是查询很小的数据量,如果没有page cache挡在磁盘前,整体的性能必将是断崖式下降。我有朋友做过禁掉page cache后,RocketMQ前后的性能的对比相差好几个量级

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,772评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,458评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,610评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,640评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,657评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,590评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,962评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,631评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,870评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,611评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,704评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,386评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,969评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,944评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,179评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,742评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,440评论 2 342

推荐阅读更多精彩内容