RocketMQ第三讲
一条消息的生命周期
源码导读
逻辑部署结构
核心package:
rocketmq-broker:mq的核心,它能接收producer和consumer的请求,并调用store层服务对消息进行处理。HA服务的基本单元,支持同步双写,异步双写等模式。
rocketmq-client:mq客户端实现,目前官方仅仅开源了java版本的mq客户端,c++,go客户端有社区开源贡献。
rocketmq-common:一些模块间通用的功能类,比如一些配置文件、常量。
rocketmq-example:官方提供的例子,对典型的功能比如order message,push consumer,pull consumer的用法进行了示范。
rocketmq-filter:消息过滤服务,相当于在broker和consumer中间加入了一个filter代理。rocketmq-namesrv:命名服务,更新和路由发现 broker服务。
rocketmq-remoting:基于netty的底层通信实现,所有服务间的交互都基于此模块。
rocketmq-srvutil:解析命令行的工具类ServerUtil。
rocketmq-store:存储层实现,同时包括了索引服务,高可用HA服务实现。
rocketmq-tools:mq集群管理工具,提供了消息查询等功能。
RocketMQ主要的功能集中在rocketmq-broker、rocketmq-remoting、rocketmq-store 三个模块。
包依赖关系
namesrv依赖rocketmq-client,rocketmq-client依赖rocketmq-common,rocketmq-common依赖rocketmq-remoting。example依赖rocketmq-client和rocketmq-srvutil
重要接口
- NettyRequestProcessor
- RemotingService
- ConsumeMessageService,消费者启动时用
生产者和消费者都用到了RemotingService的子接口,但都用的是RemotingClient接口,NameSrv和Broker用了RemotingServer接口
// 这个接口非常重要
public interface NettyRequestProcessor {
RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws Exception;
boolean rejectRequest();
}
// 接口的实现的构造函数就很重要,会开启线程池和定时任务
public interface ConsumeMessageService {
void start();
void shutdown();
void updateCorePoolSize(int corePoolSize);
void incCorePoolSize();
void decCorePoolSize();
int getCorePoolSize();
ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);
void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispathToConsume);
}
重要类
RemotingCommand:在remoting模块中,相当于request和response,非常重要
ResponseCode和RequestCode:在common模块中,common和remoting模块都有协议包
RouteInfoManager:在namesrv模块中,非常重要,保存所有数据DefaultMQPruducer,DefaultMQPruducerImpl,MQClientAPIImpl,MQClientInstance,DefaultMQPushConsumer 在client模块中,他们的关系如下:
MQClientInstance mQClientFactory = MQClientManager.getInstance()
.getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
备注:MQClientInstance里面有MQAdminImpl和MQClientAPIImpl
MQAdminImpl里面有MQClientInstance,MQAdminImpl只在MQClientInstance。
MQClientAPIImpl有RemotingClient,ClientRemotingProcessor
remoting模块中的重要类
NettyRemotingAbstract,processRequestCommand方法根据ChannelHandlerContext ctx和RemotingCommand选择适当的processRequest处理请求
重要接口和类图
启动Nameser
启动
初始化NamesrvConfig和NettyServerConfig,NameServerController对象的initialize和start方法,启动netty服务。RouteInfoManager是一个重要类,DefaultRequestProcessor负责处理netty的请求,根据request.getCode()执行不同的逻辑,管理RouteInfoManager中的数据。
public class DefaultRequestProcessor implements NettyRequestProcessor
启动流程
- 解析配置文件,填充NameServerConfig、NettyServerConfig属性值。
- 根据启动属性创建NameSrvController实例,并初始化,是NameServer的核心控制器。
启动之后
提供路由功能,存储路由元数据、路由注册与发行机制。
生产者Producer
- 单线程发送
- 发出前,最后包装成RemotingCommand,
最后调用MQClientAPIImpl#sendMessageAsync()方法,然后调用RemotingClient的方法进行发送
DefaultMQPruducer里有DefaultMQPruducerImpl,里面的方法sendKernelImpl和属性mQClientFactory
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl中有org.apache.rocketmq.client.impl.factory.MQClientInstance
消息生产者启动流程
- 检查productGroup是否符合要求
- 创建MQClientInstance实例
- 向MQClientInstance注册
- 启动MQClientInstance,讲解消息消费时详细介绍
消息发送基本流程
业务和技术一起走,因为是单线程,一直往下走就可以。
构建Message,SendMessageRequestHeader,根据Message,SendMessageRequestHeader构建RemotingCommand,remotingClient.invokeSync方法发送RemotingCommand,返回RemotingCommand,MQClientAPIImpl#sendMessageSync,在processSendResponse方法中,通过RemotingCommand构建SendMessageResponseHeader,由SendMessageResponseHeader构建最终要的SendResult。
消息长度验证
查找主题路由信息
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); // @1
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); // @2
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { //@3
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); //@4
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
选择消息队列
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
if (this.sendLatencyFaultEnable) { // @1
try {
int index = tpInfo.getSendWhichQueue().getAndIncrement(); //@2 start
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos); //@2 end
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { //@3
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); //@4
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); //@5 start
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker); //@5 end
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}
return tpInfo.selectOneMessageQueue(lastBrokerName); //@6
}
消息发送
发送模式:SYNC,ASYNC,ONEWAY核心入口:DefaultMQProducerImpl#sendKernelImpl方法
- 根据MessageQueue获取Broker的网络地址。
- 为消息分配全局唯一ID
- 如果注册了消息发送钩子函数,则执行消息发送之前的增强逻辑
- 构建消息发送请求包,主要就是构建SendMessageRequestHeader,主要包含:生产者组,主题名称,默认创建主题Key,该主题在单个Broker默认队列数,队列ID(队列序号),消息系统标记(MessageSysFlag),消息发送时间,消息标记,消息扩展属性,消息重试次数,是否是批量消息等。
- 根据消息发送方式,同步、异步、单向方式进行网络传输
把SendMessageRequestHeader包装成RemotingCommand
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
return this.processSendResponse(brokerName, msg, response);
- 如果注册了消息发送钩子函数,执行after逻辑
同步发送
- 检查消息发送是否合理
- 如果消息重试次数超过允许的最大重试次数,消息进入延迟队列
- broker端调用DefaultMessageStore#putMessage进行消息存储
发送流程图
[图片上传失败...(image-4760af-1566988389351)]
发送结果处理
// 在MQClientAPIImpl中,把返回结果RemotingCommand转成SendResult
private SendResult processSendResponse(
final String brokerName,
final Message msg,
final RemotingCommand response
)
消费者Consumer
多线程消费,会启动线程池
MQClientInstance启动
对于生产者和消费者,启动netty的客户端服务,对于NameSrv和Broker,则启动客户端和服务端两种服务
private final static long LOCK_TIMEOUT_MILLIS = 3000;
private final InternalLogger log = ClientLogger.getLog();
private final ClientConfig clientConfig;
private final int instanceIndex;
private final String clientId;
private final long bootTimestamp = System.currentTimeMillis();
private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
private final NettyClientConfig nettyClientConfig;
private final MQClientAPIImpl mQClientAPIImpl;
private final MQAdminImpl mQAdminImpl;
private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
private final Lock lockNamesrv = new ReentrantLock();
private final Lock lockHeartbeat = new ReentrantLock();
private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
new ConcurrentHashMap<String, HashMap<Long, String>>();
private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =
new ConcurrentHashMap<String, HashMap<String, Integer>>();
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "MQClientFactoryScheduledThread");
}
});
private final ClientRemotingProcessor clientRemotingProcessor;
private final PullMessageService pullMessageService;
private final RebalanceService rebalanceService;
private final DefaultMQProducer defaultMQProducer;
private final ConsumerStatsManager consumerStatsManager;
private final AtomicLong sendHeartbeatTimesTotal = new AtomicLong(0);
private ServiceState serviceState = ServiceState.CREATE_JUST;
private DatagramSocket datagramSocket;
private Random random = new Random();
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
启动之后
消费者启动之后,在做什么?除了netty服务外,有多少个线程,多少个线程池?都在干些什么?朝着这个方向梳理。
消息存储broker
在SendMessageProcessor的processRequest方法中处理RemotingCommand,解析SendMessageRequestHeader,构建MessageExtBrokerInner,调用brokerController的DefaultMessageStore的putMessage方法,然后开始真正的存储。返回存储结果PutMessageResult,然后handlePutMessageResult方法把PutMessageResult转换成RemotingCommand,返回给客户端。
服务启动
启动类:org.apache.rocketmq.broker.BrokerController这个类是服务启动时执行,初始化了发送消息、消费消息、清理过期请求等各种线程池和监听事件发送的消息到达broker,调用 org.apache.rocketmq.broker.processor.SendMessageProcessor 类的processRequest()方法,processRequest()调用sendMessag(),也就是netty收到消息后转发给brokerController,brokerController调用DefaultMessageStore进行存储。
启动之后
启动之后在做什么?
性能优化方面,可以从哪些方面着手调节?
总共有几个线程池,几个线程,线程可以分成几类?
流程图
[图片上传失败...(image-59c8c3-1566988389351)]
技术方面
除了netty服务外,有多少个线程,多少个线程池?都在干些什么?朝着这个方向梳理。
Store存储层
存储层时rocketmq的重中之重,也是决定mq性能的主要之处,这里我们将详细分析
存储层整体结构和源码
[图片上传失败...(image-2c0c44-1566988389351)]
消息存储流程
DefaultMessageStore#putMessage
- 如果当前Broker停止工作或Broker为Slaver角色或当前RocketMQ不支持写入,则拒绝消息写入
- 获取当前可以写入的Commitlog
- 写入CommitLog前,申请putMessageLock,也就是将消息存储到CommitLog文件中是串行的
- 设置消息的存储时间
- 将消息追加到MappedFile中
- 创建全局唯一消息ID
- 获取该消息在消息队列的偏移量
- 根据消息体的长度,主题长度,计算消息的总长度
- 如果消息长度大于CommitLog的空闲长度,则返回AppendMessageStatus.END_OF_FILE
- 将消息内容存储到ByteBuffer中,然后创建AppendMessageResult
- 更新消息队列逻辑偏移量
- 处理完消息追加逻辑后将释放putMessageLock锁
- DefaultAppendMessageCallback#doAppend只是将消息追加在内存中
ComitLog写入消息
- 获取消息类型(事务消息,非事务消息,Commit消息
- 获取一个MappedFile对象,内存映射的具体实现
- 加锁
- MappedFile对象,获取一个可用的MappedFile(如果没有,则创建一个)
- 通过MappedFile对象写入文件
- 根据刷盘策略刷盘
- 主从同步