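ConsumeQueue Overview
RocketMQ implements message consumption on a topic subscription model. Consumers care about all messages under a topic, but messages of the same topic are stored non-contiguously in the commitlog files. Scanning the commitlog directly for the messages of a subscribed topic would be extremely inefficient. To support retrieval for consumption, RocketMQ provides the consume queue file, ConsumeQueue, which can be regarded as an index file of the CommitLog for message consumption. The first-level directory under consumequeue is the topic, and the second-level directory is the message queue of that topic. The index is built per topic and per queue, so consumers can efficiently consume the messages of a given topic.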
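ConsumeQueue Entries
Each ConsumeQueue entry is the index of one message and occupies 20 bytes: an 8-byte commitlog offset, a 4-byte message size, and an 8-byte tag hash code.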
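The 20-byte layout can be illustrated with a small, self-contained sketch. The class and method names below (CqEntryDemo, encode, decode) are hypothetical and not part of RocketMQ; only the field order and widths follow the putLong/putInt/putLong sequence shown later in putMessagePositionInfo().

```java
import java.nio.ByteBuffer;

public class CqEntryDemo {
    // 8 bytes (commitlog offset) + 4 bytes (message size) + 8 bytes (tag hash code)
    static final int CQ_STORE_UNIT_SIZE = 20;

    // Pack the three fields into one 20-byte index entry.
    public static byte[] encode(long commitLogOffset, int msgSize, long tagsCode) {
        ByteBuffer buf = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
        buf.putLong(commitLogOffset);
        buf.putInt(msgSize);
        buf.putLong(tagsCode);
        return buf.array();
    }

    // Unpack an entry into {commitLogOffset, msgSize, tagsCode}.
    public static long[] decode(byte[] entry) {
        ByteBuffer buf = ByteBuffer.wrap(entry);
        long offset = buf.getLong();
        int size = buf.getInt();
        long tagsCode = buf.getLong();
        return new long[] {offset, size, tagsCode};
    }

    public static void main(String[] args) {
        byte[] entry = encode(1024L, 256, "TagA".hashCode());
        long[] fields = decode(entry);
        System.out.println(entry.length + " bytes: offset=" + fields[0]
            + ", size=" + fields[1] + ", tagsCode=" + fields[2]);
    }
}
```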
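A single ConsumeQueue file holds 300,000 entries by default. At 20 bytes per entry, one file is 300,000 × 20 bytes = 6,000,000 bytes, about 5.7 MB. The counterpart of ConsumeQueue is the CommitLog object. Each has its own MappedFileQueue and MappedFile objects, and both use them to persist their byte arrays to disk (message bytes for CommitLog, index bytes for ConsumeQueue). ConsumeQueue does not use AllocateMappedFileService to create its MappedFile files; it creates them through the MappedFile constructor. Each ConsumeQueue file is named after the byte offset of its first entry, that is, the index of the first entry in the file multiplied by 20 bytes.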
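The size and naming arithmetic can be checked with a short sketch. CqFileLayoutDemo and its methods are hypothetical names; the zero-padding of the file name to 20 digits is an assumption about the on-disk naming and should be verified against your RocketMQ version.

```java
public class CqFileLayoutDemo {
    static final int CQ_STORE_UNIT_SIZE = 20;
    static final int ENTRIES_PER_FILE = 300_000;

    // Size of one ConsumeQueue file in bytes: 300,000 * 20 = 6,000,000.
    public static long fileSizeBytes() {
        return (long) ENTRIES_PER_FILE * CQ_STORE_UNIT_SIZE;
    }

    // File name = byte offset of the first entry in the file.
    // Assumption: the offset is zero-padded to 20 digits.
    public static String fileNameFor(long firstEntryIndex) {
        return String.format("%020d", firstEntryIndex * CQ_STORE_UNIT_SIZE);
    }

    public static void main(String[] args) {
        System.out.println("file size in bytes: " + fileSizeBytes());
        System.out.println("first file:  " + fileNameFor(0));
        System.out.println("second file: " + fileNameFor(ENTRIES_PER_FILE));
    }
}
```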
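Asynchronous Construction of ConsumeQueue
The consumequeue and indexFile index files are built by an asynchronous thread, ReputMessageService. The data for both comes from the ByteBuffer of the commitLog's MappedFile. For each message, a DispatchRequest is constructed. The ConsumeQueue service handles the DispatchRequest to build the consumequeue MappedFile, and IndexService handles it to build the indexFile. Each then flushes its own files to disk.
The ConsumeQueue index file is built from the CommitLog file. Once a producer's message has been stored in a CommitLog MappedFile, the ConsumeQueue must be updated promptly; otherwise the message cannot be consumed in time, and looking up messages by their properties will also suffer a noticeable delay. The data source for building the ConsumeQueue is the ByteBuffer of the CommitLog's MappedFile, so the message may not yet have been committed or flushed at that point. After fetching a batch of messages, RocketMQ constructs a DispatchRequest for each message and dispatches it to the handlers, which build the ConsumeQueue MappedFile, write the entry through the MappedFile's FileChannel, and leave it to the asynchronous flush.
Build Process
DefaultMessageStore is the entry point and key API of the message store. It contains ReputMessageService, the service that dispatches messages and builds the ConsumeQueue and Index files, and starts a thread that performs this dispatching and index construction in near real time.
// CommitLog message dispatching: asynchronously builds the ConsumeQueue and IndexFile files from the CommitLog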
private final ReputMessageService reputMessageService;
// Start the asynchronous build service
this.reputMessageService.start();
@Override
public void run() {
DefaultMessageStore.log.info(this.getServiceName() + " service started");
// Loop until the service is stopped: sleep 1 ms, then call doReput() to build ConsumeQueue and Index entries
while (!this.isStopped()) {
try {
Thread.sleep(1);
// Asynchronously build the ConsumeQueue and Index files
this.doReput();
} catch (Exception e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
DefaultMessageStore.log.info(this.getServiceName() + " service end");
}
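doReput() reads the messages that still need indexing from the CommitLog's MappedFile, wraps each message in a DispatchRequest, and dispatches it.
/**
* Asynchronously builds the ConsumeQueue and Index files.
* doReput() returns when there is no more offset to build, but its caller keeps invoking it, so construction resumes as soon as new data arrives.
*/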
private void doReput() {
// If reputFromOffset is smaller than the minimum offset of the commitlog, move it up to that minimum offset
if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
}
// Keep building while the commitlog still has data beyond reputFromOffset
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
// When duplication is enabled, stop once reputFromOffset reaches the confirm offset
if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
&& this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
break;
}
// Fetch the data to build from the commitlog MappedFile, starting at reputFromOffset
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
if (result != null) {
try {
// Offset at which building starts
this.reputFromOffset = result.getStartOffset();
// Read one message at a time from the ByteBuffer (each message carries its own size) and build a DispatchRequest for it
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
// Create the dispatch request used to build the ConsumeQueue asynchronously
DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
// The dispatchRequest was built successfully
if (dispatchRequest.isSuccess()) {
if (size > 0) {
DefaultMessageStore.this.doDispatch(dispatchRequest);
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}
this.reputFromOffset += size;
readSize += size;
if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
DefaultMessageStore.this.storeStatsService
.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
DefaultMessageStore.this.storeStatsService
.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
.addAndGet(dispatchRequest.getMsgSize());
}
} else if (size == 0) {
// End of the current file reached: move reputFromOffset to the start of the next file
this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
readSize = result.getSize();
}
// Building the dispatchRequest failed
} else if (!dispatchRequest.isSuccess()) {
// If the size is known, skip this record and advance the build position to the next ConsumeQueue entry
if (size > 0) {
log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
this.reputFromOffset += size;
} else {
doNext = false;
// If user open the dledger pattern or the broker is master node,
// it will not ignore the exception and fix the reputFromOffset variable
if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||
DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
this.reputFromOffset);
this.reputFromOffset += result.getSize() - readSize;
}
}
}
}
} finally {
// Release the reference to the selected data
result.release();
}
// result is null: nothing to build
} else {
doNext = false;
}
}
}
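The core of the inner loop, reading one size-prefixed record after another and advancing reputFromOffset by each record's size, can be modelled without any RocketMQ classes. The sketch below is a simplified illustration only: the record format (a 4-byte total length followed by a payload) and the names ReputLoopSketch and scan are assumptions made for the example, not the real commitlog format or API.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class ReputLoopSketch {
    // Hypothetical record format: a 4-byte total length (including the length
    // field itself) followed by the payload.
    // Returns one {startOffset, size} pair per complete record found.
    public static List<long[]> scan(ByteBuffer data, long startOffset) {
        List<long[]> requests = new ArrayList<>();
        long reputFromOffset = startOffset;
        int total = data.remaining();
        int readSize = 0;
        while (readSize < total) {
            if (total - readSize < 4) {
                break; // not enough bytes left for a length header
            }
            int size = data.getInt(data.position());
            if (size <= 4 || size > total - readSize) {
                break; // empty or incomplete record: stop and retry later
            }
            requests.add(new long[] {reputFromOffset, size});
            data.position(data.position() + size);
            reputFromOffset += size; // the next record starts right after this one
            readSize += size;
        }
        return requests;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putInt(10).put(new byte[6]); // record 1: 10 bytes in total
        buf.putInt(6).put(new byte[2]);  // record 2: 6 bytes in total
        buf.flip();
        for (long[] r : scan(buf, 100L)) {
            System.out.println("offset=" + r[0] + ", size=" + r[1]);
        }
    }
}
```

As in doReput(), the scan stops rather than guessing when it meets a record it cannot read in full, leaving the caller to invoke it again once more data is available.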
CommitLogDispatcherBuildConsumeQueue is the handler for ConsumeQueue build requests.
/**
* Dispatcher that builds the ConsumeQueue files
*/
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) {
final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
switch (tranType) {
// Non-transactional messages and committed transactional messages
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
// Handle the request to build the ConsumeQueue from the commit log
DefaultMessageStore.this.putMessagePositionInfo(request);
break;
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
}
}
}
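The effect of the switch above is that only non-transactional and committed messages receive a ConsumeQueue entry, so prepared and rolled-back messages never become visible to consumers through this index. A minimal sketch of that filter follows. The class TranTypeSketch and the method isIndexed are hypothetical, and the constant values are assumed to mirror those in MessageSysFlag (a 2-bit transaction field at bits 2 and 3); verify them against your RocketMQ version.

```java
public class TranTypeSketch {
    // Assumed to mirror MessageSysFlag; not copied from this document's source.
    static final int TRANSACTION_NOT_TYPE = 0;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;

    // Extract the transaction bits from the system flag (mask 0b1100).
    public static int transactionValue(int sysFlag) {
        return sysFlag & TRANSACTION_ROLLBACK_TYPE;
    }

    // True if a message with this flag would get a ConsumeQueue entry.
    public static boolean isIndexed(int sysFlag) {
        switch (transactionValue(sysFlag)) {
            case TRANSACTION_NOT_TYPE:
            case TRANSACTION_COMMIT_TYPE:
                return true;
            default: // prepared or rolled back
                return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("normal:    " + isIndexed(TRANSACTION_NOT_TYPE));
        System.out.println("prepared:  " + isIndexed(TRANSACTION_PREPARED_TYPE));
        System.out.println("committed: " + isIndexed(TRANSACTION_COMMIT_TYPE));
        System.out.println("rollback:  " + isIndexed(TRANSACTION_ROLLBACK_TYPE));
    }
}
```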
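putMessagePositionInfo handles the actual build request, and selects or creates the ConsumeQueue object for the message's topic and queue.
// Handle the request to build the ConsumeQueue from the commit log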
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
// Let the ConsumeQueue handle the build request
cq.putMessagePositionInfoWrapper(dispatchRequest);
}
// Look up the ConsumeQueue by topic and queueId
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
if (null == map) {
ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
if (oldMap != null) {
map = oldMap;
} else {
map = newMap;
}
}
ConsumeQueue logic = map.get(queueId);
if (null == logic) {
// Create a new ConsumeQueue
ConsumeQueue newLogic = new ConsumeQueue(
topic,
queueId,
StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
this);
ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
if (oldLogic != null) {
logic = oldLogic;
} else {
logic = newLogic;
}
}
return logic;
}
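findConsumeQueue() uses the get-then-putIfAbsent idiom, which is thread-safe but may construct an object that is immediately discarded when two threads race. On Java 8 and later the same two-level lookup can be written with computeIfAbsent. The sketch below illustrates that alternative and is not the RocketMQ implementation; QueueTableSketch and the nested Queue class are hypothetical stand-ins for DefaultMessageStore and ConsumeQueue.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class QueueTableSketch {
    // Hypothetical stand-in for ConsumeQueue.
    public static final class Queue {
        final String topic;
        final int queueId;

        Queue(String topic, int queueId) {
            this.topic = topic;
            this.queueId = queueId;
        }
    }

    // topic -> (queueId -> queue), the same shape as consumeQueueTable.
    private final ConcurrentMap<String, ConcurrentMap<Integer, Queue>> table =
        new ConcurrentHashMap<>();

    // Returns the queue for (topic, queueId), creating it on first use.
    // With ConcurrentHashMap, the factory runs at most once per key.
    public Queue findQueue(String topic, int queueId) {
        return table
            .computeIfAbsent(topic, t -> new ConcurrentHashMap<>())
            .computeIfAbsent(queueId, id -> new Queue(topic, id));
    }

    public static void main(String[] args) {
        QueueTableSketch store = new QueueTableSketch();
        Queue a = store.findQueue("TopicTest", 0);
        Queue b = store.findQueue("TopicTest", 0);
        System.out.println("same instance: " + (a == b));
    }
}
```

One trade-off is that computeIfAbsent runs the factory while holding an internal bin lock, so the factory should stay cheap; a constructor that touches the file system, as a real ConsumeQueue might, is a reason to prefer the original idiom.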
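The private ConsumeQueue#putMessagePositionInfo() method writes the index fields of the message into byteBufferIndex and appends them to the ConsumeQueue's memory-mapped file. This operation only appends and does not flush; the ConsumeQueue is always flushed asynchronously.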
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
final long cqOffset) {
if (offset + size <= this.maxPhysicOffset) {
log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
return true;
}
// Store the commitlog offset, message size and tag hash code in byteBufferIndex
this.byteBufferIndex.flip();
// One consume queue index entry is 20 bytes
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
// Commitlog offset
this.byteBufferIndex.putLong(offset);
// Message size
this.byteBufferIndex.putInt(size);
// Tag hash code
this.byteBufferIndex.putLong(tagsCode);
// Expected byte offset of this entry within the ConsumeQueue
final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
// Get (or create) the last MappedFile of the ConsumeQueue; it is created through the constructor, not pre-allocated
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
if (mappedFile != null) {
// If this is the first file created in the queue, initialize the offsets and fill the leading blank space
if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
this.minLogicOffset = expectLogicOffset;
this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
this.fillPreBlank(mappedFile, expectLogicOffset);
log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
+ mappedFile.getWrotePosition());
}
// Compare the expected logical offset (derived from cqOffset) with the current write position to detect repeated or out-of-order builds before appending
if (cqOffset != 0) {
long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
if (expectLogicOffset < currentLogicOffset) {
log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
return true;
}
if (expectLogicOffset != currentLogicOffset) {
LOG_ERROR.warn(
"[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset,
currentLogicOffset,
this.topic,
this.queueId,
expectLogicOffset - currentLogicOffset
);
}
}
this.maxPhysicOffset = offset + size;
// Append the entry to the ConsumeQueue's memory-mapped file (append only, no flush)
return mappedFile.appendMessage(this.byteBufferIndex.array());
}
return false;
}
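The consistency check in the middle of this method compares the expected logical offset, cqOffset * 20, with the current write position, fileFromOffset + wrotePosition. The following sketch isolates that comparison. LogicOffsetCheck and its Result enum are hypothetical names used only for illustration. Note that in the original method a gap only produces a warning and the entry is still appended; the sketch merely classifies the three cases.

```java
public class LogicOffsetCheck {
    static final int CQ_STORE_UNIT_SIZE = 20;

    public enum Result { APPEND, DUPLICATE, GAP }

    // Classify a build request by where its entry is expected to land.
    public static Result check(long cqOffset, long fileFromOffset, int wrotePosition) {
        long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
        long currentLogicOffset = fileFromOffset + wrotePosition;
        if (cqOffset != 0) {
            if (expectLogicOffset < currentLogicOffset) {
                return Result.DUPLICATE; // entry already written: repeated build
            }
            if (expectLogicOffset != currentLogicOffset) {
                return Result.GAP; // entry would land beyond the write position
            }
        }
        return Result.APPEND;
    }

    public static void main(String[] args) {
        // The file starts at offset 0 and already holds 5 entries (100 bytes).
        System.out.println(check(5, 0L, 100)); // next entry in sequence
        System.out.println(check(4, 0L, 100)); // entry 4 was already written
        System.out.println(check(6, 0L, 100)); // entry 5 is missing
    }
}
```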
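Querying the ConsumeQueue
AdminBrokerProcessor calls ConsumeQueue#getIndexBuffer() with the index of an entry in the consumequeue to obtain the corresponding index entries.
// Look up index entries in the consumequeue by entry index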
SelectMappedBufferResult result = consumeQueue.getIndexBuffer(requestHeader.getIndex());
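ConsumeQueue#getIndexBuffer() first locates the consumequeue MappedFile that contains the entry and then reads the index entries from that MappedFile.
/**
* Looks up index entries in the consumequeue by entry index.
* @param startIndex index of the first entry to read
* @return
*/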
public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
int mappedFileSize = this.mappedFileSize;
// Byte offset within the consumequeue: entry index * 20 bytes (the entry size)
long offset = startIndex * CQ_STORE_UNIT_SIZE;
if (offset >= this.getMinLogicOffset()) {
// Locate the mappedFile that contains this offset
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
if (mappedFile != null) {
// Use the offset modulo the file size as the position within the file and select the ByteBuffer from there
SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
return result;
}
}
return null;
}
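The arithmetic in getIndexBuffer() can be traced with concrete numbers. The sketch below assumes the default file size of 6,000,000 bytes mentioned earlier; IndexLookupSketch and its methods are hypothetical names.

```java
public class IndexLookupSketch {
    static final int CQ_STORE_UNIT_SIZE = 20;

    // Byte offset of an entry within the whole logical ConsumeQueue.
    public static long byteOffset(long startIndex) {
        return startIndex * CQ_STORE_UNIT_SIZE;
    }

    // Position of that entry inside the MappedFile that contains it.
    public static int positionInFile(long startIndex, int mappedFileSize) {
        return (int) (byteOffset(startIndex) % mappedFileSize);
    }

    public static void main(String[] args) {
        int mappedFileSize = 6_000_000; // 300,000 entries * 20 bytes
        long startIndex = 300_001;      // second entry of the second file
        System.out.println("offset=" + byteOffset(startIndex)
            + ", positionInFile=" + positionInFile(startIndex, mappedFileSize));
        // prints "offset=6000020, positionInFile=20"
    }
}
```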
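MappedFileQueue#findMappedFileByOffset() locates the MappedFile for an offset with the formula (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize)). The result is the index of the MappedFile in the mappedFiles list, from which the MappedFile is then fetched.
RocketMQ periodically deletes expired files, so the number of files in the directory changes and the first element of mappedFiles does not necessarily start at offset 0. The index is therefore computed relative to the first remaining file: the number of the file that contains the offset (counted from offset 0) minus the number of the first remaining file (also counted from offset 0) gives the index of the target file in the current mappedFiles list.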
/**
*
* Finds a mapped file by offset.
*
* @param offset Offset.
* @param returnFirstOnNotFound If the mapped file is not found, then return the first one.
* @return Mapped file or null (when not found and returnFirstOnNotFound is <code>false</code>).
*/
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
try {
MappedFile firstMappedFile = this.getFirstMappedFile();
MappedFile lastMappedFile = this.getLastMappedFile();
if (firstMappedFile != null && lastMappedFile != null) {
if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
offset,
firstMappedFile.getFileFromOffset(),
lastMappedFile.getFileFromOffset() + this.mappedFileSize,
this.mappedFileSize,
this.mappedFiles.size());
} else {
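// Index of the target mappedFile
// (offset / this.mappedFileSize) is the number of the file containing this offset, counted from offset 0; call it sum
// (firstMappedFile.getFileFromOffset() / this.mappedFileSize) is the number of the first remaining file, counted from offset 0; call it first
// sum - first is the index of the file containing this offset in the current mappedFiles list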
int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
MappedFile targetFile = null;
try {
targetFile = this.mappedFiles.get(index);
} catch (Exception ignored) {
}
if (targetFile != null && offset >= targetFile.getFileFromOffset()
&& offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
return targetFile;
}
for (MappedFile tmpMappedFile : this.mappedFiles) {
if (offset >= tmpMappedFile.getFileFromOffset()
&& offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
return tmpMappedFile;
}
}
}
if (returnFirstOnNotFound) {
return firstMappedFile;
}
}
} catch (Exception e) {
log.error("findMappedFileByOffset Exception", e);
}
return null;
}
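A worked example makes the index formula concrete. Suppose each file is 6,000,000 bytes and the two oldest files have been deleted, so the first remaining file starts at offset 12,000,000. The class and method names below are hypothetical.

```java
public class FileIndexSketch {
    // Index of the file containing `offset` in the list of files that remain
    // after older files have been deleted.
    public static int fileIndex(long offset, long firstFileFromOffset, int mappedFileSize) {
        return (int) ((offset / mappedFileSize) - (firstFileFromOffset / mappedFileSize));
    }

    public static void main(String[] args) {
        int mappedFileSize = 6_000_000;
        long firstFileFromOffset = 12_000_000L; // files 0 and 1 were deleted
        long offset = 18_000_040L;              // lies in the file starting at 18,000,000
        // 18,000,040 / 6,000,000 = 3 and 12,000,000 / 6,000,000 = 2, so the index is 1
        System.out.println("index=" + fileIndex(offset, firstFileFromOffset, mappedFileSize));
        // prints "index=1"
    }
}
```

The method shown above still validates the candidate file's range and falls back to a linear scan, which guards against the case where the list and the computed index disagree.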
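MappedFile#selectMappedBuffer() reads data from the ByteBuffer starting at the given position pos.
/**
* Returns the data in mappedBuffer from the given position up to the readable position.
* @param pos a position in mappedBuffer; must be smaller than the current readable position
* @return
*/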
public SelectMappedBufferResult selectMappedBuffer(int pos) {
int readPosition = getReadPosition();
if (pos < readPosition && pos >= 0) {
if (this.hold()) {
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
byteBuffer.position(pos);
int size = readPosition - pos;
ByteBuffer byteBufferNew = byteBuffer.slice();
byteBufferNew.limit(size);
return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
}
}
return null;
}
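The double slice() in selectMappedBuffer() yields a view whose index 0 corresponds to pos in the mapped buffer and whose limit covers only the readable bytes, without copying data or disturbing the position of the shared buffer. The sketch below reproduces the technique on a heap ByteBuffer. SliceSketch and select are hypothetical names, and the reference counting done by hold() is deliberately left out.

```java
import java.nio.ByteBuffer;

public class SliceSketch {
    // Returns a view of [pos, readPosition) of `mapped`, or null if pos is out of range.
    public static ByteBuffer select(ByteBuffer mapped, int pos, int readPosition) {
        if (pos < 0 || pos >= readPosition) {
            return null;
        }
        ByteBuffer view = mapped.slice(); // own position and limit, shared content
        view.position(pos);
        ByteBuffer result = view.slice(); // index 0 of result is pos in mapped
        result.limit(readPosition - pos); // expose only the readable bytes
        return result;
    }

    public static void main(String[] args) {
        ByteBuffer mapped = ByteBuffer.allocate(100);
        for (int i = 0; i < 40; i++) {
            mapped.put(i, (byte) i); // absolute put: mapped's position stays at 0
        }
        ByteBuffer result = select(mapped, 20, 40);
        System.out.println("remaining=" + result.remaining() + ", first=" + result.get(0));
        // prints "remaining=20, first=20"
    }
}
```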