RocketMQ源码解析(八)-Broker#消息发送

从这篇文章开始,主要看一下Broker处理一条消息的完整过程:接收Producer发来的消息->存储消息->将消息推给Consumer。这一篇主要看下接收Producer消息的过程。

消息接口

Broker提供的消息发送的接口有:单条消息、批量消息、RETRY消息。Retry消息即consumer消费失败,要求broker重发的消息。失败的原因有两种,一种是业务端代码处理失败;还有一种是消息在consumer的缓存队列中待的时间超时,consumer会将消息从队列中移除,然后退回给Broker重发。前面在讲Consumer的时候已经讲过了,不再重复。
下面看一下消息到达Broker后的处理过程。

消息处理流程

Producer或者consumer发送消息后,Broker通过SendMessageProcessor做接收和处理。一个消息的包可以只包含了一条消息,也可以包含多条消息。这两种的处理逻辑比较类似,我们只看下单条消息的源码。

SendMessageProcessor

对于单发消息是在Processor的sendMessage()方法中处理的。

private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
                                        final RemotingCommand request,
                                        final SendMessageContext sendMessageContext,
                                        final SendMessageRequestHeader requestHeader) throws RemotingCommandException {

        //1、构建Response的Header
        final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();

        response.setOpaque(request.getOpaque());

        response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
        response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));

        log.debug("receive SendMessage request command, {}", request);
        //2、判断当前时间broker是否提供服务,不提供则返回code为SYSTEM_ERROR的response
        final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
        if (this.brokerController.getMessageStore().now() < startTimstamp) {//broker还没开始提供接收消息服务
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
            return response;
        }

        response.setCode(-1);
        //3、检查topic和queue,如果不存在且broker设置中允许自动创建,则自动创建
        super.msgCheck(ctx, requestHeader, response); 
        if (response.getCode() != -1) {
            return response;
        }

        final byte[] body = request.getBody();
        
        int queueIdInt = requestHeader.getQueueId();
        //4、获取topic的配置
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
        //5、如果消息中的queueId小于0,则随机选取一个queue
        if (queueIdInt < 0) {
            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
        }
        //6、重新封装request中的message成MessageExtBrokerInner
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(requestHeader.getTopic());
        msgInner.setQueueId(queueIdInt);
        //7、对于RETRY消息,1)判断是否consumer还存在
        //  2)如果超过最大重发次数,尝试创建DLQ,并将topic设置成DeadQueue,消息将被存入死信队列
        if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
            return response;
        }

        msgInner.setBody(body);
        msgInner.setFlag(requestHeader.getFlag());
        MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
        msgInner.setPropertiesString(requestHeader.getProperties());
        msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
        msgInner.setBornHost(ctx.channel().remoteAddress());
        msgInner.setStoreHost(this.getStoreHost());
        msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
        PutMessageResult putMessageResult = null;
        Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
        String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        if (traFlag != null && Boolean.parseBoolean(traFlag)) {
            //如果是事务消息
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(
                    "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                        + "] sending transaction message is forbidden");
                return response;
            }
            putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
        } else {
            //8、调用MessageStore接口存储消息
            putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        }
        //9、根据putResult设置repsonse状态,更新broker统计信息,成功则回复producer,更新context上下文
        return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
    }
  • 第3步,broker收到消息后可能存在topic不存在的情况,broker可以设置成自动创建topic,不如不支持自动创建,则返回错误。
  • 第7步,对于retry消息,这里会做两件事情:
    首先,会判断consumer是否还存在,因为consumer将消息返回给broker后,会设置一个延时时间,broker有一个定时任务在扫描到重发时间到了以后,会调用processor。所以要check一下调用的时候consumer还在不在。
    再次,消息重发是有次数限制的,默认是16次。这里会检查是否已经超过最大次数,超过的话会把消息的topic设置成并将topic设置成DeadQueue,这样后续处理中就会放入死信队列。
  • 第8步,调用MessageStore接口存储消息,这个我们后面详细讲
  • 第9步,根据存储状态,设置repsonse状态。这里有一点特殊的地方就是如果成功会写response给客户端,如果失败则由外层统一的错误处理逻辑处理。
MessageStore保存消息

调用DefaultMessageStore.putMessage()方法,篇幅原因,一些检查的逻辑代码就不贴了:

public PutMessageResult putMessage(MessageExtBrokerInner msg) {
        ...
        //SLAVE不接收消息写入
        if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
            long value = this.printTimes.getAndIncrement();
            if ((value % 50000) == 0) {
                log.warn("message store is slave mode, so putMessage is forbidden ");
            }

            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }
        ...
        ...
        //commit log timeout
        if (this.isOSPageCacheBusy()) {
            return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
        }

        long beginTime = this.getSystemClock().now();
        PutMessageResult result = this.commitLog.putMessage(msg);

        long eclipseTime = this.getSystemClock().now() - beginTime;
        ...
        //收集消息store时间
        this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);

        if (null == result || !result.isOk()) {
            //记录失败次数
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        }

        return result;
    }

首先判断broker是否是master,并且master当前是可写的。然后判断commitLog上次flush的时候是否超时,如果超时则返回OS_PAGECACHE_BUSY的错误。最终调用commitLog.putMessage()方法保存消息。下面看下CommitLog的方法实现

CommitLog保存消息
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
        // 1、Set the storage time
        msg.setStoreTimestamp(System.currentTimeMillis());
        // 2、Set the message body BODY CRC (consider the most appropriate setting
        // on the client)
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
        // Back to Results
        AppendMessageResult result = null;

        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

        String topic = msg.getTopic();
        int queueId = msg.getQueueId();

        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            //非事务消息
            // Delay Delivery
            if (msg.getDelayTimeLevel() > 0) {
                //3、延时投放消息,变更topic
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }

                topic = ScheduleMessageService.SCHEDULE_TOPIC;
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Backup real topic, queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }

        long eclipseTimeInLock = 0;
        MappedFile unlockMappedFile = null;
        //4、获取当前正在写入文件
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
        //5、获取写message的锁
        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
        try {
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;//记录lock time

            // Here settings are stored timestamp, in order to ensure an orderly
            // global
            msg.setStoreTimestamp(beginLockTimestamp);
            //6、新建一个mapp file如果文件不存在或者文件已经写满
            if (null == mappedFile || mappedFile.isFull()) {
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
            }
            if (null == mappedFile) {
                //文件创建失败,则返回错误
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
            }
            //7、消息写文件
            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
            switch (result.getStatus()) {
                case PUT_OK:
                    break;
                case END_OF_FILE:
                    //8、如果文件已满,则新建一个文件继续
                    unlockMappedFile = mappedFile;
                    // Create a new file, re-write the message
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) {
                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                    }
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                default:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            }

            eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
        } finally {
            //9、释放第5)步中获取到的锁
            putMessageLock.unlock();
        }

        if (eclipseTimeInLock > 500) {//写消息时间过长
            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
        }
        //unlock已经写满的文件,释放内存锁
        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        }

        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

        // Statistics
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
        //10、flush数据到磁盘,分同步和异步
        handleDiskFlush(result, putMessageResult, msg);
        //11、如果broker设置成SYNC_MASTER,则等SLAVE接收到数据后才返回(接收到数据是指offset延后没有超过制定的字节数)
        handleHA(result, putMessageResult, msg);

        return putMessageResult;
    }
  • 第3步,判断是否是Consumer发来的Retry消息,如果是则修改topic为 SCHEDULE_TOPIC_XXXX,根据延时时长计算queueId。将原始的topic和queueId放到消息的properties字段中。这样这个消息只会被重发的Schedule任务读到。
  • 第4步,前面讲MessageStore的时候讲过,CommitLog是由连续的MappedFile的列表组成的。在同一时间,只有最后一个MappedFile有写入,因为之前的文件都已经写满了,所以这里是取最后一个。
  • 第6步,如果commitLog是第一次启动,或者文件size已经达到maxSize,则新建一个文件
  • 第7步,将消息内容写入MappedFile,这里会传一个callback的参数,真正的写入是在callback中实现的,后面我们再看这个实现
  • 第8步,在上一步中的appendMessage()接口中,如果文件剩余的空间已经不足以写下这条消息,则会用一个EOF消息补齐文件,然后返回一个EOF错误。在收到这个错误时,会新建一个文件,然后重写一次。
  • 第10步,用户可以使用MessageStoreFlushDiskType参数来控制数据flush到磁盘的方式,如果参数值SYNC_FLUSH,则每次写完消息都会做一次flush,完成才会返回结果。如果是ASYNC_FLUSH,只会唤醒flushCommitLogService,由它异步的去检查是否要做flush。
  • 第9步,Broker的主从数据同步也可以有两种方式,如果是SYNC_MASTER,则Master保存消息后,需要将消息同步给slave后才会返回结果。如果ASYNC_MASTER,这里不会做任何操作,由HAService的后台线程做数据同步。

下面我们看下MappedFile写入消息的实现

MappedFile消息写入
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
        assert messageExt != null;
        assert cb != null;
        //1、获取当前的write position
        int currentPos = this.wrotePosition.get();
 
        if (currentPos < this.fileSize) {
            //2、生成buffer切片
            ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
            byteBuffer.position(currentPos);
            AppendMessageResult result = null;
            if (messageExt instanceof MessageExtBrokerInner) {
                //3、写单条消息到byteBuffer
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
            } else if (messageExt instanceof MessageExtBatch) {
                //3、批量消息到byteBuffer
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
            } else {
                return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
            }
            //4、更新write position,到最新值
            this.wrotePosition.addAndGet(result.getWroteBytes());
            this.storeTimestamp = result.getStoreTimestamp();
            return result;
        }
        log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }

消息写入首先是获取ByteBuffer,从第2步中可以发现有个判断,这里是因为MappedFile写数据到文件有两种实现方式:

  • FileChannel获取直接内存映射,收到消息后,将数据写入到这块内存中,内存和物理文件的数据交互由操作系统负责
  • CommitLog启动的时候初始化一块内存池(通过ByteBuffer申请的堆外内存),消息数据首先写入内存池中,然后后台有个线程定时将内存池中的数据commit到FileChannel中。这种方式只有MessageStore是ASYNC模式时才能开启。代码中if判断writeBuffer不为空的情况就是使用的这种写入方式。

第3步,最终回调的Callback类将数据写入buffer中,消息的序列化也是在callback里面完成的

序列化和保存

这里调用的是DefaultAppendMessageCallback.doAppend()方法:

        public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
            final MessageExtBrokerInner msgInner) {
            // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>

            //1、 PHY OFFSET ,消息偏移
            long wroteOffset = fileFromOffset + byteBuffer.position();//

            this.resetByteBuffer(hostHolder, 8);//重置host holder buffer
            //2、生成message ID, 前8位是host后8位是wroteOffset,目的是便于使用msgID来查找消息
            String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);

            //3、 Record ConsumeQueue information
            keyBuilder.setLength(0);
            keyBuilder.append(msgInner.getTopic());
            keyBuilder.append('-');
            keyBuilder.append(msgInner.getQueueId());
            String key = keyBuilder.toString();
            //4、取得具体Queue的offset,值是当前是Queue里的第几条消息
            Long queueOffset = CommitLog.this.topicQueueTable.get(key);
            if (null == queueOffset) {
                //5、如果是这个queue的第一条消息,需要初始化
                queueOffset = 0L;
                CommitLog.this.topicQueueTable.put(key, queueOffset);
            }

            //6、 Transaction messages that require special handling
            final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
            switch (tranType) {
                // Prepared and Rollback message is not consumed, will not enter the
                // consumer queuec
                // 事务消息
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    queueOffset = 0L;
                    break;
                // 非事务消息
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                default:
                    break;
            }

            /**
             * Serialize message
             */
            final byte[] propertiesData =
                msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);

            final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;

            if (propertiesLength > Short.MAX_VALUE) {
                log.warn("putMessage message properties length too long. length={}", propertiesData.length);
                return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
            }

            final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
            final int topicLength = topicData.length;

            final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
            //7、计算消息实际存储长度
            final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);

            // Exceeds the maximum message
            if (msgLen > this.maxMessageSize) {
                CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
                    + ", maxMessageSize: " + this.maxMessageSize);
                return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
            }

            // Determines whether there is sufficient free space
            //8、 如果空间不足,magic code设置成EOF,然后剩余字节随机,保证所有文件大小都是FileSize
            if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
                this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
                // 1 TOTALSIZE
                this.msgStoreItemMemory.putInt(maxBlank);
                // 2 MAGICCODE
                this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
                // 3 The remaining space may be any value
                // Here the length of the specially set maxBlank
                final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
                byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
                return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
                    queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
            }

            // Initialization of storage space
            this.resetByteBuffer(msgStoreItemMemory, msgLen);
            // 1 TOTALSIZE
            this.msgStoreItemMemory.putInt(msgLen);
            // 2 MAGICCODE
            this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
            // 3 BODYCRC
            this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
            // 4 QUEUEID
            this.msgStoreItemMemory.putInt(msgInner.getQueueId());
            // 5 FLAG
            this.msgStoreItemMemory.putInt(msgInner.getFlag());
            // 6 QUEUEOFFSET
            this.msgStoreItemMemory.putLong(queueOffset);
            // 7 PHYSICALOFFSET
            this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
            // 8 SYSFLAG
            this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
            // 9 BORNTIMESTAMP
            this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
            // 10 BORNHOST
            this.resetByteBuffer(hostHolder, 8);
            this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder));
            // 11 STORETIMESTAMP
            this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
            // 12 STOREHOSTADDRESS
            this.resetByteBuffer(hostHolder, 8);
            this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));
            //this.msgBatchMemory.put(msgInner.getStoreHostBytes());
            // 13 RECONSUMETIMES
            this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
            // 14 Prepared Transaction Offset
            this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
            // 15 BODY
            this.msgStoreItemMemory.putInt(bodyLength);
            if (bodyLength > 0)
                this.msgStoreItemMemory.put(msgInner.getBody());
            // 16 TOPIC
            this.msgStoreItemMemory.put((byte) topicLength);
            this.msgStoreItemMemory.put(topicData);
            // 17 PROPERTIES
            this.msgStoreItemMemory.putShort((short) propertiesLength);
            if (propertiesLength > 0)
                this.msgStoreItemMemory.put(propertiesData);

            final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
            // Write messages to the queue buffer
            byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);

            AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
                msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);

            switch (tranType) {
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    break;
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    // The next update ConsumeQueue information
                    CommitLog.this.topicQueueTable.put(key, ++queueOffset); //Queue offset + 1
                    break;
                default:
                    break;
            }
            return result;
        }

doAppend()代码主要就是消息的序列化过程:
第1步,计算消息在CommitLog的offset,之前的文章已经反复提到这个值了,可以看到这个偏移量是一个全局的值,而不是只针对这个文件。这里fileFromOffset就是文件第一条消息的offset,也即是文件名。
第2步,生成message ID,可以看到这个id是由broker的host和offset拼接起来的,所以很容易根据id得到消息存在哪个broker的哪个文件中。根据id查询消息速度非常快。
第3步,消息在queue中的偏移量,这里的偏移量值是消息在queue中的序号,从0开始。比如,queue中第10条消息offset就是9
第8步,如果文件剩余的空间不足以存下这条消息,则剩余空间用一条EOF填充,然后返回EOF错误,前面已经讲过调用保存接口的地方收到这个错误会新起一个文件重新写入。
第10步,这里就是按照commitLog的格式要求拼装写入bytebuffer了。

总结

一条消息从到达Broker到存到文件中的过程就结束了,之前关于消息存储说到的几个点再重复一下:

  1. 所有的消息在存储时都是按顺序存在一起的,不会按topic和queueId做物理隔离
  2. 每条消息存储时都会有一个offset,通过offset是定位到消息位置并获取消息详情的唯一办法,所有的消息查询操作最终都是转化成通过offset查询消息详情
  3. 每条消息存储前都会产生一个Message ID,通过这个id可以快速的得到消息存储的broker和它在CommitLog中的offset
  4. Broker收到消息后的处理线程只负责消息存储,不负责通知consumer或者其它逻辑,最大化消息吞吐量
  5. Broker返回成功不代表消息已经写入磁盘,如果对消息的可靠性要求高的话,可以将FlushDiskType设置成SYNC_FLUSH,这样每次收到消息写入文件后都会做flush操作。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,383评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,522评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,852评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,621评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,741评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,929评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,076评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,803评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,265评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,582评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,716评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,395评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,039评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,027评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,488评论 2 361
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,612评论 2 350