原文//www.greatytc.com/p/6cc25cd42185
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final ConsumerSendMsgBackRequestHeader requestHeader =
(ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getGroup());
// 重试队列的Topic,RETRY_GROUP_TOPIC_PREFIX + consumerGroup;
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
newTopic,
subscriptionGroupConfig.getRetryQueueNums(),
PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
// 省略相关代码
// 首先判断最大的重新消费次数
int delayLevel = requestHeader.getDelayLevel();
int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
// 当前版本由consumer端的MaxReconsumeTimes指定
maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
}
// 当超过最大的重新消费次数后选择死信队列DLQTopic
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) {
// DLQ_GROUP_TOPIC_PREFIX + consumerGroup;
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
// 创建消费分组对应的死信队列的Topic
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE,// 死信队列只有写权限
0
);
} else {
if (0 == delayLevel) {
delayLevel = 3 + msgExt.getReconsumeTimes();
}
msgExt.setDelayTimeLevel(delayLevel);
}
// 重新生成消息体写入到新的topic当中,如果死信队列就重新写入commitLog和consumeQueue
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(newTopic);
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
msgInner.setQueueId(queueIdInt);
msgInner.setSysFlag(msgExt.getSysFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
if (putMessageResult != null) {
switch (putMessageResult.getPutMessageStatus()) {
case PUT_OK:
String backTopic = msgExt.getTopic();
String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (correctTopic != null) {
backTopic = correctTopic;
}
this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
default:
break;
}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(putMessageResult.getPutMessageStatus().name());
return response;
}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("putMessageResult is null");
return response;
}
}