前文我们梳理了消息在Commit Log文件的存储过程,讨论了消息的落盘策略,然而仅仅通过Commit Log存储消息是远远不够的,例如当我们需要消费某个topic的消息时,通过对Commit Log整体遍历寻找消息的方式无疑非常的低效。所以本文将引出2个很重要的概念:消费队列、IndexFile
消费队列
什么是消费队列呢?其实在上一章的消息协议格式中,就有消息队列的体现,简单回顾一下协议的前6个字段:
msg total len:消息总长度
msg magic :魔法值,标记当前数据是一条消息
msg CRC :消息内容的CRC值
queue id :队列id
msg flag :消息标记位
queue offset :队列的偏移量,从0开始累加
其中第4个字段为消费队列的id,第6个字段为当前队列的偏移量;所以在消息产生的时候,消息所属的队列就已经确定
那么究竟该如何理解“消费队列”的概念呢?我们举例来说:假定某个RocketMQ集群部署了3个broker(brokerA、brokerB、brokerC),主题topicTest的消息分别存储在这3个broker中,而每个broker又对应一个commit log文件。我们把broker中的主题topicTest中的消息划分为多个队列,每个队列便是这个topic在当前broker的消费队列
数据结构
当然消费队列不会将消息体进行冗余存储,数据结构如下:
即在消费队列的文件中,需要存储20个字节的索引内容。RocketMQ中默认指定每个消费队列的文件存储30万条消息的索引,而一个索引占用20个字节,这样每个文件的大小便是固定值300000*20/1024/1024≈5.72M,而文件命名采用与commit log相似的方式,即总长度20位,高位补0
store/consumequeue/topicXX/0/00000000000000000000 第一个文件
store/consumequeue/topicXX/1/00000000000006000000 第二个文件
store/consumequeue/topicXX/2/00000000000012000000 第三个文件
store/consumequeue/topicXX/3/00000000000018000000 第四个文件
与commitLog只有一个文件不同,consumeQueue是每个topic的每个消费队列都会生产多个文件
为什么消费队列文件存储消息的个数要设置成30万呢?一个文件还不到6M,为何不能像commit log那样设置为1G呢?鄙人没有在源码及网上找到相关资料,猜测可能是个经验值。首先该值不宜设置的过大,因为消息总是有失效期的,例如3天失效,如果消费队列的文件设置过大的话,有可能一个文件中包含了过去一个月的消息,时间跨度过大,这样不利于及时删除已经过期的消息;其次该值也不宜过小,太小的话会产生大量的小文件,在管理维护上制造负担。最理想情况是一个消费队列文件对应一个commit log,这样commit log过期时,消费队列文件也跟着及时失效
消费队列之commit log视角
某个commit log会存储多个topic消息,而每个topic有可能会将消息划分至多个队列中;如上图所示,commit log按顺序依次存储消息,而某个topic的消息在commit log中大概率也是不连续的,而consume queue的作用便是将某个topic下同一个队列的消息依次标识,便于消费时顺序消费
消费队列之topic视角
上图描述了Topic A的消费队列分配情况,所有的消息相对均匀的分散在3个broker中,每个broker的消息又分为队列0及队列1,所以一共有6个消费队列,所以Topic A的consumer端也是建议开辟6个进程去消费数据
这里简单提一下消费端,我们知道一个消费队列同时只能被一个消费实例消费,所以消费实例的数量建议值为 <= consumeQueue 数量,理想情况是消费实例的个数完全等于consumeQueue个数,这样吞吐能达到最佳
consumerNum < consumeQueue 消费实例小于消费队列个数。例如某个topic的消费队列一共有6个,但是只有3个消费实例,RocketMQ会尽量均衡每个消费实例分配到的消费队列,所以每个消费实例实际会消费2个队列的内容。这种情况可能增加消费实例可以提高整体吞吐
consumerNum > consumeQueue 消费实例大于消费队列个数。比如某个topic的消费队列有6个,但是有8个消费实例注册,因为一个消费实例只能对应一个消费队列,所以势必导致有2个消费实例处于空闲状态,不会拿到任何数据
consumerNum == consumeQueue 消费实例等于消费队列个数。这比较理想的状态,不会有过载或饥饿产生
基于同一个消费队列只能被一个消费实例消费的特性,我们可以将某类消息均发送给一个队列,这样消费的时候能够严格保序。例如我们希望订单的流程是保序的,可以通过orderId % consumeQueue来决定当前订单的消费发送给哪个队列,从而达到保序的目的
写入流程
消费队列的写入跟commit log的写入是同步进行的吗?答案是否定的,RocketMQ会启动一个独立的线程来异步构建消费队列(构建索引文件也是这个线程)
简单描述下流程:构建索引的线程为ReputMessageService,跟写入commitLog的线程是异步关系,该线程会不断地将没有构建索引的消息从commit log中取出,将物理偏移量、消息长度、tag写入文件。值得一提的是,消息队列文件的写入跟commit log不同,commit log的写入有很多刷盘策略,而consumeQueue每条消息解析完毕都会刷盘,而且采用的是FileChannel。
借此,我们抛出几个问题
问题1:为什么消费队列写入文件要用FileChannel?批次多,数据量小的场景用Mapp不香吗?
关于这个问题,我咨询了RocketMQ开源社区比较有影响力的大佬,给出的答复是:的确是这样,RocketMQ这样设计考虑欠佳,写文件这块应该向kafka学习,即消息写入用FileChannel,索引写入用Mapp
问题2:为什么RocketMQ中很少有用到堆外内存?文件写入的话,使用堆外内存少一次内存拷贝,不是可以提高性能吗?
是这样的,类似这样的场景首选还是堆外内存;RocketMQ的确还有很多可优化的空间,在将来的某个版本,我们一定可以看到针对此处的优化
问题3:如果消息已经写入commit log,但还未写入消费队列,consumer端能正常消费到这条消息吗?
抛出这个问题,大家可以思考一下,在消息产生、消费的章节再回答
IndexFile
ReputMessageService线程除了构建消费队列的索引外,还同时根据消息key构建了索引
除了正常的生产、消费消息外,RocketMQ还提供了根据msg key进行查询的功能,将消息key相同的消息一并查出;我们当然可以通过扫描全量的commit log将相同msg key类型的消息过滤出来,但性能堪忧,而且涉及大量的IO运算;IndexFile便是为了实现快速查找目标消息而衍生的索引文件
IndexFile的命名规范也有别于消费队列,IndexFile是按照创建时间来命名的,因为根据消息key进行匹配查询的时候,都要带上时间参数,文件名起到了快速定位索引数据位置的作用,下面列举一组IndexFile的文件名
rocketMQ/store/20211204094647480
rocketMQ/store/20211205094647480
rocketMQ/store/20211206094647480
rocketMQ/store/20211207094647480
IndexFile结构
我们具体看一下此文件的结构,与消费队列文件相同,IndexFile是定长的
由三部分组成:
文件头,占用 40 byte
slot,hash槽儿,占用500w4= 20000000 byte
索引内容 占用2000w20= 400000000 byte
所以文件总大小为: 40+50000004+2000000020=420000040byte ≈ 400M
存储原理
简单剖析一下各部分的字段
文件头 共 20 byte
开始时间(8 byte)存储前索引文件内,所有消息的最小时间
结束时间(8 byte)存储前索引文件内,所有消息的最大时间,因为根据key查询的时候 ,时间是必填选项 ,开始与结束时间用来快速定位消息是否命中
最小物理偏移量(8 byte)存储前索引文件内,所有消息的最小物理偏移量
最大物理偏移量(8 byte)存储前索引文件内,所有消息的最大物理偏移量;框定最小、最大物理偏移量,是为了给通过物理地址查询时快速索引
有效hash slot数量(4 byte)因为存储hash冲突的情况,所以最坏情况是,hash slot只有1个,最理想情况是有500万个
index索引数量(4 byte)如果当前索引文件已经构建完毕,那么该值是固定值2000万
slot 4 byte
当前槽儿内的最近一次index的位置(4 byte)
索引内容 20 byte
hash值(4 byte)
消息的物理地址(8 byte)
时间差(4 byte)当前消息与最早消息的时间差
索引(4 byte)当前槽儿内,上一条索引的位置
存储方式如下
当一条新的消息索引进来时,首先定位当前消息命中的slot,该slot存储着最近一条消息的存储位置。将消息的索引数据append至文件尾部的同时,将最新索引数据的next指向上一条索引位置,这样便形成了一条当前slot按照时间存入的倒序的链表
消息查询
根据前文的铺垫,同一个槽儿内的数据,已经被一个隐式的链儿串连在了一起,当我们根据topic+key进行数据查询时,直接通过topic + # + key的hash值,定位到某个槽儿,进而依次寻找消息即可;当然同一个槽儿内的数据可能出现hash冲突,我们需要将不符合条件的数据过滤掉
当我阅读这部分源码的时候,发现了其内部存在的一个bug,其做消息过滤时,仅仅判断消息的hash字段是否相等,如果相等的话,继而认定为要寻找的数据从而返回
class : org.apache.rocketmq.store.index.IndexFile
if (keyHash == keyHashRead && timeMatched) {
phyOffsets.add(phyOffsetRead);
}
进而带来的一些问题,例如:
新建topic AaTopic,并向topic中发送一条消息,message key为Aa
新建topic BBTopic,并向topic中发送一条消息,message key为BB
当我们通过 topic=AaTopic && key=BB查询时,预期应该返回空数据,但实际却返回了一条数据
其主要是因为Aa与BB拥有相同的HashCode2080
message id
消息id,在RocketMQ中又定义为msg unique id,组成形式是“ip+物理偏移量”(ip非定长字段,会因ipv4与ipv6的不同而有所区别),其中ip及物理偏移量在消息的协议格式中均有体现;当我们拿到消息所属的broker地址,以及该消息的物理存储偏移量时,也就唯一定位了该条消息,所以使用“ip+物理偏移量”的方式作为消息id
在某些场景下,msg unique id也会存储在indexFile中,
索引查询
查询这块,我们将放在消息发送、消费的章节来阐述,此处仅仅讨论索引结构设计中page cache所承担的角色
其实在整个流程中,RocketMQ是极度依赖page cache的,尤其是消费队列,数据查询是通过如下流程来查询消息的:
1、broker接受请求 -> 2、查询ConsumeQueue文件(20byte) -> 3、拿到消息的physicOffset -> 4、查询commitLog文件(msg size)
我们发现第二步及第四步都只是查询很小的数据量,如果没有page cache挡在磁盘前,整体的性能必将是断崖式下降。我有朋友做过禁掉page cache后,RocketMQ前后的性能的对比相差好几个量级