RocketMQ第三讲

RocketMQ第三讲

一条消息的生命周期

源码导读

逻辑部署结构

image.png

核心package:

rocketmq-broker:mq的核心,它能接收producer和consumer的请求,并调用store层服务对消息进行处理。HA服务的基本单元,支持同步双写,异步双写等模式。
rocketmq-client:mq客户端实现,目前官方仅仅开源了java版本的mq客户端,c++,go客户端有社区开源贡献。
rocketmq-common:一些模块间通用的功能类,比如一些配置文件、常量。
rocketmq-example:官方提供的例子,对典型的功能比如order message,push consumer,pull consumer的用法进行了示范。
rocketmq-filter:消息过滤服务,相当于在broker和consumer中间加入了一个filter代理。rocketmq-namesrv:命名服务,更新和路由发现 broker服务。
rocketmq-remoting:基于netty的底层通信实现,所有服务间的交互都基于此模块。
rocketmq-srvutil:解析命令行的工具类ServerUtil。
rocketmq-store:存储层实现,同时包括了索引服务,高可用HA服务实现。
rocketmq-tools:mq集群管理工具,提供了消息查询等功能。

RocketMQ主要的功能集中在rocketmq-broker、rocketmq-remoting、rocketmq-store 三个模块。

包依赖关系

namesrv依赖rocketmq-client,rocketmq-client依赖rocketmq-common,rocketmq-common依赖rocketmq-remoting。example依赖rocketmq-client和rocketmq-srvutil

重要接口

  • NettyRequestProcessor
  • RemotingService
  • ConsumeMessageService,消费者启动时用

生产者和消费者都用到了RemotingService的子接口,但都用的是RemotingClient接口,NameSrv和Broker用了RemotingServer接口

// 这个接口非常重要
public interface NettyRequestProcessor {
    RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
        throws Exception;

    boolean rejectRequest();
}
// 接口的实现的构造函数就很重要,会开启线程池和定时任务
public interface ConsumeMessageService {
    void start();
    void shutdown();
    void updateCorePoolSize(int corePoolSize);
    void incCorePoolSize();
    void decCorePoolSize();
    int getCorePoolSize();
    ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);
    void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispathToConsume);
}

重要类

RemotingCommand:在remoting模块中,相当于request和response,非常重要
ResponseCode和RequestCode:在common模块中,common和remoting模块都有协议包
RouteInfoManager:在namesrv模块中,非常重要,保存所有数据DefaultMQPruducer,DefaultMQPruducerImpl,MQClientAPIImpl,MQClientInstance,DefaultMQPushConsumer 在client模块中,他们的关系如下:

MQClientInstance mQClientFactory = MQClientManager.getInstance()
                                                    .getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);

备注:MQClientInstance里面有MQAdminImpl和MQClientAPIImpl


image.png

MQAdminImpl里面有MQClientInstance,MQAdminImpl只在MQClientInstance。


image.png

MQClientAPIImpl有RemotingClient,ClientRemotingProcessor


image.png

remoting模块中的重要类

NettyRemotingAbstract,processRequestCommand方法根据ChannelHandlerContext ctx和RemotingCommand选择适当的processRequest处理请求


image.png

重要接口和类图

image.png
image.png
image.png
image.png

启动Nameser

启动

初始化NamesrvConfig和NettyServerConfig,NameServerController对象的initialize和start方法,启动netty服务。RouteInfoManager是一个重要类,DefaultRequestProcessor负责处理netty的请求,根据request.getCode()执行不同的逻辑,管理RouteInfoManager中的数据。

public class DefaultRequestProcessor implements NettyRequestProcessor

启动流程

  1. 解析配置文件,填充NameServerConfig、NettyServerConfig属性值。
  2. 根据启动属性创建NameSrvController实例,并初始化,是NameServer的核心控制器。

启动之后

提供路由功能,存储路由元数据、路由注册与发行机制。

生产者Producer

  • 单线程发送
  • 发出前,最后包装成RemotingCommand,

最后调用MQClientAPIImpl#sendMessageAsync()方法,然后调用RemotingClient的方法进行发送

DefaultMQPruducer里有DefaultMQPruducerImpl,里面的方法sendKernelImpl和属性mQClientFactory
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl中有org.apache.rocketmq.client.impl.factory.MQClientInstance

消息生产者启动流程

  • 检查productGroup是否符合要求
  • 创建MQClientInstance实例
  • 向MQClientInstance注册
  • 启动MQClientInstance,讲解消息消费时详细介绍

消息发送基本流程

业务和技术一起走,因为是单线程,一直往下走就可以。
构建Message,SendMessageRequestHeader,根据Message,SendMessageRequestHeader构建RemotingCommand,remotingClient.invokeSync方法发送RemotingCommand,返回RemotingCommand,MQClientAPIImpl#sendMessageSync,在processSendResponse方法中,通过RemotingCommand构建SendMessageResponseHeader,由SendMessageResponseHeader构建最终要的SendResult。

消息长度验证

查找主题路由信息

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);        // @1
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);          // @2
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {            //@3
            return topicPublishInfo;
        } else {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);      //@4
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }

选择消息队列

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {   // @1
            try {
                int index = tpInfo.getSendWhichQueue().getAndIncrement();   //@2 start
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);    //@2 end
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {     //@3
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();   //@4
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);     //@5 start
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);                               //@5 end
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
            return tpInfo.selectOneMessageQueue();
        }
        return tpInfo.selectOneMessageQueue(lastBrokerName);     //@6 
    }

消息发送

发送模式:SYNC,ASYNC,ONEWAY核心入口:DefaultMQProducerImpl#sendKernelImpl方法

  1. 根据MessageQueue获取Broker的网络地址。
image.png
image.png
image.png
image.png
  1. 为消息分配全局唯一ID
  2. 如果注册了消息发送钩子函数,则执行消息发送之前的增强逻辑
  3. 构建消息发送请求包,主要就是构建SendMessageRequestHeader,主要包含:生产者组,主题名称,默认创建主题Key,该主题在单个Broker默认队列数,队列ID(队列序号),消息系统标记(MessageSysFlag),消息发送时间,消息标记,消息扩展属性,消息重试次数,是否是批量消息等。
  4. 根据消息发送方式,同步、异步、单向方式进行网络传输

把SendMessageRequestHeader包装成RemotingCommand

RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        assert response != null;
        return this.processSendResponse(brokerName, msg, response);
  1. 如果注册了消息发送钩子函数,执行after逻辑

同步发送

  • 检查消息发送是否合理
  • 如果消息重试次数超过允许的最大重试次数,消息进入延迟队列
  • broker端调用DefaultMessageStore#putMessage进行消息存储

发送流程图

[图片上传失败...(image-4760af-1566988389351)]

发送结果处理

// 在MQClientAPIImpl中,把返回结果RemotingCommand转成SendResult
private SendResult processSendResponse(
        final String brokerName,
        final Message msg,
        final RemotingCommand response
    ) 

消费者Consumer

多线程消费,会启动线程池

MQClientInstance启动

对于生产者和消费者,启动netty的客户端服务,对于NameSrv和Broker,则启动客户端和服务端两种服务

    private final static long LOCK_TIMEOUT_MILLIS = 3000;
    private final InternalLogger log = ClientLogger.getLog();
    private final ClientConfig clientConfig;
    private final int instanceIndex;
    private final String clientId;
    private final long bootTimestamp = System.currentTimeMillis();
    private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
    private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
    private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
    private final NettyClientConfig nettyClientConfig;
    private final MQClientAPIImpl mQClientAPIImpl;
    private final MQAdminImpl mQAdminImpl;
    private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
    private final Lock lockNamesrv = new ReentrantLock();
    private final Lock lockHeartbeat = new ReentrantLock();
    private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
        new ConcurrentHashMap<String, HashMap<Long, String>>();
    private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =
        new ConcurrentHashMap<String, HashMap<String, Integer>>();
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "MQClientFactoryScheduledThread");
        }
    });
    private final ClientRemotingProcessor clientRemotingProcessor;
    private final PullMessageService pullMessageService;
    private final RebalanceService rebalanceService;
    private final DefaultMQProducer defaultMQProducer;
    private final ConsumerStatsManager consumerStatsManager;
    private final AtomicLong sendHeartbeatTimesTotal = new AtomicLong(0);
    private ServiceState serviceState = ServiceState.CREATE_JUST;
    private DatagramSocket datagramSocket;
    private Random random = new Random();
    
    public void  start() throws MQClientException {
        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

启动之后

消费者启动之后,在做什么?除了netty服务外,有多少个线程,多少个线程池?都在干些什么?朝着这个方向梳理。

消息存储broker

在SendMessageProcessor的processRequest方法中处理RemotingCommand,解析SendMessageRequestHeader,构建MessageExtBrokerInner,调用brokerController的DefaultMessageStore的putMessage方法,然后开始真正的存储。返回存储结果PutMessageResult,然后handlePutMessageResult方法把PutMessageResult转换成RemotingCommand,返回给客户端。

服务启动

启动类:org.apache.rocketmq.broker.BrokerController这个类是服务启动时执行,初始化了发送消息、消费消息、清理过期请求等各种线程池和监听事件发送的消息到达broker,调用 org.apache.rocketmq.broker.processor.SendMessageProcessor 类的processRequest()方法,processRequest()调用sendMessag(),也就是netty收到消息后转发给brokerController,brokerController调用DefaultMessageStore进行存储。

启动之后

启动之后在做什么?
性能优化方面,可以从哪些方面着手调节?
总共有几个线程池,几个线程,线程可以分成几类?

流程图

[图片上传失败...(image-59c8c3-1566988389351)]

技术方面

除了netty服务外,有多少个线程,多少个线程池?都在干些什么?朝着这个方向梳理。

Store存储层

存储层时rocketmq的重中之重,也是决定mq性能的主要之处,这里我们将详细分析

存储层整体结构和源码

[图片上传失败...(image-2c0c44-1566988389351)]

消息存储流程

DefaultMessageStore#putMessage

  • 如果当前Broker停止工作或Broker为Slaver角色或当前RocketMQ不支持写入,则拒绝消息写入
  • 获取当前可以写入的Commitlog
  • 写入CommitLog前,申请putMessageLock,也就是将消息存储到CommitLog文件中是串行的
  • 设置消息的存储时间
  • 将消息追加到MappedFile中
  • 创建全局唯一消息ID
  • 获取该消息在消息队列的偏移量
  • 根据消息体的长度,主题长度,计算消息的总长度
  • 如果消息长度大于CommitLog的空闲长度,则返回AppendMessageStatus.END_OF_FILE
  • 将消息内容存储到ByteBuffer中,然后创建AppendMessageResult
  • 更新消息队列逻辑偏移量
  • 处理完消息追加逻辑后将释放putMessageLock锁
  • DefaultAppendMessageCallback#doAppend只是将消息追加在内存中

ComitLog写入消息

  • 获取消息类型(事务消息,非事务消息,Commit消息
  • 获取一个MappedFile对象,内存映射的具体实现
  • 加锁
  • MappedFile对象,获取一个可用的MappedFile(如果没有,则创建一个)
  • 通过MappedFile对象写入文件
  • 根据刷盘策略刷盘
  • 主从同步

构建消费队列

构建索引

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,463评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,868评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,213评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,666评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,759评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,725评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,716评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,484评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,928评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,233评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,393评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,073评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,718评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,308评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,538评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,338评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,260评论 2 352

推荐阅读更多精彩内容