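In the previous article we saw that NameServer only removes a failed Broker from its routing table after the Broker has been down for at least 120s. During that window, the route information a Producer obtains may still include a Broker that is down, so sends to that Broker fail. The message sending phase described here is what deals with this problem.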
Message-related classes
public class Message implements Serializable {
private static final long serialVersionUID = 8445773977080406428L;
// Topic the message belongs to
private String topic;
// Message flag
private int flag;
// Extended properties
private Map<String, String> properties;
// Message body
private byte[] body;
private String transactionId;
// ...
}
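The main extended properties of Message are:
- tag: the message Tag, used for message filtering
- keys: the Message index keys, separated by spaces if there are several; RocketMQ can use them to look up messages quickly
- waitStoreMsgOK: whether a send waits until the message has been stored before returning
- delayTimeLevel: the message delay level, used for scheduled messages and message retries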
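All of these extended properties live in the single properties map. Below is a minimal sketch of how they could be stored and read back. It assumes property names that mirror RocketMQ's MessageConst (TAGS, KEYS, WAIT, DELAY). SketchMessage is a hypothetical class, not RocketMQ's Message:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for Message: only the extended-properties
// map is modelled. The property names are assumed to mirror MessageConst.
class SketchMessage {
    static final String PROPERTY_TAGS = "TAGS";
    static final String PROPERTY_KEYS = "KEYS";
    static final String PROPERTY_WAIT_STORE_MSG_OK = "WAIT";
    static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";

    private final Map<String, String> properties = new HashMap<>();

    void setTags(String tags) {
        properties.put(PROPERTY_TAGS, tags);
    }

    // Several keys are stored as one space-separated string.
    void setKeys(List<String> keys) {
        properties.put(PROPERTY_KEYS, String.join(" ", keys));
    }

    List<String> getKeys() {
        String raw = properties.get(PROPERTY_KEYS);
        return raw == null ? Collections.<String>emptyList() : Arrays.asList(raw.split(" "));
    }

    void setWaitStoreMsgOK(boolean waitStoreMsgOK) {
        properties.put(PROPERTY_WAIT_STORE_MSG_OK, Boolean.toString(waitStoreMsgOK));
    }

    // In this sketch an absent property means "wait for the store" (true).
    boolean isWaitStoreMsgOK() {
        String value = properties.get(PROPERTY_WAIT_STORE_MSG_OK);
        return value == null || Boolean.parseBoolean(value);
    }

    void setDelayTimeLevel(int level) {
        properties.put(PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
    }

    // 0 means "not a delayed message" in this sketch.
    int getDelayTimeLevel() {
        String value = properties.get(PROPERTY_DELAY_TIME_LEVEL);
        return value == null ? 0 : Integer.parseInt(value);
    }

    Map<String, String> getProperties() {
        return properties;
    }
}
```

Because everything is a string in one map, adding a new kind of metadata does not change the message layout, only the set of keys.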
public class DefaultMQProducer extends ClientConfig implements MQProducer {
private final InternalLogger log = ClientLogger.getLog();
/** Wrapping internal implementations for virtually all methods presented in this class. */
protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
// Producer group; when the broker checks back a transaction's status, it picks any producer in this group to send the check request
private String producerGroup;
private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
// Number of queues per Broker for the default topic
private volatile int defaultTopicQueueNums = 4;
// Default send timeout: 3 seconds
private int sendMsgTimeout = 3000;
// Compress the message body when it exceeds this size; default 4K
private int compressMsgBodyOverHowmuch = 1024 * 4;
// Retries for synchronous sends; default 2, i.e. up to 3 attempts in total
private int retryTimesWhenSendFailed = 2;
// Retries for asynchronous sends; default 2
private int retryTimesWhenSendAsyncFailed = 2;
// Maximum allowed message size
private int maxMessageSize = 1024 * 1024 * 4; // 4M
private TraceDispatcher traceDispatcher = null;
// ...
}
public class MessageQueue implements Comparable<MessageQueue>, Serializable {
private static final long serialVersionUID = 6191200464116433425L;
private String topic;
private String brokerName;
private int queueId;
// ...
}
public class TopicPublishInfo {
private boolean orderTopic = false;
private boolean haveTopicRouterInfo = false;
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private TopicRouteData topicRouteData;
// ...
}
public class TopicRouteData extends RemotingSerializable {
private String orderTopicConf;
private List<QueueData> queueDatas;
private List<BrokerData> brokerDatas;
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
// ...
}
Overall sending process
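- The entry point for sending is DefaultMQProducerImpl#send(). By default messages are sent synchronously with a 3s timeout. The overload shown below takes a SendCallback and therefore sends asynchronously, passing CommunicationMode.ASYNC to sendDefaultImpl.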
public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
}
public void send(Message msg, SendCallback sendCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException {
try {
this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout);
} catch (MQBrokerException e) {
throw new MQClientException("unknownn exception", e);
}
}
- sendDefaultImpl is then called with the following parameters:
private SendResult sendDefaultImpl(//
Message msg, //
final CommunicationMode communicationMode, //
final SendCallback sendCallback, //
final long timeout//
)
CommunicationMode is the send mode: synchronous, asynchronous, or one-way.
public enum CommunicationMode {
SYNC,
ASYNC,
ONEWAY,
}
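The method then checks that the producer service is running and that the message is valid; the details of these checks are skipped here.
Next it records the current time, which is used later to check for timeouts.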
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
Then the route information for the message's topic is looked up.
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
The class has a field that caches the route and message-queue information of every topic.
private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
new ConcurrentHashMap<String, TopicPublishInfo>();
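It first looks up the topic's TopicPublishInfo in topicPublishInfoTable.
If there is no entry for the topic, or the entry is not usable (ok() returns false), it puts a new empty TopicPublishInfo into topicPublishInfoTable and then queries NameServer for the topic's route by calling MQClientInstance#updateTopicRouteInfoFromNameServer(topic). If the result still has no route information and is not usable, it queries again with isDefault = true, i.e. using the default topic.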
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
// Get the topic's publish info from the cache
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
// If there is no Topic info, or it is not usable, fetch it once from NameServer and cache it
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); // putIfAbsent does not overwrite an existing entry
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); // query NameServer and store the route
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
private final Lock lockNamesrv = new ReentrantLock();
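MQClientInstance#updateTopicRouteInfoFromNameServer guards the lookup with ReentrantLock.tryLock; LOCK_TIMEOUT_MILLIS defaults to 3 seconds.
A brief explanation of this lock method:
Suppose threads A and B share the same Lock. Thread A acquires it first with lock() and keeps holding it. If B then calls tryLock(3000, TimeUnit.MILLISECONDS), it acquires the lock and gets true if A releases it within 3 seconds; otherwise it gives up after 3 seconds and gets false.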
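The tryLock behaviour described above can be checked with a small standalone sketch that uses only java.util.concurrent. TryLockDemo and threadBAcquires are hypothetical names: the calling thread plays thread A and holds the lock for holdMillis, while a second thread plays thread B and waits at most timeoutMillis:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class TryLockDemo {
    // Returns whether "thread B" got the lock while "thread A" (the caller)
    // held it for holdMillis milliseconds.
    static boolean threadBAcquires(long holdMillis, long timeoutMillis) {
        final Lock lock = new ReentrantLock();
        final AtomicBoolean acquired = new AtomicBoolean(false);
        Thread threadB = new Thread(() -> {
            try {
                // Wait at most timeoutMillis for thread A to release the lock.
                if (lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
                    try {
                        acquired.set(true);
                    } finally {
                        lock.unlock();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        lock.lock(); // thread A takes the lock first
        try {
            threadB.start();
            Thread.sleep(holdMillis); // and keeps holding it
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock(); // then releases it
        }
        try {
            threadB.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return acquired.get();
    }
}
```

For example, threadBAcquires(1000, 100) returns false because B gives up long before A releases the lock, while threadBAcquires(50, 2000) returns true. Bounding the wait this way keeps a slow NameServer lookup in one thread from blocking other callers indefinitely.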
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
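If isDefault is true, the query uses the default topic (createTopicKey). If route information is found, each QueueData's read and write queue counts are set to min(defaultTopicQueueNums, readQueueNums), i.e. capped at the producer's default queue count.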
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3); // fetch the route info of the default topic
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
}
If isDefault is false, the query uses the topic parameter. If no route information is found, the method returns false, meaning the route has not changed.
else {
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
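If route information is found, it is compared with the locally cached route to decide whether it has changed. If it has not changed, isNeedUpdateTopicRouteInfo(topic) is still consulted; if that also reports that no update is needed, the method returns false.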
if (topicRouteData != null) {
TopicRouteData old = this.topicRouteTable.get(topic);
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}
If it has changed, topicRouteData is first cloned, and brokerAddrTable is then updated from the BrokerData entries of the new topicRouteData.
if (changed) {
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
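Next, the List<QueueData> in topicRouteData is converted into the List<MessageQueue> of TopicPublishInfo; this is done in topicRouteData2TopicPublishInfo. The resulting TopicPublishInfo is then handed to every producer registered in producerTable.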
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicPublishInfo(topic, publishInfo); // update the topic's PublishInfo
}
}
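The QueueData entries of the route are processed in turn. A QueueData without write permission is skipped. Otherwise its BrokerData is looked up by brokerName, and the QueueData is skipped if no BrokerData is found or it has no Master node. For the remaining entries, writeQueueNums MessageQueues are created from the topic, brokerName and queue index and added to TopicPublishInfo's List<MessageQueue>. (For an ordered topic, the queues are built from orderTopicConf instead.) This completes the route lookup for sending.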
// Convert TopicRouteData into TopicPublishInfo
public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
TopicPublishInfo info = new TopicPublishInfo();
info.setTopicRouteData(route);
// ordered topic
if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
String[] brokers = route.getOrderTopicConf().split(";");
for (String broker : brokers) {
String[] item = broker.split(":");
int nums = Integer.parseInt(item[1]);
for (int i = 0; i < nums; i++) {
MessageQueue mq = new MessageQueue(topic, item[0], i);
info.getMessageQueueList().add(mq);
}
}
info.setOrderTopic(true);
}
// unordered topic
else {
List<QueueData> qds = route.getQueueDatas();
Collections.sort(qds); // sort by brokerName in ascending order
for (QueueData qd : qds) {
if (PermName.isWriteable(qd.getPerm())) {
BrokerData brokerData = null;
for (BrokerData bd : route.getBrokerDatas()) {
if (bd.getBrokerName().equals(qd.getBrokerName())) {
brokerData = bd;
break;
}
}
if (null == brokerData) {
continue;
}
if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
continue;
}
for (int i = 0; i < qd.getWriteQueueNums(); i++) {
MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
info.getMessageQueueList().add(mq); // qds is sorted by brokerName, so messageQueueList ends up in ascending brokerName order
}
}
}
info.setOrderTopic(false);
}
return info;
}
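The unordered branch above reduces to a filter-and-expand over the sorted QueueData list. The following is a simplified sketch under these assumptions: QueueInfo is a hypothetical type, the PERM_WRITE bit value is meant to mirror PermName, and brokersWithMaster stands in for "BrokerData found and it has a Master address". Queues are represented as "brokerName:queueId" strings:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Simplified sketch of the unordered branch of topicRouteData2TopicPublishInfo.
class RouteToQueues {
    static final int PERM_WRITE = 0x1 << 1; // assumed to mirror PermName.PERM_WRITE

    static final class QueueInfo {
        final String brokerName;
        final int writeQueueNums;
        final int perm;

        QueueInfo(String brokerName, int writeQueueNums, int perm) {
            this.brokerName = brokerName;
            this.writeQueueNums = writeQueueNums;
            this.perm = perm;
        }
    }

    // Returns one "brokerName:queueId" entry per writable queue.
    static List<String> toMessageQueues(List<QueueInfo> queueDatas, Set<String> brokersWithMaster) {
        List<QueueInfo> sorted = new ArrayList<>(queueDatas);
        sorted.sort((a, b) -> a.brokerName.compareTo(b.brokerName)); // ascending brokerName
        List<String> result = new ArrayList<>();
        for (QueueInfo qd : sorted) {
            if ((qd.perm & PERM_WRITE) == 0) {
                continue; // no write permission
            }
            if (!brokersWithMaster.contains(qd.brokerName)) {
                continue; // BrokerData missing or no Master node
            }
            for (int i = 0; i < qd.writeQueueNums; i++) {
                result.add(qd.brokerName + ":" + i);
            }
        }
        return result;
    }
}
```

With broker-b (2 write queues) listed before broker-a (2 write queues) and a read-only broker-c, the result is broker-a:0, broker-a:1, broker-b:0, broker-b:1. The sort is what makes the queue list ordered.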
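Once the route information is obtained and usable, the number of attempts is computed: for SYNC it is 1 + retryTimesWhenSendFailed (3 by default), and for every other mode it is 1.
In each attempt, the broker name used by the previous attempt is kept as lastBrokerName (null on the first attempt), and a message queue is then chosen according to the queue selection strategy.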
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
// SYNC gets 1 + retryTimesWhenSendFailed attempts (3 by default); other modes get 1
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName(); // null on the first attempt; on a retry, the broker of the previous attempt, so it can be avoided
MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); // select a queue
if (tmpmq != null) {
mq = tmpmq;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
// call sendKernelImpl, the core of message sending
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
endTimestamp = System.currentTimeMillis();
// update the Broker's availability info
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
} catch (RemotingException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQClientException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQBrokerException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
switch (e.getResponseCode()) {
// for the following response codes, continue and retry the send
case ResponseCode.TOPIC_NOT_EXIST:
case ResponseCode.SERVICE_NOT_AVAILABLE:
case ResponseCode.SYSTEM_ERROR:
case ResponseCode.NO_PERMISSION:
case ResponseCode.NO_BUYER_ID:
case ResponseCode.NOT_IN_CURRENT_UNIT:
continue;
default:
if (sendResult != null) {
return sendResult;
}
throw e;
}
} catch (InterruptedException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
log.warn("sendKernelImpl exception", e);
log.warn(msg.toString());
throw e;
}
} else {
break;
}
}
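The retry loop above can be modelled without any networking. In this hypothetical sketch (RetryLoopSketch is not RocketMQ code), brokers are plain names and a send fails whenever the chosen broker is in a set of down brokers. SYNC gets 1 + retryTimesWhenSendFailed attempts, and each retry moves past the broker that just failed:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Simplified model of the sendDefaultImpl retry loop (hypothetical).
class RetryLoopSketch {
    enum Mode { SYNC, ASYNC, ONEWAY }

    // timesTotal = SYNC ? 1 + retryTimesWhenSendFailed : 1
    static int timesTotal(Mode mode, int retryTimesWhenSendFailed) {
        return mode == Mode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
    }

    // Returns the broker used by each attempt (like brokersSent), stopping at the first success.
    static List<String> send(Mode mode, int retryTimesWhenSendFailed, List<String> brokers, Set<String> downBrokers) {
        List<String> brokersSent = new ArrayList<>();
        String lastBrokerName = null; // null on the first attempt
        int index = 0;                // stands in for sendWhichQueue
        int total = timesTotal(mode, retryTimesWhenSendFailed);
        for (int times = 0; times < total; times++) {
            String chosen = brokers.get(index++ % brokers.size());
            // On a retry, move past the broker that just failed (if another one exists).
            for (int i = 0; i < brokers.size() && chosen.equals(lastBrokerName); i++) {
                chosen = brokers.get(index++ % brokers.size());
            }
            brokersSent.add(chosen);
            if (!downBrokers.contains(chosen)) {
                return brokersSent; // success: stop retrying
            }
            lastBrokerName = chosen; // failure: remember it for the next attempt
        }
        return brokersSent; // every attempt failed
    }
}
```

With brokers broker-a and broker-b and broker-a down, a SYNC send tries broker-a and then succeeds on broker-b, while an ASYNC send makes a single attempt on broker-a and stops. In the real code, asynchronous retries are instead driven by retryTimesWhenSendAsyncFailed inside the API layer.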
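Message queue selection strategy
There are two strategies. With sendLatencyFaultEnable=false (the default), the Broker fault-latency mechanism is disabled.
With sendLatencyFaultEnable=true, the Broker fault-latency mechanism is enabled.
The default strategy is described first; it calls TopicPublishInfo#selectOneMessageQueue.
On the first call, lastBrokerName is null. The thread-local counter sendWhichQueue is incremented and taken modulo the number of message queues in the route table, and the MessageQueue at that position is returned.
If the send fails, it may be retried. lastBrokerName is then non-null and holds the BrokerName of the failed attempt. The counter is incremented again and taken modulo the queue count. If the MessageQueue at that position is on a broker other than lastBrokerName, it is returned; otherwise the following queues are checked until one on a different broker is found. If every queue is on lastBrokerName, selection falls back to plain round-robin via selectOneMessageQueue().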
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) { // null on the first attempt
return selectOneMessageQueue();
} else {
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
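The following self-contained sketch shows the default strategy. DefaultQueueSelector is hypothetical, and queues are "brokerName:queueId" strings. The thread-local counter starts at 0 here for determinism; RocketMQ's ThreadLocalIndex appears to start from a random value instead, so treat the starting point as an assumption:

```java
import java.util.List;

// Sketch of TopicPublishInfo#selectOneMessageQueue (hypothetical class).
class DefaultQueueSelector {
    private final List<String> messageQueueList;
    // Per-thread counter playing the role of sendWhichQueue; starts at 0 here.
    private final ThreadLocal<Integer> sendWhichQueue = ThreadLocal.withInitial(() -> 0);

    DefaultQueueSelector(List<String> messageQueueList) {
        this.messageQueueList = messageQueueList;
    }

    private int getAndIncrement() {
        int index = sendWhichQueue.get();
        sendWhichQueue.set(index + 1);
        return index;
    }

    private int position(int index) {
        int pos = Math.abs(index) % messageQueueList.size();
        return pos < 0 ? 0 : pos; // Math.abs(Integer.MIN_VALUE) is still negative
    }

    private static String brokerOf(String mq) {
        return mq.substring(0, mq.indexOf(':'));
    }

    // Plain round-robin over all queues.
    String selectOneMessageQueue() {
        return messageQueueList.get(position(getAndIncrement()));
    }

    // Skip queues on lastBrokerName; fall back to round-robin if all are on it.
    String selectOneMessageQueue(String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        }
        int index = getAndIncrement();
        for (int i = 0; i < messageQueueList.size(); i++) {
            String mq = messageQueueList.get(position(index++));
            if (!brokerOf(mq).equals(lastBrokerName)) {
                return mq;
            }
        }
        return selectOneMessageQueue();
    }
}
```

With queues broker-a:0, broker-a:1, broker-b:0, broker-b:1, the first call returns broker-a:0. A retry with lastBrokerName = "broker-a" skips broker-a:1 and returns broker-b:0. Avoidance only lasts for one retry: the strategy remembers nothing about the broker after that.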
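Next, the selection strategy with the Broker fault-latency mechanism enabled.
The first part is similar to the above. The queue at the current index is taken, and latencyFaultTolerance.isAvailable checks whether its broker is available. If the broker is available and either lastBrokerName is null (the first attempt) or the queue's broker name equals lastBrokerName, that queue is returned.
If no queue qualifies, latencyFaultTolerance.pickOneAtLeast() picks one of the brokers recorded as faulty (notBestBroker). If that broker still has writable queues, the next queue is taken from the route, its brokerName is overwritten with notBestBroker, its queueId is reset, and it is returned. If the broker has no writable queues left, it is removed from latencyFaultTolerance and a queue is chosen with the default mechanism.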
// Latency fault tolerance: tracks the send latency of each Broker
private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
if (this.sendLatencyFaultEnable) { // switch for send-latency fault tolerance
try {
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}
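The fault-latency branch can be sketched with a map from broker name to the time the broker becomes available again. LatencyAwareSelector is hypothetical and makes these assumptions:
- Time is passed in explicitly.
- Queues are "brokerName:queueId" strings.
- pickOneAtLeast() just returns the broker that recovers soonest, which simplifies RocketMQ's version.

The availability check keeps the source's condition exactly as written:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the sendLatencyFaultEnable branch (hypothetical class).
class LatencyAwareSelector {
    private final List<String> queues;                          // "brokerName:queueId"
    private final Map<String, Integer> writeQueueNumsByBroker;
    private final Map<String, Long> startTimestamp = new HashMap<>(); // broker -> available again at
    private int sendWhichQueue = 0;

    LatencyAwareSelector(List<String> queues, Map<String, Integer> writeQueueNumsByBroker) {
        this.queues = queues;
        this.writeQueueNumsByBroker = writeQueueNumsByBroker;
    }

    void updateFaultItem(String broker, long now, long notAvailableDuration) {
        startTimestamp.put(broker, now + notAvailableDuration);
    }

    boolean isAvailable(String broker, long now) {
        Long start = startTimestamp.get(broker);
        return start == null || now >= start;
    }

    // Simplification: the faulty broker that becomes available soonest.
    String pickOneAtLeast() {
        String best = null;
        for (Map.Entry<String, Long> e : startTimestamp.entrySet()) {
            if (best == null || e.getValue() < startTimestamp.get(best)) {
                best = e.getKey();
            }
        }
        return best;
    }

    String select(String lastBrokerName, long now) {
        int index = sendWhichQueue++;
        for (int i = 0; i < queues.size(); i++) {
            String mq = queues.get(Math.floorMod(index++, queues.size()));
            String broker = mq.substring(0, mq.indexOf(':'));
            // Same condition as the source: available, and first attempt or same broker as last time.
            if (isAvailable(broker, now) && (lastBrokerName == null || broker.equals(lastBrokerName))) {
                return mq;
            }
        }
        String notBestBroker = pickOneAtLeast();
        int writeQueueNums = notBestBroker == null ? 0 : writeQueueNumsByBroker.getOrDefault(notBestBroker, 0);
        if (writeQueueNums > 0) {
            // Point a queue at notBestBroker with a fresh queueId.
            return notBestBroker + ":" + Math.floorMod(sendWhichQueue++, writeQueueNums);
        }
        if (notBestBroker != null) {
            startTimestamp.remove(notBestBroker); // nothing writable there: forget it
        }
        return queues.get(Math.floorMod(sendWhichQueue++, queues.size())); // default round-robin
    }
}
```

When lastBrokerName is non-null, the condition as written only accepts available queues on that same broker. This mirrors the source above rather than the default strategy's avoid-the-last-broker behaviour.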
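The above explains how a queue is selected under each strategy. Once a queue is obtained, control returns to the outermost sending method, sendDefaultImpl, which performs the send.
Sending the message
tmpmq is the message queue that was selected.
brokersSent records the broker used by each attempt; as shown above, its length is 3 for synchronous sends (with the default retry setting) and 1 otherwise. The current time is recorded and sendKernelImpl is called to send the message. Afterwards, the end time is taken and updateFaultItem is called to update the Broker's fault information; each broker has one FaultItem.
What happens next depends on the send mode. For SYNC, if the result is not SEND_OK and retryAnotherBrokerWhenNotStoreOK is enabled, another attempt is made; otherwise the result is returned. For ASYNC and ONEWAY, the method returns immediately. If an exception is thrown during sending, updateFaultItem is also called to update the Broker's fault information.
sendKernelImpl and updateFaultItem are described in detail below.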
MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (tmpmq != null) {
mq = tmpmq;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
// call sendKernelImpl, the core of message sending
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
endTimestamp = System.currentTimeMillis();
// update the Broker's availability info
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
}
DefaultMQProducerImpl.sendKernelImpl
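The method takes the following parameters:
- Message msg: the message to send
- MessageQueue mq: the message queue the message is sent to
- CommunicationMode communicationMode: the send mode, SYNC, ASYNC, or ONEWAY
- SendCallback sendCallback: the callback for asynchronous sends
- TopicPublishInfo topicPublishInfo: the topic route information
- long timeout: the send timeout
- Get the Broker's network address for the MessageQueue. If MQClientInstance's brokerAddrTable does not cache this Broker, the topic's route is refreshed from NameServer; if the Broker still cannot be found after the refresh, an MQClientException is thrown stating that the Broker does not exist.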
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
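- Assign the message a globally unique ID. If the message body exceeds the compression threshold (4K by default), it is compressed and the system flag MessageSysFlag.COMPRESSED_FLAG is set. If the message is a transactional Prepared message, the system flag MessageSysFlag.TRANSACTION_PREPARED_TYPE is set.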
//for MessageBatch,ID has been set in the generating process
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg); // sets the client-side unique ID (UNIQ_id), which explains why the msgIds you see can decode differently
}
int sysFlag = 0; // a bit field: each bit represents one flag
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
}
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); // read the transaction property to tell whether this is a transactional message
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
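The flag handling in this step is plain bit arithmetic. This sketch makes three assumptions:
- The flag values mirror MessageSysFlag (COMPRESSED_FLAG = 0x1, TRANSACTION_PREPARED_TYPE = 0x1 << 2).
- The threshold comparison is >=.
- java.util.zip.Deflater stands in for the client's compression routine.

SysFlagSketch is a hypothetical class:

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

class SysFlagSketch {
    static final int COMPRESSED_FLAG = 0x1;                     // assumed MessageSysFlag value
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;      // assumed MessageSysFlag value
    static final int COMPRESS_MSG_BODY_OVER_HOWMUCH = 1024 * 4; // producer default (4K)

    // Each condition sets its own bit in the int.
    static int sysFlag(byte[] body, boolean transactionPrepared) {
        int flag = 0;
        if (body != null && body.length >= COMPRESS_MSG_BODY_OVER_HOWMUCH) {
            flag |= COMPRESSED_FLAG;
        }
        if (transactionPrepared) {
            flag |= TRANSACTION_PREPARED_TYPE;
        }
        return flag;
    }

    // A reader tests a bit with a mask.
    static boolean isCompressed(int flag) {
        return (flag & COMPRESSED_FLAG) != 0;
    }

    static byte[] compress(byte[] body) {
        Deflater deflater = new Deflater();
        deflater.setInput(body);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[1024];
        while (!deflater.finished()) {
            int n = deflater.deflate(buf);
            out.write(buf, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    static byte[] decompress(byte[] data) {
        Inflater inflater = new Inflater();
        inflater.setInput(data);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[1024];
        try {
            while (!inflater.finished()) {
                int n = inflater.inflate(buf);
                if (n == 0 && inflater.needsInput()) {
                    break; // truncated input
                }
                out.write(buf, 0, n);
            }
        } catch (DataFormatException e) {
            throw new IllegalStateException(e);
        } finally {
            inflater.end();
        }
        return out.toByteArray();
    }
}
```

A 5000-byte body of a transactional Prepared message gets flag 0x1 | 0x4 = 5, while a 100-byte non-transactional body gets 0. The broker reads the bits back with a mask, as isCompressed does.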
- If send-message hooks are registered, run the pre-send extension logic.
if (this.hasSendMessageHook()) {
context = new SendMessageContext();
context.setProducer(this);
context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMq(mq);
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (isTrans != null && isTrans.equals("true")) {
context.setMsgType(MessageType.Trans_Msg_Half);
}
if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
context.setMsgType(MessageType.Delay_Msg);
}
this.executeSendMessageHookBefore(context);
}
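- Build the send request header. It mainly contains:
  - the producer group, topic name, default topic key, default queue count per Broker for the topic, and queue ID
  - the system flag, born timestamp, message flag, extended properties, reconsume count, and whether the message is a batch
// Build the SendMessageRequestHeader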
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
// message born timestamp
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}
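requestHeader.setProperties flattens the properties map into a single string via MessageDecoder.messageProperties2String. The following sketch shows such an encoding and its inverse. It assumes the separators are the control characters with code 1 (between name and value) and code 2 (after each pair). PropertiesCodecSketch is hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

class PropertiesCodecSketch {
    static final char NAME_VALUE_SEPARATOR = 1; // assumed separator between name and value
    static final char PROPERTY_SEPARATOR = 2;   // assumed separator after each pair

    // name 0x01 value 0x02 name 0x01 value 0x02 ...
    static String encode(Map<String, String> properties) {
        StringBuilder sb = new StringBuilder();
        if (properties != null) {
            for (Map.Entry<String, String> e : properties.entrySet()) {
                sb.append(e.getKey()).append(NAME_VALUE_SEPARATOR)
                  .append(e.getValue()).append(PROPERTY_SEPARATOR);
            }
        }
        return sb.toString();
    }

    static Map<String, String> decode(String s) {
        Map<String, String> map = new HashMap<>();
        if (s == null || s.isEmpty()) {
            return map;
        }
        for (String item : s.split(String.valueOf(PROPERTY_SEPARATOR))) {
            int sep = item.indexOf(NAME_VALUE_SEPARATOR);
            if (sep > 0) {
                map.put(item.substring(0, sep), item.substring(sep + 1));
            }
        }
        return map;
    }
}
```

Control characters are used as separators because they should not appear in ordinary tag, key, or property values. This lets one flat string carry the whole map in the request header.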
- Transmit the request over the network according to the send mode: synchronous, asynchronous, or one-way.
SendResult sendResult = null;
switch (communicationMode) {
case ASYNC:
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( // asynchronous send
brokerAddr, // 1
mq.getBrokerName(), // 2
msg, // 3
requestHeader, // 4
timeout, // 5
communicationMode, // 6
sendCallback, // 7
topicPublishInfo, // 8
this.mQClientFactory, // 9
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10
context, //
this);
break;
case ONEWAY:
case SYNC:
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( // synchronous and one-way send
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout,
communicationMode,
context,
this);
break;
default:
assert false;
break;
}
if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context); // post-send hook logic
}
return sendResult;
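- The request is then sent in the chosen mode. Before storing the message, the Broker checks whether the send is acceptable. It verifies that the Broker has write permission and that the topic may receive messages, with the topic configuration ultimately stored and registered on the NameServer side. It also checks the queue and returns an error code if the queue is invalid.
- For retry messages, if the reconsume count exceeds the maximum allowed, the message goes to the dead-letter queue (DLQ), whose topic is %DLQ% + consumer group name.
- Finally, DefaultMessageStore.putMessage is called to store the message.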
DefaultMQProducerImpl.updateFaultItem
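As shown above, this method is called both after a send completes and when a send throws an exception, to update the Broker's fault information.
Parameters:
- brokerName: the broker name
- currentLatency: the latency of this send
- isolation: whether to isolate the Broker. If true, a fixed 30s is used as the latency when computing how long to avoid the Broker; if false, the latency of this send (currentLatency) is used.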
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
if (this.sendLatencyFaultEnable) {
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}
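- computeNotAvailableDuration() computes the avoidance duration.
It scans latencyMax from the end and stops at the first index i with latencyMax[i] <= currentLatency, then returns notAvailableDuration[i] as the avoidance duration (0 if no threshold matches).
// latency thresholds (ms)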
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
// not-available (avoidance) durations (ms)
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
private long computeNotAvailableDuration(final long currentLatency) {
for (int i = latencyMax.length - 1; i >= 0; i--) {
if (currentLatency >= latencyMax[i])
return this.notAvailableDuration[i];
}
return 0;
}
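The following worked check reuses the two arrays above. NotAvailableDuration and forSend are hypothetical names; forSend applies the isolation rule from updateFaultItem (isolation means a latency of 30000 ms):

```java
class NotAvailableDuration {
    private static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // Scan from the largest threshold down; the first threshold not above
    // currentLatency selects the avoidance duration.
    static long compute(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }

    // isolation == true treats the send as having taken 30000 ms.
    static long forSend(long currentLatency, boolean isolation) {
        return compute(isolation ? 30000 : currentLatency);
    }
}
```

For example, a send that took 600 ms lands on the 550 ms threshold and avoids the broker for 30000 ms (30 s). A send under 550 ms is never avoided. An isolated failure counts as 30000 ms, which exceeds 15000 ms, so the broker is avoided for 600000 ms (10 minutes).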
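- Look up the broker's FaultItem in the faultItemTable cache: if it is found, update it; otherwise create one. In both cases startTimestamp is set to the current time plus notAvailableDuration.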
private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);
@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
FaultItem old = this.faultItemTable.get(name);
if (null == old) {
final FaultItem faultItem = new FaultItem(name);
faultItem.setCurrentLatency(currentLatency);
faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
old = this.faultItemTable.putIfAbsent(name, faultItem);
if (old != null) {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
} else {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
}
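Each broker has one FaultItem, which records the broker name, the latency of its last send, and the time from which the broker is considered available again (startTimestamp).
class FaultItem implements Comparable<FaultItem> {
private final String name;
private volatile long currentLatency;
private volatile long startTimestamp;
// ...
}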
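Batch message sending
Batch sending packs multiple messages of the same topic into one request to the broker, reducing the number of network calls.
When a single message is sent, its body is stored in body. For a batch, the bodies of the multiple messages are stored in body using a fixed format.
On the producer side, the batch method wraps a group of messages into a MessageBatch object, and the rest of the flow is essentially the same as above. The only difference is that the body of every message in the collection is aggregated into a single byte[] array, which the broker must be able to parse back into individual messages.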
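Packing several messages into one body can be illustrated with a hypothetical length-prefixed layout, with [int length][bytes] per message. This is not RocketMQ's actual MessageBatch encoding, which carries additional per-message fields. BatchBodySketch only shows how a receiver can split one byte[] back into messages:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical length-prefixed batch layout, for illustration only.
class BatchBodySketch {
    static byte[] encode(List<byte[]> bodies) {
        int total = 0;
        for (byte[] b : bodies) {
            total += 4 + b.length; // 4-byte length prefix + body
        }
        ByteBuffer buf = ByteBuffer.allocate(total);
        for (byte[] b : bodies) {
            buf.putInt(b.length);
            buf.put(b);
        }
        return buf.array();
    }

    static List<byte[]> decode(byte[] packed) {
        ByteBuffer buf = ByteBuffer.wrap(packed);
        List<byte[]> bodies = new ArrayList<>();
        while (buf.remaining() >= 4) {
            int len = buf.getInt(); // read the prefix, then exactly len bytes
            byte[] b = new byte[len];
            buf.get(b);
            bodies.add(b);
        }
        return bodies;
    }
}
```

The length prefix is what makes the aggregated byte[] parseable: the receiver never has to guess where one message ends and the next begins.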