本章节是《rocket mq 底层存储源码分析》系列的最后一章,我们结合【逻辑位移索引】以及【key查询索引】,从低层接口分析如何利用这两类索引,为上层业务接口提供查询业务消息的实现。因此,这里并不涉及Consumer客户端是如何发起拉取消息请求,以及broker端接收请求后,根据客户端的查询条件,查询出指定的业务消息并,最后返回给客户端的 整个流程。该章节只是分析如何通过指定的查询参数,获取指定的业务消息。
在开始之前,我们先回顾一下java 中ArrayList
是如何访问一个指定的元素?相信不少读者可以立刻给出答案,通过index
下标就可以访问了。
那rmq是如何设计底层接口来访问具体的业务消息呢?带着这个疑问,我们来一起分析查询业务消息的底层核心接口:
/**
* @param group 消费客户端所制定的ConsumeGroup
* @param topic
* @param queueId 指定的消费队列
* @param offset 消息的逻辑位移
* @param maxMsgNums 拉取的最大消息数量,PULL模式的客户端由用户自行制定,PUSH模式一般使用默认值32
* @param subscriptionData
* @return
*/
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums,
final SubscriptionData subscriptionData) {
...
GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
long nextBeginOffset = offset;
long minOffset = 0;
long maxOffset = 0;
GetMessageResult getResult = new GetMessageResult();
//this.mappedFileQueue.getMaxOffset() 获取最大的已commit 的物理offset。
final long maxOffsetPy = this.commitLog.getMaxOffset();
//step1、通过topic和queueId找到指定消费的ConsumeQueue
ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
if (consumeQueue != null) {
//逻辑消费队列目前最小的【逻辑位移索引】
minOffset = consumeQueue.getMinOffsetInQueue();
//逻辑消费队列目前最大的【逻辑位移索引】
maxOffset = consumeQueue.getMaxOffsetInQueue();
//nextOffsetCorrection 根据consumer指定消费的offset修正nextOffset
if (maxOffset == 0) {
status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
nextBeginOffset = nextOffsetCorrection(offset, 0);
} else if (offset < minOffset) {
status = GetMessageStatus.OFFSET_TOO_SMALL;
nextBeginOffset = nextOffsetCorrection(offset, minOffset);
} else if (offset == maxOffset) {
status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
nextBeginOffset = nextOffsetCorrection(offset, offset);
} else if (offset > maxOffset) {
status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
if (0 == minOffset) {
nextBeginOffset = nextOffsetCorrection(offset, minOffset);
} else {
nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
}
}
else {
//step2
//获取该索引位置之后的【逻辑位移索引】字节,这里有可能获取多个 【逻辑位移索引】(20字节一个)
SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
if (bufferConsumeQueue != null) {
try {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
long nextPhyFileStartOffset = Long.MIN_VALUE;
long maxPhyOffsetPulling = 0;
int i = 0;
final int maxFilterMessageCount = 16000;
...
//start rolling get 【逻辑位移索引】
//step3
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
//step3.1 读取完整的【逻辑位移索引】内容
long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); //业务消息的开始 物理存储位移
int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); //业务消息的实际大小
long tagsCode = bufferConsumeQueue.getByteBuffer().getLong(); //业务消息的 标识hash值
//已经获取的最大的具体消息的物理位移
maxPhyOffsetPulling = offsetPy;
...
//step3.2这里表明如果当前查询出的具体消息的物理位移落后于已CommitLog的maxOffsetPy即最大已提交到缓存的消息物理位移
//总内存的40%,则表明该消息时在磁盘中
boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
//该逻辑判断循环内,该条消息是否达到了批次满的条件
if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),
isInDisk)) {
break;
}
//step3.3
if (this.messageFilter.isMessageMatched(subscriptionData, tagsCode)) {
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
if (selectResult != null) {
this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
getResult.addMessage(selectResult);
status = GetMessageStatus.FOUND;
nextPhyFileStartOffset = Long.MIN_VALUE;
} else {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.MESSAGE_WAS_REMOVING;
}
nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
}
} else {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
}
} //end for
...
//step3.4
nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
//记录目前的消费进度差值
long diff = maxOffsetPy - maxPhyOffsetPulling;
//系统总存储的40% accessMessageInMemoryMaxRatio = 40,可以配置
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
//换言之,如果目前的消费进度差值 超过40%,则建议消费者端下次拉取的目标为slave
getResult.setSuggestPullingFromSlave(diff > memory);
} finally {
bufferConsumeQueue.release();
}
} else {
//如果该索引文件找不到相对应的【逻辑位移索引】字节内容,则滚动到下一个索引文件的的开始位置所对应的逻辑位移
status = GetMessageStatus.OFFSET_FOUND_NULL;
nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));
}
}
} else {
//找不到对应的消费逻辑队列
status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
nextBeginOffset = nextOffsetCorrection(offset, 0);
}
...
//step4记录拉取消息的消耗时间,并返回结果
...
long eclipseTime = this.getSystemClock().now() - beginTime;
this.storeStatsService.setGetMessageEntireTimeMax(eclipseTime);
getResult.setStatus(status);
getResult.setNextBeginOffset(nextBeginOffset);
getResult.setMaxOffset(maxOffset);
getResult.setMinOffset(minOffset);
return getResult;
}
接下来我们主要从4个步骤分析整个查询流程::
1、通过topic和queueId找到指定消费的ConsumeQueue
2、获取根据查询位移,通过具体ConsumeQueue实例获取多条【逻辑位移索引】字节内容
3、根据【逻辑位移索引】列表滚动获取业务消息
4、返回查询结果
1、通过topic和queueId找到指定消费的ConsumeQueue。
之前在【rocket mq 底层存储源码分析(4)-索引构建】章节中,已经分析过,如何通过topic即queueId获取对应的ConsumeQueue实例,这里就不在详细展开。
获取ConsumeQueue实例后,先得到该逻辑消费队列目前的可消费范围:
minOffset = consumeQueue.getMinOffsetInQueue()
getMinOffsetInQueue() = this.minLogicOffset / CQ_STORE_UNIT_SIZE
maxOffset = consumeQueue.getMaxOffsetInQueue()
getMaxOffsetInQueue() = this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE
字段minLogicOffset
以及 this.mappedFileQueue.getMaxOffset()分别代表目前的 逻辑消费队列实例中,【逻辑索引位移】所对应的【最小物理位移起始地址】以及【最大物理位移起始地址】 ,通过【逻辑索引位移】的物理位移起始位置除以其大小(CQ_STORE_UNIT_SIZE),即可换算出【下标位移】(例如java中查询 ArrayList中的index)。
接着,通过 查询参数中的offset
(客户端指定的【下标位移】)与 逻辑消费队列可查询范围[minOffset ,maxOffset]
比较,如果offset
不在[minOffset ,maxOffset]
的范围内,则跳过查询业务消息的步骤。否则,在根据所在范围,返回查询结果。例如,如果offset < minOffset
,则设置查询状态为status = GetMessageStatus.OFFSET_TOO_SMALL
, 以及下次可查询的下标位移nextBeginOffset = nextOffsetCorrection(offset, minOffset),其业务含义为,该次查询的消息位移过小,并要求客户端下一次查询下标位移为minOffset
。如果offset
在[minOffset ,maxOffset]
范围内,则代码走到步骤2。
2、获取根据查询位移,通过具体ConsumeQueue实例获取多条【逻辑位移索引】字节内容。
接着步骤1中的, 如果offset
在查询范围[minOffset ,maxOffset]
,接着分析获取多条【逻辑位移索引】字节内容。
对应代码
consumeQueue.getIndexBuffer(offset)
:
public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
//300000 * 20 ,即每一个mappedFile存放30万条位置索引消息
int mappedFileSize = this.mappedFileSize;
long offset = startIndex * CQ_STORE_UNIT_SIZE;
if (offset >= this.getMinLogicOffset()) {
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
if (mappedFile != null) {
SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
return result;
}
}
return null;
}
分析一下该方法,首先是this.mappedFileQueue.findMappedFileByOffset(offset)
想必读者对该方法不陌生,即通过指定的物理位移,找出该物理位移所在的映射文件。
然后,在通过SelectMappedBufferResult bufferConsumeQueue = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize))
,将映射文件大于该物理位移的【逻辑位移索引】字节内容一并查询出来。
3、根据【逻辑位移索引】列表滚动获取业务消息。
在上一步分析中,我们知道bufferConsumeQueue
存放着连续递增的【逻辑位移索引】字节内容,接下来,我们将分析如何使用bufferConsumeQueue
查询出符合条件的业务消息字节。
在for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE)
的for 循环里,滚动读取bufferConsumeQueue
中的【逻辑位移索引】。我们详细分析读取流程:
首先是step3.1,根据【逻辑位移索引】的存储结构,读取完整的【逻辑位移索引】内容,maxPhyOffsetPulling
则记录目前的最大的业务消息的物理位移,主要是用来记录消费进度的。
然后到step3.2,checkInDiskByCommitOffset(offsetPy, maxOffsetPy)
,该方法主要是判断,目前我们所查询的业务消息是否在磁盘中,因为rmq只将部分最新的业务消息放在pagecache,而大部分的消息还是存放在磁盘中。判断的依据是,当前查询出的具体消息的物理位移 与 已CommitLog的maxOffsetPy即最大已提交到缓存的消息物理位移的差值,是否超过总内存的40%,如果超过,则表明该消息在磁盘中。接着,判断循环内,该次读取是否达到了批次满的条件,对应代码
this.isTheBatchFull(...)
:
private boolean isTheBatchFull(int sizePy, int maxMsgNums, int bufferTotal, int messageTotal, boolean isInDisk) {
if (0 == bufferTotal || 0 == messageTotal) {
return false;
}
if ((messageTotal + 1) >= maxMsgNums) {
return true;
}
if (isInDisk) {
//如果当前已获取的总消息大小大于1024 * 64,则停止获取
if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInDisk()) {
return true;
}
//MaxTransferCountOnMessageInDisk : 8
if ((messageTotal + 1) > this.messageStoreConfig.getMaxTransferCountOnMessageInDisk()) {
return true;
}
} else {
//如果当前已获取的总消息大小大于1024 * 256
if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInMemory()) {
return true;
}
//MaxTransferBytesOnMessageInMemory:32
if ((messageTotal + 1) > this.messageStoreConfig.getMaxTransferCountOnMessageInMemory()) {
return true;
}
}
return false;
}
总结一下下面的判断逻辑:
如果isInDisk为true,说明目前消费进度不到系统总存储的60%,则批次量满的条件为:目前的传输字节量不能大于64k或者总条数不能大于8, 否则,目前的传输字节量不能大于256k或者总条数不能大于32条。
如果该次读取未到达批次满的条件,则继续判断目前的业务想消息的tagsCode
是否符合查询所指定的订阅条件,this.messageFilter.isMessageMatched(...)
:
public boolean isMessageMatched(SubscriptionData subscriptionData, Long tagsCode) {
if (tagsCode == null) {
return true;
}
if (null == subscriptionData) {
return true;
}
if (subscriptionData.isClassFilterMode())
return true;
if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
return true;
}
return subscriptionData.getCodeSet().contains(tagsCode.intValue());
}
总结一下上述判断逻辑,如果该【逻辑位移索引】所对应的tagsCode
或者上层业务调用方没有指定subscriptionData
,则认为这条消息符合订阅条件。否则,则根据上层业务所指定的订阅数据,判断订阅数据是否包含tagsCode
。这里在说明一下,tagsCode
是由生产者方指定,而subscriptionData
则由消费者端指定。
如果消息符合查询条件,即this.messageFilter.isMessageMatched(...)
为true,则通过【逻辑位移索引】所存储的offsetPy
以及sizePy
字段,查询出具体的业务消息字节,对应代码SelectMappedBufferResult selectResult = this.commitLog.getMessage(...)
,之前有分析过该方法,这里就不在展开分析了。
如果查询子结果selectResult
不为空,则说明该次查询正常,继而,往拉取总结果getResult
添加子结果,getResult.addMessage(selectResult)
,我们跟进该方法:
public void addMessage(final SelectMappedBufferResult mapedBuffer) {
this.messageMapedList.add(mapedBuffer);
this.messageBufferList.add(mapedBuffer.getByteBuffer());
this.bufferTotalSize += mapedBuffer.getSize();
this.msgCount4Commercial += (int) Math.ceil(
mapedBuffer.getSize() / BrokerStatsManager.SIZE_PER_COUNT);
}
可以看出,每次添加子结果后,会往结果列表messageMapedList
添加业务消息字节,而消费者客户端所得到的所有消息元数据就在messageMapedList
中,并且由客户端负责反序列化业务消息;以及记录当已获取消息的总大小bufferTotalSize
和总条数msgCount4Commercial
。这些结果就是作用与下一次 批次满判断的 条件,就是步骤step3.2中的逻辑判断。
如果查询子结果selectResult
为空,说明该【逻辑位移索引】所对应的【业务消息】不在该映射文件中,需要滚动到下一个存储映射文件。
当拉取的消息达到了批次满判断的条件或者是到达了【逻辑位移索引】所在的映射文件尾部时,则结束该次的循环流程。
for循环结束后,接着step3.4,更新nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE)
,在说一下nextBeginOffset
,该字段所代表的业务含义是,希望消费者客户端下一次拉取消息下标索引位置。
并且判断消费者客户端拉取消息的broker角色,如果目前的消费进度差值 超过40%,则建议消费者端下次拉取的目标为slave,否则继续向master拉取,这部分内容我们会在分析消费者客户端拉取消息时,详细分析的,这里就不在展开讨论。
4、返回查询结果。
最后一步step4比较简单,往结果getResult
填充相对应的查询结果字段。
总结一下上述查找流程,首先,根据查询条件topic 与 queueId 确定该broker唯一一条逻辑消费队列实例(ConsumeQueue
),然后,获取该消费队列目前可以消费的索引范围[minOffset ,maxOffset]。换言之,业务调用方所指定的请求查询参数中的索引字段offset
一定要在[minOffset ,maxOffset]这个范围内,如果不在该范围内,则会给业务调用方返回相对应的响应码。例如offset
比最小的可消费索引minOffset
还要小,则返回响应码GetMessageStatus.OFFSET_OVERFLOW_ONE
,以及下一次查询的开始索引位置nextBeginOffset = minOffset
。如果offset
在指定的范围内,则根据offset
,定位出offset
第一个【逻辑位移索引】所在的存储映射文件及所在物理位移,然后在根据业务方所指定的最大拉取数量maxMsgNums
(当然,实际值还要结合目前的消费进度及可传输的实际总字节量来确定maxMsgNums
的最终值),结合筛选条件subscriptionData
与tagsCode
匹配,如果符合条件,在根据【逻辑位移索引】存储的的业务消息物理位移offsetPy
及其大小sizePy
,查询出具体的业务消息字节内容,将业务消息字节结果加入结果列表中,知道结果列表到达maxMsgNums
或者是【逻辑位移索引】存储文件尾部,则结束查询。
到这里,《rocket mq 底层存储源码分析》系列就结束了。