RocketMQ源码(二):broker的启动(一)

RocketMQ源码(一):NameServer的启动
RocketMQ源码(三):broker的启动(二)
RocketMQ源码(四):producer的启动
RocketMQ源码(五):producer发送消息
RocketMQ源码(六):broker接收消息
RocketMQ源码(七):consumer的启动
RocketMQ源码(八):consumer消息拉取(一)
RocketMQ源码(九):consumer消息拉取(二)
从RocketMq的整体架构来看,broker就像是一个管家。不过它管理的是消息。包括但不限于:消息接收、消息存储、提供消息的查询等等。
和NameServer一样,broker也可以通过BrokerStartup的main方法来启动,同样的brokercontroller是broker的核心控制类。这里主要通过两个步骤来完成启动:

  • createBrokerController
  • controller.start()
    先从第一步开始看broker的启动过程
public static BrokerController createBrokerController(String[] args) {
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    // Socket参数,TCP数据发送缓冲区大小,默认128k
    if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
        NettySystemConfig.socketSndbufSize = 131072;
    }
    // Socket参数,TCP数据接收缓冲区大小,默认128k
    if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
        NettySystemConfig.socketRcvbufSize = 131072;
    }

    try {
        //PackageConflictDetect.detectFastjson();
        Options options = ServerUtil.buildCommandlineOptions(new Options());
        commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
            new PosixParser());
        if (null == commandLine) {
            System.exit(-1);
        }
        // 用来封装其绝大多数基本配置信息
        final BrokerConfig brokerConfig = new BrokerConfig();
        // 封装了其作为对外暴露的消息队列服务器的信息
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        // 封装了其作为NameServer客户端的信息
        final NettyClientConfig nettyClientConfig = new NettyClientConfig();

        nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
            String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
        // 设置监听端口号
        nettyServerConfig.setListenPort(10911);
        // MessageStoreConfig会默认配置BrokerRole为ASYNC_MASTER
        final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
        // 如果是从节点,消息最大内存占比比主节点少 10%
        if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
            int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
            messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
        }
        // -c指令加载配置
        if (commandLine.hasOption('c')) {
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                configFile = file;
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                properties = new Properties();
                properties.load(in);

                properties2SystemEnv(properties);
                MixAll.properties2Object(properties, brokerConfig);
                MixAll.properties2Object(properties, nettyServerConfig);
                MixAll.properties2Object(properties, nettyClientConfig);
                MixAll.properties2Object(properties, messageStoreConfig);

                BrokerPathConfigHelper.setBrokerConfigPath(file);
                in.close();
            }
        }

        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);

        if (null == brokerConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }
        // 检查集群地址
        String namesrvAddr = brokerConfig.getNamesrvAddr();
        if (null != namesrvAddr) {
            try {
                String[] addrArray = namesrvAddr.split(";");
                for (String addr : addrArray) {
                    RemotingUtil.string2SocketAddress(addr);
                }
            } catch (Exception e) {
                System.out.printf(
                    "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
                    namesrvAddr);
                System.exit(-3);
            }
        }
        // master的brokerId为0,slave为大于0
        // Broker 角色分为
        // ASYNC_MASTER(异步主机),异步同步消息到slave
        // SYNC_MASTER(同步主机),同步同步消息到slave
        // SLAVE(从机)
        switch (messageStoreConfig.getBrokerRole()) {
            case ASYNC_MASTER:
            case SYNC_MASTER:
                brokerConfig.setBrokerId(MixAll.MASTER_ID);
                break;
            case SLAVE:
                if (brokerConfig.getBrokerId() <= 0) {
                    System.out.printf("Slave's brokerId must be > 0");
                    System.exit(-3);
                }

                break;
            default:
                break;
        }

        if (messageStoreConfig.isEnableDLegerCommitLog()) {
            brokerConfig.setBrokerId(-1);
        }
        // 通过-p和-m命令加载配置
        messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(lc);
        lc.reset();
        configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");

        if (commandLine.hasOption('p')) {
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
            MixAll.printObjectProperties(console, brokerConfig);
            MixAll.printObjectProperties(console, nettyServerConfig);
            MixAll.printObjectProperties(console, nettyClientConfig);
            MixAll.printObjectProperties(console, messageStoreConfig);
            System.exit(0);
        } else if (commandLine.hasOption('m')) {
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
            MixAll.printObjectProperties(console, brokerConfig, true);
            MixAll.printObjectProperties(console, nettyServerConfig, true);
            MixAll.printObjectProperties(console, nettyClientConfig, true);
            MixAll.printObjectProperties(console, messageStoreConfig, true);
            System.exit(0);
        }

        log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
        MixAll.printObjectProperties(log, brokerConfig);
        MixAll.printObjectProperties(log, nettyServerConfig);
        MixAll.printObjectProperties(log, nettyClientConfig);
        MixAll.printObjectProperties(log, messageStoreConfig);
        // 实例化BrokerController
        final BrokerController controller = new BrokerController(
            brokerConfig,
            nettyServerConfig,
            nettyClientConfig,
            messageStoreConfig);
        // remember all configs to prevent discard
        controller.getConfiguration().registerConfig(properties);
        // 初始化BrokerController
        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }
        // 注册虚拟机的shutdown钩子函数
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            private volatile boolean hasShutdown = false;
            private AtomicInteger shutdownTimes = new AtomicInteger(0);

            @Override
            public void run() {
                synchronized (this) {
                    log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
                    if (!this.hasShutdown) {
                        this.hasShutdown = true;
                        long beginTime = System.currentTimeMillis();
                        controller.shutdown();
                        long consumingTimeTotal = System.currentTimeMillis() - beginTime;
                        log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
                    }
                }
            }
        }, "ShutdownHook"));

        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }

    return null;
}

出了读取配置以外,这里有两个地方比较重要

  • BrokerController实例化
  • controller.initialize(),BrokerController初始化

1.BrokerController的实例化

public BrokerController(
    final BrokerConfig brokerConfig,
    final NettyServerConfig nettyServerConfig,
    final NettyClientConfig nettyClientConfig,
    final MessageStoreConfig messageStoreConfig
) {
    this.brokerConfig = brokerConfig;
    this.nettyServerConfig = nettyServerConfig;
    this.nettyClientConfig = nettyClientConfig;
    this.messageStoreConfig = messageStoreConfig;
    // @1
    this.consumerOffsetManager = new ConsumerOffsetManager(this);
    // @2
    this.topicConfigManager = new TopicConfigManager(this);
    // 处理consumer的消息拉取
    this.pullMessageProcessor = new PullMessageProcessor(this);
    // 消息拉取请求挂起服务。当消息拉取请求拉取不到数据时,链接挂起一段时间
    this.pullRequestHoldService = new PullRequestHoldService(this);
    // 消息达到通知,用于通知pullRequestHoldService中挂起的链接
    this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
    // consumer信息变更(变更、注册、解除注册)监听
    this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
    // @3
    this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
    // consumer过滤信息管理
    this.consumerFilterManager = new ConsumerFilterManager(this);
    // @4
    this.producerManager = new ProducerManager();
    // 扫描客户端链接是否超时
    this.clientHousekeepingService = new ClientHousekeepingService(this);
    this.broker2Client = new Broker2Client(this);
    this.subscriptionGroupManager = new SubscriptionGroupManager(this);
    // broker向NameSrv发送请求的客户端
    this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
    this.filterServerManager = new FilterServerManager(this);
    // 从节点异步同步主节点数据
    this.slaveSynchronize = new SlaveSynchronize(this);

    this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
    this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
    this.replyThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getReplyThreadPoolQueueCapacity());
    this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
    this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
    this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
    this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
    this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());
    // broker统计信息
    this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
    this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
    // 快速失败服务,用于broker服务繁忙时,进行服务的限流
    // 快速返回堆积在队列中的请求
    this.brokerFastFailure = new BrokerFastFailure(this);
    this.configuration = new Configuration(
        log,
        BrokerPathConfigHelper.getBrokerConfigPath(),
        this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
    );
}

这里是brokercontroller的各种元素的实例化,主要来认识下4个元素

1.1 ConsumerOffsetManager

private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
        new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);

ConsumerOffsetManager有一个很重要的属性就是offsetTable,key=topic@group,value=<消息队列ID,offerset>,用于记录topic下每个consumerGroup在每个队列的消费进度

1.2 TopicConfigManager

private final ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>(1024);

键就是Topic,值TopicConfig用来记录topic的配置信息(读写队列数、读写权限、消息过滤方式等)

1.3 ConsumerManager

// 保存groupName对应的消费者组信息
private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable = new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);

继续看下ConsumerGroupInfo

  private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable = new ConcurrentHashMap<String, SubscriptionData>();
  private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable = new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
  private volatile ConsumeType consumeType;
  private volatile MessageModel messageModel;

subscriptionTable 主要是记录topic和订阅信息
channelInfoTable 记录Consumer的物理连接
ConsumeType是一个枚举,表明两种消费方式:

CONSUME_ACTIVELY("PULL"),
CONSUME_PASSIVELY("PUSH");

MessageModel 代表的是两种消费模式:

BROADCASTING("BROADCASTING"),
CLUSTERING("CLUSTERING");

Broadcasting:同一个ConsumerGroup里的每个Consumer都能消费到所订阅Topic的全部消息,也就是一个消息会被多次分发,被多个Consumer消费
Clustering:同一个ConsumerGroup里的每个Consumer只消费所订阅消息的一部分内容,同一个ConsumerGroup里所有的Consumer消费的内容合起来才是所订阅Topic内容的整体,从而达到负载均衡的目的

结合着来看,也就是说使用相同GroupName的一组Consumer,其ConsumeType和MessageModel必定相同,其订阅的Topic会根据ConsumeType和MessageModel来完成相应的方式的消息处理

1.4 ProducerManager

private final ConcurrentHashMap<String /* group name */, ConcurrentHashMap<Channel, ClientChannelInfo>> groupChannelTable = new ConcurrentHashMap<>();

groupChannelTable 记录group name 和物理连接的映射

2. BrokerController.initialize

public boolean initialize() throws CloneNotSupportedException {
    // 文件加载
    // 分别加载:topics.json、consumerOffset.json、
    // subscriptionGroup.json、consumerFilter.json等4个文件到对应的manager对象中
    boolean result = this.topicConfigManager.load();
    result = result && this.consumerOffsetManager.load();
    result = result && this.subscriptionGroupManager.load();
    result = result && this.consumerFilterManager.load();

    if (result) {
        try {
            this.messageStore =
                new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                    this.brokerConfig);
            if (messageStoreConfig.isEnableDLegerCommitLog()) {
                DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
                ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
            }
            this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
            // 加载messageStore的plugin,在执行messageStore之前先执行plugin
            MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
            this.messageStore = MessageStoreFactory.build(context, this.messageStore);
            this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
        } catch (IOException e) {
            result = false;
            log.error("Failed to initialize", e);
        }
    }
    // 文件加载和数据恢复
    result = result && this.messageStore.load();

    if (result) {
        // 初始化netty服务端
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
        NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
        fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
        // 初始化VIP端口服务端,即在remotingServer监听的端口号上-2
        this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
        // 线程池-处理从provider发送过来的消息
        this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getSendMessageThreadPoolNums(),
            this.brokerConfig.getSendMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.sendThreadPoolQueue,
            new ThreadFactoryImpl("SendMessageThread_"));
        // 线程池-处理从consumer来的消息拉取请求
        this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getPullMessageThreadPoolNums(),
            this.brokerConfig.getPullMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.pullThreadPoolQueue,
            new ThreadFactoryImpl("PullMessageThread_"));
        // 线程池-处理从consumer来的重试消息
        this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
            this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.replyThreadPoolQueue,
            new ThreadFactoryImpl("ProcessReplyMessageThread_"));

        this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getQueryMessageThreadPoolNums(),
            this.brokerConfig.getQueryMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.queryThreadPoolQueue,
            new ThreadFactoryImpl("QueryMessageThread_"));

        this.adminBrokerExecutor =
            Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
                "AdminBrokerThread_"));
        // 线程池-客户端管理,处理解除注册、检查配置事件
        this.clientManageExecutor = new ThreadPoolExecutor(
            this.brokerConfig.getClientManageThreadPoolNums(),
            this.brokerConfig.getClientManageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.clientManagerThreadPoolQueue,
            new ThreadFactoryImpl("ClientManageThread_"));
        // 线程池-处理心跳事件
        this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getHeartbeatThreadPoolNums(),
            this.brokerConfig.getHeartbeatThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.heartbeatThreadPoolQueue,
            new ThreadFactoryImpl("HeartbeatThread_", true));

        this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getEndTransactionThreadPoolNums(),
            this.brokerConfig.getEndTransactionThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.endTransactionThreadPoolQueue,
            new ThreadFactoryImpl("EndTransactionThread_"));

        this.consumerManageExecutor =
            Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
                "ConsumerManageThread_"));

        // 把事件类型、事件处理还有对应的线程池保存在processorTable中
        this.registerProcessor();
        final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();
        final long period = 1000 * 60 * 60 * 24;
        // 每天凌晨打印昨天发送和拉取的消息数量
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.getBrokerStats().record();
                } catch (Throwable e) {
                    log.error("schedule record error.", e);
                }
            }
        }, initialDelay, period, TimeUnit.MILLISECONDS);
        // 定时持久化consumerOffsetManager的内容到consumerOffset.json文件中
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.consumerOffsetManager.persist();
                } catch (Throwable e) {
                    log.error("schedule persist consumerOffset error.", e);
                }
            }
        }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
        // 定时持久化consumerOffsetManager的内容到consumerFilter.json文件中
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.consumerFilterManager.persist();
                } catch (Throwable e) {
                    log.error("schedule persist consumer filter error.", e);
                }
            }
        }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
        // 定时禁用消费慢的consumer(消息堆积量达到consumerFallbehindThreshold,默认16G),不允许consumer进行消息的拉取,
        // 保护Broker,需要设置disableConsumeIfConsumerReadSlowly属性,默认false
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.protectBroker();
                } catch (Throwable e) {
                    log.error("protectBroker error.", e);
                }
            }
        }, 3, 3, TimeUnit.MINUTES);
        // 定时打印Send、Pull、Query、Transaction队列信息
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.printWaterMark();
                } catch (Throwable e) {
                    log.error("printWaterMark error.", e);
                }
            }
        }, 10, 1, TimeUnit.SECONDS);
        // 定时打印已存储在提交日志中但尚未调度到消费队列的字节数
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
                } catch (Throwable e) {
                    log.error("schedule dispatchBehindBytes error.", e);
                }
            }
        }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
        // 如果设置了namesrvaddr的地址,则更新一次
        // 否则定时更新namesrvaddr的地址
        if (this.brokerConfig.getNamesrvAddr() != null) {
            this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
            log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
        } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                @Override
                public void run() {
                    try {
                        BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
                    } catch (Throwable e) {
                        log.error("ScheduledTask fetchNameServerAddr exception", e);
                    }
                }
            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
        }
        // 在非DLeger模式下
        // 从节点根据配置更新主节点地址,主节点打印订阅关系
        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
                    this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
                    this.updateMasterHAServerAddrPeriodically = false;
                } else {
                    this.updateMasterHAServerAddrPeriodically = true;
                }
            } else {
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.printMasterAndSlaveDiff();
                        } catch (Throwable e) {
                            log.error("schedule printMasterAndSlaveDiff error.", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
            }
        }

        if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
            // Register a listener to reload SslContext
            try {
                fileWatchService = new FileWatchService(
                    new String[] {
                        TlsSystemConfig.tlsServerCertPath,
                        TlsSystemConfig.tlsServerKeyPath,
                        TlsSystemConfig.tlsServerTrustCertPath
                    },
                    new FileWatchService.Listener() {
                        boolean certChanged, keyChanged = false;

                        @Override
                        public void onChanged(String path) {
                            if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                log.info("The trust certificate changed, reload the ssl context");
                                reloadServerSslContext();
                            }
                            if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                certChanged = true;
                            }
                            if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                keyChanged = true;
                            }
                            if (certChanged && keyChanged) {
                                log.info("The certificate and private key changed, reload the ssl context");
                                certChanged = keyChanged = false;
                                reloadServerSslContext();
                            }
                        }

                        private void reloadServerSslContext() {
                            ((NettyRemotingServer) remotingServer).loadSslContext();
                            ((NettyRemotingServer) fastRemotingServer).loadSslContext();
                        }
                    });
            } catch (Exception e) {
                log.warn("FileWatchService created error, can't load the certificate dynamically");
            }
        }
        // 这里动态加载了TransactionalMessageService和AbstractTransactionalMessageCheckListener的实现类,位于如下
        // “META-INF/service/org.apache.rocketmq.broker.transaction.TransactionalMessageService”
        // “META-INF/service/org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener”
        // 还创建了TransactionalMessageCheckService
        initialTransaction();
        // 加载"META-INF/service/org.apache.rocketmq.acl.AccessValidator"配置的AccessValidator实体类
        // 然后将其包装成RPC钩子,注册到remotingServer和fastRemotingServer中,用于请求的调用validate方法进行ACL权限检查
        // 创建ACL权限检查
        initialAcl();
        // 加载"META-INF/service/org.apache.rocketmq.remoting.RPCHook"下的配置的实体类
        initialRpcHooks();
    }
    return result;
}

这就是brokerController的初始化过程,接下来分成几部分来拆解这个初始化过程

2.1 DefaultMessageStore的实例化

先来看看DefaultMessageStore的一些元素,以及它们的含义:

// 存储相关的配置,例如存储路径、commitLog文件大小,刷盘频次等等
private final MessageStoreConfig messageStoreConfig;
// comitLog 的核心处理类,消息存储在 commitlog 文件中
private final CommitLog commitLog;
// topic 的队列信息
private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;
// ConsumeQueue 刷盘服务线程
private final FlushConsumeQueueService flushConsumeQueueService;
// commitLog 过期文件删除线程
private final CleanCommitLogService cleanCommitLogService;
// consumeQueue 过期文件删除线程
private final CleanConsumeQueueService cleanConsumeQueueService;
// 索引服务
private final IndexService indexService;
// MappedFile 分配线程,RocketMQ 使用内存映射处理 commitlog、consumeQueue文件
private final AllocateMappedFileService allocateMappedFileService;
// reput 转发线程(负责 Commitlog 转发到 Consumequeue、Index文件)
private final ReputMessageService reputMessageService;
// 主从同步实现服务
private final HAService haService;
// 定时任务调度器,执行定时任务
private final ScheduleMessageService scheduleMessageService;
// 存储统计服务
private final StoreStatsService storeStatsService;
// ByteBuffer 池,后文会详细使用
private final TransientStorePool transientStorePool;
// 存储服务状态
private final RunningFlags runningFlags = new RunningFlags();
// 获取当前时钟
private final SystemClock systemClock = new SystemClock();

private final ScheduledExecutorService scheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread"));
// Broker 统计服务
private final BrokerStatsManager brokerStatsManager;
// 消息达到监听器
private final MessageArrivingListener messageArrivingListener;
private final BrokerConfig brokerConfig;

private volatile boolean shutdown = true;
// 刷盘检测点
private StoreCheckpoint storeCheckpoint;

private AtomicLong printTimes = new AtomicLong(0);
// 转发 comitlog 日志,主要是从 commitlog 转发到 consumeQueue、index 文件
private final LinkedList<CommitLogDispatcher> dispatcherList;

接下来看看它的初始化方法

public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
    final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
    this.messageArrivingListener = messageArrivingListener;
    this.brokerConfig = brokerConfig;
    this.messageStoreConfig = messageStoreConfig;
    this.brokerStatsManager = brokerStatsManager;
    // 分配mappedFile服务,异步的创建和获取mappedFile
    this.allocateMappedFileService = new AllocateMappedFileService(this);
    // 存储服务
    if (messageStoreConfig.isEnableDLegerCommitLog()) {
        this.commitLog = new DLedgerCommitLog(this);
    } else {
        this.commitLog = new CommitLog(this);
    }
    // 消费队列信息
    this.consumeQueueTable = new ConcurrentHashMap<>(32);
    // 刷新队列服务,刷新consumerQueue的数据到磁盘上
    this.flushConsumeQueueService = new FlushConsumeQueueService();
    // 清除commitLog数据服务,删除过期的物理文件
    this.cleanCommitLogService = new CleanCommitLogService();
    // 清除消费队列服务,删除consumerQueue中对应不上的逻辑文件(mappedFile)
    this.cleanConsumeQueueService = new CleanConsumeQueueService();
    this.storeStatsService = new StoreStatsService();
    // 索引服务
    this.indexService = new IndexService(this);
    // HA服务,主从复制
    if (!messageStoreConfig.isEnableDLegerCommitLog()) {
        this.haService = new HAService(this);
    } else {
        this.haService = null;
    }
    // 消息再分发服务
    this.reputMessageService = new ReputMessageService();
    // 主从节点切换服务
    this.scheduleMessageService = new ScheduleMessageService(this);

    this.transientStorePool = new TransientStorePool(messageStoreConfig);

    if (messageStoreConfig.isTransientStorePoolEnable()) {
        this.transientStorePool.init();
    }

    this.allocateMappedFileService.start();

    this.indexService.start();
    // ReputMessageService会调用dispatcherList中的每个分发类
    // CommitLogDispatcherBuildConsumeQueue用于consumerQueue文件的构建
    // CommitLogDispatcherBuildIndex用于index文件的构建
    this.dispatcherList = new LinkedList<>();
    this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
    this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
    // 构造文件锁
    File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));
    MappedFile.ensureDirOK(file.getParent());
    lockFile = new RandomAccessFile(file, "rw");
}

这里可以看到DefaultMessageStore会初始化很多的服务,来管理数据的存储或是与磁盘交互。其中的部分服务会在后面的start启动过程中讲到,或者在producer的消息推送过程中讲到。

2.2 this.messageStore.load()

store文件夹目录
public boolean load() {
    boolean result = true;
    try {
        boolean lastExitOK = !this.isTempFileExist();
        log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");

        if (null != scheduleMessageService) {
            result = result && this.scheduleMessageService.load();
        }
        // 加载 commitLog 文件到逻辑结构 mappedFile 中
        result = result && this.commitLog.load();
        // 保存 consumequeue 文件信息到consumeQueueTable中
        result = result && this.loadConsumeQueue();

        if (result) {
            this.storeCheckpoint =
                new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
            // 加载 index 文件到逻辑结构 IndexFile 中
            this.indexService.load(lastExitOK);
            // 数据恢复
            // 1.重置consumerQueue和commitLog中每个mappedFile的buffer坐标
            // 2.根据commitLog去掉consumerQueue中的脏数据
            this.recover(lastExitOK);
            log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
        }
    } catch (Exception e) {
        log.error("load exception", e);
        result = false;
    }
    if (!result) {
        this.allocateMappedFileService.shutdown();
    }
    return result;
}

这个方法主要就是将物理地址中的文件,包装成对应的逻辑对象的过程,通过内存映射等原理操作逻辑地址。

2.3 初始化所需服务

回到brokerController的初始化方法中。接下来是初始化netty的连接服务以及各种线程池。

// 初始化netty服务端
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
// 初始化VIP端口服务端
this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
// 线程池-处理从provider发送过来的消息
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
    this.brokerConfig.getSendMessageThreadPoolNums(),
    this.brokerConfig.getSendMessageThreadPoolNums(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.sendThreadPoolQueue,
    new ThreadFactoryImpl("SendMessageThread_"));
// 线程池-处理从consumer来的消息拉取请求
this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
    this.brokerConfig.getPullMessageThreadPoolNums(),
    this.brokerConfig.getPullMessageThreadPoolNums(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.pullThreadPoolQueue,
    new ThreadFactoryImpl("PullMessageThread_"));
// 线程池-处理从consumer来的重试消息
this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
    this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
    this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.replyThreadPoolQueue,
    new ThreadFactoryImpl("ProcessReplyMessageThread_"));

this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
    this.brokerConfig.getQueryMessageThreadPoolNums(),
    this.brokerConfig.getQueryMessageThreadPoolNums(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.queryThreadPoolQueue,
    new ThreadFactoryImpl("QueryMessageThread_"));

this.adminBrokerExecutor =
    Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
        "AdminBrokerThread_"));
// 线程池-客户端管理,处理解除注册、检查配置事件
this.clientManageExecutor = new ThreadPoolExecutor(
    this.brokerConfig.getClientManageThreadPoolNums(),
    this.brokerConfig.getClientManageThreadPoolNums(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.clientManagerThreadPoolQueue,
    new ThreadFactoryImpl("ClientManageThread_"));
// 线程池-处理心跳事件
this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
    this.brokerConfig.getHeartbeatThreadPoolNums(),
    this.brokerConfig.getHeartbeatThreadPoolNums(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.heartbeatThreadPoolQueue,
    new ThreadFactoryImpl("HeartbeatThread_", true));

this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
    this.brokerConfig.getEndTransactionThreadPoolNums(),
    this.brokerConfig.getEndTransactionThreadPoolNums(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.endTransactionThreadPoolQueue,
    new ThreadFactoryImpl("EndTransactionThread_"));

this.consumerManageExecutor =
    Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
        "ConsumerManageThread_"));

其中部分线程池在后续文章的分析中会再次见到

2.4 registerProcessor

这里主要是为 remotingServer 和 fastRemotingServer 注册处理SendProcessor、ReplyMessageProcessor、NettyRequestProcessor、ClientManageProcessor、ConsumerManageProcessor、EndTransactionProcessor、AdminBrokerProcessor等事件处理器以及事件处理器对应的线程池。
不一样的是PullMessageProcessor只能由 remotingServer 处理。

2.4 注册定时器

// 每天凌晨打印昨天发送和拉取的消息数量
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.getBrokerStats().record();
        } catch (Throwable e) {
            log.error("schedule record error.", e);
        }
    }
}, initialDelay, period, TimeUnit.MILLISECONDS);
// 定时持久化consumerOffsetManager的内容到consumerOffset.json文件中
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.consumerOffsetManager.persist();
        } catch (Throwable e) {
            log.error("schedule persist consumerOffset error.", e);
        }
    }
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
// 定时持久化consumerOffsetManager的内容到consumerFilter.json文件中
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.consumerFilterManager.persist();
        } catch (Throwable e) {
            log.error("schedule persist consumer filter error.", e);
        }
    }
}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
// 定时禁用消费慢的consumer(消息堆积量达到consumerFallbehindThreshold,默认16G),不允许consumer进行消息的拉取,
// 保护Broker,需要设置disableConsumeIfConsumerReadSlowly属性,默认false
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.protectBroker();
        } catch (Throwable e) {
            log.error("protectBroker error.", e);
        }
    }
}, 3, 3, TimeUnit.MINUTES);
// 定时打印Send、Pull、Query、Transaction队列信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.printWaterMark();
        } catch (Throwable e) {
            log.error("printWaterMark error.", e);
        }
    }
}, 10, 1, TimeUnit.SECONDS);
// 定时打印已存储在提交日志中但尚未调度到消费队列的字节数
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        try {
            log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
        } catch (Throwable e) {
            log.error("schedule dispatchBehindBytes error.", e);
        }
    }
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
// 如果设置了namesrvaddr的地址,则更新一次
// 否则定时更新namesrvaddr的地址
if (this.brokerConfig.getNamesrvAddr() != null) {
    this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
    log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
} else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
            } catch (Throwable e) {
                log.error("ScheduledTask fetchNameServerAddr exception", e);
            }
        }
    }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
// 在非DLeger模式下
// 从节点根据配置更新主节点地址,主节点打印订阅关系
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
    if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
        if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
            this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
            this.updateMasterHAServerAddrPeriodically = false;
        } else {
            this.updateMasterHAServerAddrPeriodically = true;
        }
    } else {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.printMasterAndSlaveDiff();
                } catch (Throwable e) {
                    log.error("schedule printMasterAndSlaveDiff error.", e);
                }
            }
        }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
    }
}

2.5 注册和加载配置类

// 这里动态加载了TransactionalMessageService和AbstractTransactionalMessageCheckListener的实现类,位于如下
// “META-INF/service/org.apache.rocketmq.broker.transaction.TransactionalMessageService”
// “META-INF/service/org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener”
// 还创建了TransactionalMessageCheckService
initialTransaction();
// 加载"META-INF/service/org.apache.rocketmq.acl.AccessValidator"配置的AccessValidator实体类
// 然后将其包装成RPC钩子,注册到remotingServer和fastRemotingServer中,用于请求的调用validate方法进行ACL权限检查
// 创建ACL权限检查
initialAcl();
// 加载"META-INF/service/org.apache.rocketmq.remoting.RPCHook"下的配置的实体类
initialRpcHooks();

至此BrokerController的创建和初始化过程就结束了,接下来是BrokerController的启动过程。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,657评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,889评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,057评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,509评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,562评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,443评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,251评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,129评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,561评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,779评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,902评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,621评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,220评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,838评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,971评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,025评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,843评论 2 354

推荐阅读更多精彩内容