MQProducer
从类关系中可以看出,MQProducer 有两种实现方式。一个是 DefaultMQProducer,另一个是 TransactionMQProducer。
- DefaultMQProducer: 我们常用的生产者。
- TransactionMQProducer:继承自 DefaultMQProducer,并支持事务消息。
下面我们来分析下 DefaultMQProducer 启动的过程。
启动示例
public class Producer {
public static void main(String[] args) throws UnsupportedEncodingException {
try {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("....");
......
producer.start();
......
}catch(Exception e){}
}
}
创建 DefaultMQProducer 实例,然后制定一些参数,调用 start() 方法就开启了生产者。
DefaultMQProducer 参数分析
public class DefaultMQProducer extends ClientConfig implements MQProducer {
//producer 组名
private String producerGroup;
// Topic 名字,默认为“TBW102”
private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
// 创建 Topic 默认的4个队列
private volatile int defaultTopicQueueNums = 4;
// 发送消息超时时间
private int sendMsgTimeout = 3000;
// 当发送的消息大于 4K 时,开始压缩消息。
private int compressMsgBodyOverHowmuch = 1024 * 4;
//同步发送消息,发送失败时再尝试发送2次数
private int retryTimesWhenSendFailed = 2;
// 异步发送消息,发送失败时再尝试发送2次数
private int retryTimesWhenSendAsyncFailed = 2;
//发送broker消息存储失败时,是否尝试去试发送其他的broker
private boolean retryAnotherBrokerWhenNotStoreOK = false;
//最大允许发送字节数
private int maxMessageSize = 1024 * 1024 * 4; // 4M
DefaultMQProducer 中定义的类属性
- producerGroup: 生产者组名
- createTopicKey :Topic 名字,默认为“TBW102”
- defaultTopicQueueNums :创建 Topic 默认的4个队列
- sendMsgTimeout :默认发送消息3秒超时
- compressMsgBodyOverHowmuch :当发送的消息大于 4K 时,开始压缩消息。
- retryTimesWhenSendFailed :同步发送消息,发送失败时再尝试发送2次数。
- retryTimesWhenSendAsyncFailed :异步发送消息,发送失败时再尝试发送2次数
- retryAnotherBrokerWhenNotStoreOK :发送broker消息存储失败时,是否尝试去试发送其他的broker
DefaultMQProducer 还有可以设置其他的参数,这里就不说明了。
Producer 启动
public void start() throws MQClientException {
this.defaultMQProducerImpl.start();
}
public void start() throws MQClientException {
this.start(true);
}
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
// 1. 只有 serviceState 状态为 CREATE_JUST 时,才启动 Producer
case CREATE_JUST:
//2. 防止启动多个 Producer,先把 serviceState 状态修改为 START_FAILED。
this.serviceState = ServiceState.START_FAILED;
// 3. 检查 groupName 是否合法
this.checkConfig();
//4. 判断是否需要设置 InstanceName 。
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
// 5. 构建 MQClientInstance 对象。
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// 6. 将 DefaultMQProducerImpl 对象注册到 ConcurrentHashMap<String/* group */, MQProducerInner> producerTable 中
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// 7.以主题名"TBW102"为key值,新初始化的TopicPublishInfo对象为value值存入DefaultMQProducerImpl.topicPublishInfoTable变量中
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
// 8. 启动 第五步创建的 MQClientInstance 实例。
if (startFactory) {
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
// 9. 设置DefaultMQProducerImpl的ServiceState为RUNNING
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
// 10. 向所有的 broker 发送心跳和上传 FilterClass
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
- 启动Producer的时候判断 serviceState 的当前状态,只有 serviceState 状态为 CREATE_JUST 时,才启动 Producer。否则抛出异常信息。
2、同时防止启动多个 Producer,先把 serviceState 状态修改为 START_FAILED。
3、 检查 groupName 是否合法。比如不能为空,是否符合正则 ^[%|a-zA-Z0-9_-]+$
,并且最大长度不能超过 255(CHARACTER_MAX_LENGTH = 255
);
groupName 也不能等于 DEFAULT_PRODUCER
。只要满足上面条件,则抛异常信息。
4、如果 producerGroup 不等于 CLIENT_INNER_PRODUCER_GROUP = "CLIENT_INNER_PRODUCER"
,然后调用 changeInstanceNameToPID() 方法判断名字不是 "DEFAULT" 则更改 instanceName。
public void changeInstanceNameToPID() {
if (this.instanceName.equals("DEFAULT")) {
this.instanceName = String.valueOf(UtilAll.getPid());
}
}
public static int getPid() {
RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
String name = runtime.getName(); // format: "pid@hostname"
try {
return Integer.parseInt(name.substring(0, name.indexOf('@')));
.....
}
5、构建 MQClientInstance 对象。
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
return instance;
}
- 5.1 首先生成 clientId:ip@instanceName 或 ip@instanceName@unitName
- 5.2 如果 factoryTable 中是不已经存在 MQClientInstance 实例,则创建。 (下面有单独分析该源码)
6、将 DefaultMQProducerImpl 对象注册到 ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
中
private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
if (null == group || null == producer) {
return false;
}
MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
if (prev != null) {
log.warn("the producer group[{}] exist already.", group);
return false;
}
return true;
}
7、以主题名"TBW102"为key值,新初始化的TopicPublishInfo对象为value值存入DefaultMQProducerImpl.topicPublishInfoTable变量中
8、调用 第五步创建的 MQClientInstance 实例 的start() 方法。
该方法做了很多事情:
- 获取NameServer地址
- 启动 Netty 客户端服务
- 设置多个定时任务
- 开启 pullMessageService 服务
- 开启 rebalanceService 服务
- 开启 发送消息服务
下面有具体代码分析MQClientInstance.start() 方法。
9、设置DefaultMQProducerImpl的ServiceState为RUNNING
10、向所有的 broker 发送心跳和上传 FilterClass
创建MQClientInstance实例(第5.2步)
上面 5.2 步骤中创建MQClientInstance 的代码如下:
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
this.clientConfig = clientConfig;
this.instanceIndex = instanceIndex;
this.nettyClientConfig = new NettyClientConfig();
this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
this.clientRemotingProcessor = new ClientRemotingProcessor(this);
// Netty 中注册接收请求的处理器。
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
//设置 NameServer 地址。
if (this.clientConfig.getNamesrvAddr() != null) {
this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
}
// 客户端ID
this.clientId = clientId;
//创建 MQAdminImpl 对象进行和 NameServer 进行交互,比如创建Topic、获取 Queue等
this.mQAdminImpl = new MQAdminImpl(this);
// 创建 pullMessageService 服务
this.pullMessageService = new PullMessageService(this);
// 创建 rebalanceService 服务
this.rebalanceService = new RebalanceService(this);
// 创建 DefaultMQProducer 服务
this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
this.defaultMQProducer.resetClientConfig(clientConfig);
// 开启 Comsumer 统计服务
this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",
this.instanceIndex,
this.clientId,
this.clientConfig,
MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
}
主要功能:
- 创建 MQAdminImpl 对象进行和 NameServer 进行交互,比如创建Topic、获取 Queue等
- 创建 pullMessageService 服务
- 创建 rebalanceService 服务,供 Consumer 端使用
- 创建 DefaultMQProducer 服务,
- 开启 Comsumer 统计服务。统计最近一段时间内,消费成功个数、消费失败个数等信息。
启动MQClientInstance 服务 (第8步)
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// 1. 如果配置NameServer地址,则从默认服务器地址中获取(该地址不可改变)
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// 2. 启动 Netty 客户端服务
this.mQClientAPIImpl.start();
// 3. 设置多个定时任务
this.startScheduledTask();
// 4. 开启 pullMessageService 服务
this.pullMessageService.start();
// 5. 开启 rebalanceService 服务
this.rebalanceService.start();
// 6. 开启 发送消息服务
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
主要这几步骤操作。
- 1、获取NameServer地址
如果启动 Producer 时没有指定 NameServer,则程序会向一个Http地址发送请求来获取NameServer地址。通过这种方式可以动态的配置 NameServer。从而达到动态增加和删除NameServer服务。
public static String getWSAddr() {
String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
String wsDomainSubgroup = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");
String wsAddr = "http://" + wsDomainName + ":8080/rocketmq/" + wsDomainSubgroup;
if (wsDomainName.indexOf(":") > 0) {
wsAddr = "http://" + wsDomainName + "/rocketmq/" + wsDomainSubgroup;
}
return wsAddr;
}
- 2、启动 Netty 客户端服务
- 3、调用startScheduledTask() 方法设置多个定时任务
- 4、开启 pullMessageService 服务
- 5、开启 rebalanceService 服务
- 6、开启 发送消息服务
startScheduledTask() 方法:定时任务
private void startScheduledTask() {
// 1.如果 NameServer 地址默认没配置,则定时向一个Http地址获取
if (null == this.clientConfig.getNamesrvAddr()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
} catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
// 2. 定时的从 NameServer 中获取 Topic、broker、queue 相关信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
// 3. 定时清理无效的Broker,并向所有的Broker 发送心跳数据
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
// 4. 定时的持久化 Consumer 端消费每个 queue的 offset 数据。
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
// 5. 调整消费端的线程数
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.adjustThreadPool();
} catch (Exception e) {
log.error("ScheduledTask adjustThreadPool exception", e);
}
}
}, 1, 1, TimeUnit.MINUTES);
}
1、定时更新 NameServer 地址
每个2分钟,程序会向一个Http地址发送请求来获取NameServer地址来动态更新NameServer地址。2、 定时的从 NameServer 中获取 Topic、broker、queue 相关信息
默认每隔 30秒去 NameServer 中获取Topic、broker、queue等相关信息。
如果有新broker注册或下线,producer端会在30秒之内感知。3、定时清理无效的Broker,并向所有的Broker 发送心跳数据.
默认每隔 30 秒向 Broker 发送心跳数据 和 用户自定义的 filterclass 类。4、定时的持久化 Consumer 端消费每个 queue的 offset 数据。
默认每隔 5 秒持久或 Consumer 消费的 queue 的 offset信息。
持久化分为,远程持久化和本地持久化。
MessageModel.CLUSTERING 模式 queue的offset 保存到 broker上。
BROADCASTING("BROADCASTING") 模式 queue 的 offset 保存在本地。-
5、调整消费端的线程数
每隔 1 分钟计算每一个queue中消息挤压的数量,如果超过100000条,则增加消费线程的并发数,如果小于80000条则减少消费者的线程数。
不过进入源码中看,调整消费者的线程数都注释掉了。