最近 ONS 消息堆积的很严重,并且经常发现部分几乎没有消息消费的消费者也提示堆积,所以有必要深入了解一下
RocketMQ 的设计思路,来看看堆积量如何计算,以及如何正确的使用 Topic 以及 Consumer 等组件。
产生的问题背景在于,由于一开始对于RocketMQ
不够了解,同时足够懒得原因,导致我们所有业务都仅适用了一个topic
,所有业务线通过订阅不同的tag
来进行消费,本次深入了解后将进行业务重构,以正确的姿势使用RocketMQ
。
本次要排查的问题包括:
1、消息拉取时模型,是否会将非该消费者消息的消息也拉取到客户端?
2、如何计算堆积?
问题1的本质问题是消息拉取的过滤模型在于客户端,还是在服务端?问题2的本质问题是消息如何存储计算?欲探究该问题则需要明确RocketMQ
的底层存储模型设计,从顶层设计俯瞰消息队列整个框架。
底层存储模型
commitlog
是整个消息队列存储的核心文件,而consumerquque
是逻辑消息队列,主要存储commitlog offset
,消息长度
,tag的hashcode
,用于在消息消费时快速定位消息在commit log
文件位置,便于读取消息。IndexFile
俗称索引文件,主要存储消息key的hashcode
以及commitlog offset
,用于通过key快速定位到消息在commit log
文件位置,便于读取消息。
消息拉取模型分析
找到问题1的答案之前,先思考消息队列投递时做了什么?
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses.
producer.setNamesrvAddr("localhost:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
以上是代码是从官网的地址copy而来,虽简单但是从其中足以找到消息投递时所需要的基本条件包括namesrvAddr
、topic
、tag
。
消息投递
// DefaultProducerImpl#sendDefaultImpl()
// 省略大部分代码,关键看备注部分
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());// 从本地缓存或namesrv远程读取topic信息
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();‘
// 根据某种策略选择一个逻辑消息队列
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
从文中可以看到,在消息投递的过程中,已经在客户端通过某种策略找到指定的topic
下的逻辑队列,逻辑队列具体指的是consumerqueue
文件,服务端对应的处理主要是写入,具体有兴趣可以了解SendMessageProcessor
类,最终通过DefaultMessageStore
实现了数据的写入,但是并未看到写入consumerqueue
,因为实现consumerqueue
文件写入是通过另外的线程实现的,具体实现请参考ReputMessageService
,本文不再深入。
我们主要知道,在客户端除了上传基本属性数据之外,同时还在客户端选择好了将要写入的逻辑消息队列。
消息拉取
消息的拉取在客户端就不进行赘述了,主要看服务端的实现。有兴趣可以了解PullMessageService#run()
。服务端则重点查阅PullMessageProcessor#processRequest()
MessageFilter messageFilter;
if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
this.brokerController.getConsumerFilterManager());
} else {
// 构建消息过滤
messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
this.brokerController.getConsumerFilterManager());
}
// 消息过滤的核心源码在ExpressionMessageFilter#isMatchedByConsumeQueue方法
@Override
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
if (null == subscriptionData) {
return true;
}
if (subscriptionData.isClassFilterMode()) {
return true;
}
// by tags code.
if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
if (tagsCode == null) {
return true;
}
if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
return true;
}
// tagecode其实就是tag的hashcode
return subscriptionData.getCodeSet().contains(tagsCode.intValue());
}
/// ....
}
// 接着PullMessageProcessor#processRequest()往下看
final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
// 注意该消息读取的参数,包括topic, queueid, queueoffset, 已经消息最大条数
// 通过DefaultMessageStore#getMessage()继续查看
// 注意,这里的offset是queueoffset,而不是commitlog offset
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
// ...
// 查找consumerqueue
ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
//
SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
if (bufferConsumeQueue != null) {
try {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
long nextPhyFileStartOffset = Long.MIN_VALUE;
long maxPhyOffsetPulling = 0;
int i = 0;
final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
/// .....
// 消息匹配,这个对象由前文的MessageFilter定义
if (messageFilter != null
&& !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
continue; //不匹配的消息则继续往下来读取
}
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);// offsetPy与sizePy查找commitlog上存储的消息内容
///....
}
以上源码阅读完后,问题1 不攻自破,在服务端上过滤好消息,但是很明显,查阅完整地源码可以清晰地确定,并非是每一次拉取消息都可以过滤到自己想要的消息,即该消费者拉取消息时可能在某一个comsumerqueue
上拉取不到消息,因为充斥着同一个topic
下的其他tag
的消息,也就意味着不是每次拉取都有意义,而阿里云ONS
的计费上明显提示拉取消息是要计算费用的。
消息堆积
消息堆积意为着服务端要维护消息的消费进度。
先来看一张图,图中的brokerOffset - consumerOffset = diffTotal, 而diffTotal就是指堆积量,而描述堆积量的指标是消息条数。
从commitlog中来看,由于存储了大量的消息文件,并且消息消费是非顺序消费,继而很难从commitlog中看出哪个
哪个consumer堆积量。
那么哪里可以描述清楚消息条数呢?先来深入了解Consumer Queue
的设计
ConsumerQueue
consumerqueue
的设计以topic
作为逻辑分区,每个topic
下分多个消息队列进行,具体多少消息队列存储参照broker
的配置参数,队列名称以数组0开始,比如配置0,1,2,3 四个消息队列。
配置参数请参考BrokerConfig
,其中有一个参数private int defaultTopicQueueNums = 8;
从语义上理解,堆积量应该指未被消费的存在
broker
上的消息数量,这是基本认知。
commitlog
存储着broker
上所有的消息,设想一下如果每次要查询消息并消费需要从该文件遍历查询,性能之差可想
而知,为了提高查询的消息,优先想到的是诸如MySQL
上的索引设计。同理,consumerqueue
的设计之初就是为了
快速定位到对应的消费者可以消费的消息,当然RocketMQ
也提供了indexfile
,俗称索引文件,主要是解决通过key
快速定位消息的方式。
consumerqueue 消息结构
在consumerqueue
的结构设计,在consumequeue
的条目设计是固定的,并且它整好对应一条消息。consumerqueue
单个文件默认是30w个条目,单个文件长度30w * 20字节。从文件的存储模型可以看出,consumerqueue
存储维度是topic
,并非是consumer
。那么如何找到consumer
的堆积量?
假设
假设一个topic
对应一个consumer
,topic
的堆积量即consumer
的堆积量。从这个维度来推理,前文提到部分consumer
是几乎没有消息,但是却提示消息堆积即合理,因为堆积的消息并非是该consumer
的需要消费的消息,而是该consumerqueue
对应的topic
的堆积
论证过程
从rocketmq console
后台看到的消费者的堆积数量,看到AdminBrokerProcess#getConsumeStats()
。
private RemotingCommand getConsumeStats(ChannelHandlerContext ctx,
// ...
for (String topic : topics) {
// ...
for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
MessageQueue mq = new MessageQueue();
mq.setTopic(topic);
mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
mq.setQueueId(i);
OffsetWrapper offsetWrapper = new OffsetWrapper();
// 核心的问题在于要确定brokerOffset 以及consumerOffset的语义
long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
if (brokerOffset < 0)
brokerOffset = 0;
long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(
requestHeader.getConsumerGroup(),
topic,
i);
if (consumerOffset < 0)
consumerOffset = 0;
// ....
}
// 队列最大索引
public long getMaxOffsetInQueue(String topic, int queueId) {
ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
if (logic != null) {
long offset = logic.getMaxOffsetInQueue();
return offset;
}
return 0;
}
public long getMaxOffsetInQueue() {
return this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE;
// 总的逻辑偏移量 / 20 = 总的消息条数
}
public static final int CQ_STORE_UNIT_SIZE = 20;// 前文提到每个条目固定20个字节
// 当前消费者的消费进度
long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(),topic,i);
if (consumerOffset < 0)
consumerOffset = 0;
public long queryOffset(final String group, final String topic, final int queueId) {
// topic@group
String key = topic + TOPIC_GROUP_SEPARATOR + group;
ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);// 从offsetTable中读取
if (null != map) {
Long offset = map.get(queueId);
if (offset != null)
return offset;
}
return -1;
}
核心的问题在于从offset缓存中读取出来的,那么offset的数据 又是哪里来的?
// 通过IDE快速可以很快找到如下代码
@Override
public String configFilePath() {
return
BrokerPathConfigHelper.getConsumerOffsetPath(this.brokerController.getMessageStoreConfig().
getStorePathRootDir());
}
@Override
public void decode(String jsonString) {
if (jsonString != null) {
ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString,ConsumerOffsetManager.class);
if (obj != null) {
this.offsetTable = obj.offsetTable;
}
}
}
public static String getConsumerOffsetPath(final String rootDir) {
return rootDir + File.separator + "config" + File.separator + "consumerOffset.json";
}
也就是说offset的数据是从json文件中加载进来的。
这个文件描述的是topic与消费者的关系,每一个队列对应的消费进度。但是消费是实时更新的,所以必须实时更新消费进度,消费进度的更新是从消息的拉取得到的。
DefaultStoreMessage
前文看过该类的部分代码,主要是拉取的部分,这里补充拉取时的offset的值得语义。
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
// ...
// offsetPy 是commitlog的逻辑偏移量
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
if (null == selectResult) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.MESSAGE_WAS_REMOVING;
}
nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
continue;
}
// 消息过滤
if (messageFilter != null
&& !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
// release...
selectResult.release();
continue;
}
// ....
}
// ...
//
// 计算下一次开始的offset,是前文的offset
// i 是ConsumeQueue.CQ_STORE_UNIT_SIZE的倍数
// ConsumeQueue.CQ_STORE_UNIT_SIZE是每一条consumerqueue中的条目的大小,20字节
nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
* (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);
看到此处,可以明确消费者拉取消息时的nextBeginOffset
就是consumerqueue
的偏移量/20,意味着类似下标数组index
。
到此处还要再确认拉取的这个消费进度是不是会更新到到offsetTable
?核心看RemoteBrokerOffsetStore
类
消息消费
贴几张图简单了解客户端上报消费进度的过程
至此,可以看到堆积量的实际是根据topic来算,按照前文最开始的假设推断其实是成立的,那么现在那些没有消息堆积的消息为何还会显示堆积就可以理解了。
总结
消息消费属于服务端过滤模式,不过其实还要其他的消息过滤模式
,只是本文并未提及(Class
)。但是由于topic
使用的不合理导致消息可能存在拉取不到数据,但是ONS是计算收费的。同时消息的堆积意义明朗,那么使用RocketMQ
的姿势也就不言而喻,按照业务合理使用topic
以及tag
等。
参考资料
源码:https://github.com/apache/rocketmq
官网:http://rocketmq.apache.org/docs/rmq-deployment/
书籍:《RocketMQ技术内幕》,特别推荐该书,让你对RocketMQ
的架构设计,代码有更深的了解