系列
- RocketMq 消息Tag过滤
- RocketMq 广播模式
- RocketMQ 同步调用的新特性
- RocketMq 事务消息
- RocketMq 堆积查询
- RocketMq 消息查询
- RocketMq 消费位点上报
RocketMQ事务消息设计
1、事务消息在一阶段对用户不可见
在RocketMQ事务消息的主要流程中,一阶段的消息如何对用户不可见。其中,事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的。那么,如何做到写入消息但是对用户不可见呢?RocketMQ事务消息的做法是:如果消息是half消息,将备份原消息的主题与消息消费队列,然后改变主题为RMQ_SYS_TRANS_HALF_TOPIC。由于消费组未订阅该主题,故消费端无法消费half类型的消息,然后RocketMQ会开启一个定时任务,从Topic为RMQ_SYS_TRANS_HALF_TOPIC中拉取消息进行消费,根据生产者组获取一个服务提供者发送回查事务状态请求,根据事务状态来决定是提交或回滚消息。
RocketMQ的具体实现策略是:写入的如果事务消息,对消息的Topic和Queue等属性进行替换,同时将原来的Topic和Queue信息存储到消息的属性中,正因为消息主题被替换,故消息并不会转发到该原主题的消息消费队列,消费者无法感知消息的存在,不会消费。其实改变消息主题是RocketMQ的常用“套路”,回想一下延时消息的实现机制。
2、Commit和Rollback操作以及Op消息的引入
- 在完成一阶段写入一条对用户不可见的消息后,二阶段如果是Commit操作,则需要让消息对用户可见;如果是Rollback则需要撤销一阶段的消息。先说Rollback的情况。对于Rollback,本身一阶段的消息对用户是不可见的,其实不需要真正撤销消息(实际上RocketMQ也无法去真正的删除一条消息,因为是顺序写文件的)。但是区别于这条消息没有确定状态(Pending状态,事务悬而未决),需要一个操作来标识这条消息的最终状态。RocketMQ事务消息方案中引入了Op消息的概念,用Op消息标识事务消息已经确定的状态(Commit或者Rollback)。如果一条事务消息没有对应的Op消息,说明这个事务的状态还无法确定(可能是二阶段失败了)。引入Op消息后,事务消息无论是Commit或者Rollback都会记录一个Op操作。Commit相对于Rollback只是在写入Op消息前创建Half消息的索引。
3、Op消息的存储和对应关系
- RocketMQ将Op消息写入到全局一个特定的Topic中通过源码中的方法—TransactionalMessageUtil.buildOpTopic();这个Topic是一个内部的Topic(像Half消息的Topic一样),不会被用户消费。Op消息的内容为对应的Half消息的存储的Offset,这样通过Op消息能索引到Half消息进行后续的回查操作。
4、Half消息的索引构建
- 在执行二阶段Commit操作时,需要构建出Half消息的索引。一阶段的Half消息由于是写到一个特殊的Topic,所以二阶段构建索引时需要读取出Half消息,并将Topic和Queue替换成真正的目标的Topic和Queue,之后通过一次普通消息的写入操作来生成一条对用户可见的消息。所以RocketMQ事务消息二阶段其实是利用了一阶段存储的消息的内容,在二阶段时恢复出一条完整的普通消息,然后走一遍消息写入流程。
5、如何处理二阶段失败的消息?
如果在RocketMQ事务消息的二阶段过程中失败了,例如在做Commit操作时,出现网络问题导致Commit失败,那么需要通过一定的策略使这条消息最终被Commit。RocketMQ采用了一种补偿机制,称为“回查”。Broker端对未确定状态的消息发起回查,将消息发送到对应的Producer端(同一个Group的Producer),由Producer根据消息来检查本地事务的状态,进而执行Commit或者Rollback。Broker端通过对比Half消息和Op消息进行事务消息的回查并且推进CheckPoint(记录那些事务消息的状态是确定的)。
值得注意的是,rocketmq并不会无休止的的信息事务状态回查,默认回查15次,如果15次回查还是无法得知事务状态,rocketmq默认回滚该消息。
6、整体处理流程
- 事务消息有专门的一个队列RMQ_SYS_TRANS_HALF_TOPIC,所有的prepare消息都先往这里放,当消息收到Commit请求后,就把消息再塞到真实的Topic队列里,供Consumer消费,同时向RMQ_SYS_TRANS_OP_HALF_TOPIC塞一条消息。
事务消息举例
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("TransactionProducerGroup");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.setNamesrvAddr("localhost:9876");
producer.setSendMsgTimeout(10000);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
- TransactionMQProducer#sendMessageInTransaction()发送事务消息。
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
- executeLocalTransaction用于执行本地方法。
- checkLocalTransaction用于检查本地事务。
事务消息源码分析
整体流程
public class DefaultMQProducerImpl implements MQProducerInner {
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
// 1、在事务消息场景下针对msg的properties进行设定
if (msg.getDelayTimeLevel() != 0) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}
Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
// 2、发送一阶段事务消息
try {
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
// 3、进行本地事务的处理逻辑
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
// 4、发送二阶段事务消息
try {
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}
}
- 1、在事务消息场景下针对msg的properties设为PROPERTY_DELAY_TIME_LEVEL。
- 2、发送一阶段事务消息。
- 3、执行本地事务的处理逻辑。
- 4、发送二阶段事务消息。
一阶段事务消息发送
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
PutMessageResult putMessageResult = null;
Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (traFlag != null && Boolean.parseBoolean(traFlag)
&& !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1
// 针对事务消息进行处理
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
}
}
- TransactionalMessageServiceImpl#prepareMessage发送事务消息。
public class TransactionalMessageServiceImpl implements TransactionalMessageService {
@Override
public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) {
return transactionalMessageBridge.putHalfMessage(messageInner);
}
}
public class TransactionalMessageBridge {
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
return store.putMessage(parseHalfMessageInner(messageInner));
}
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
// 备份原来的topic、queueId、sysFlag的事务标识
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
// 切换到topic为HalfTopic对应的值RMQ_SYS_TRANS_HALF_TOPIC
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
}
- parseHalfMessageInner替换了topic为RMQ_SYS_TRANS_HALF_TOPIC。
- 一阶段消息存储的consumeQueue的topic变成RMQ_SYS_TRANS_HALF_TOPIC。
二阶段事务提交
public class DefaultMQProducerImpl implements MQProducerInner {
public void endTransaction(
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
final MessageId id;
if (sendResult.getOffsetMsgId() != null) {
id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
} else {
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
}
String transactionId = sendResult.getTransactionId();
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
// 设置二次事务提交的transactionId和commitLog的物理偏移
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
// 二次事务提交
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
}
}
- 二阶段事务提交设置requestHeader的transactionId和commitLog的物理偏移。
- 通过endTransactionOneway来实现二阶段事务的提交。
二阶段事务处理-broker
public class EndTransactionProcessor implements NettyRequestProcessor {
private final BrokerController brokerController;
public EndTransactionProcessor(final BrokerController brokerController) {
this.brokerController = brokerController;
}
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final EndTransactionRequestHeader requestHeader =
(EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
// 省略相关代码
OperationResult result = new OperationResult();
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}
response.setCode(result.getResponseCode());
response.setRemark(result.getResponseRemark());
return response;
}
private MessageExtBrokerInner endMessageTransaction(MessageExt msgExt) {
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
msgInner.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(msgExt.getStoreHost());
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
msgInner.setWaitStoreMsgOK(false);
msgInner.setTransactionId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
msgInner.setSysFlag(msgExt.getSysFlag());
TopicFilterType topicFilterType =
(msgInner.getSysFlag() & MessageSysFlag.MULTI_TAGS_FLAG) == MessageSysFlag.MULTI_TAGS_FLAG ? TopicFilterType.MULTI_TAG
: TopicFilterType.SINGLE_TAG;
long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
msgInner.setTagsCode(tagsCodeValue);
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC);
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID);
return msgInner;
}
private RemotingCommand sendFinalMessage(MessageExtBrokerInner msgInner) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
// 省略相关代码
return response;
}
}
- 根据commitLog的offset查找消息并以OperationResult格式返回。
- 针对TRANSACTION_COMMIT_TYPE类型通过endMessageTransaction重新构建Message对象并替换成real topic后通过sendFinalMessage保存到commitLog和consumeQueue当中。
- 针对TRANSACTION_ROLLBACK_TYPE类型似乎没有做特殊处理。
- 最终都执行deletePrepareMessage来标记事务消息的删除。
public class TransactionalMessageServiceImpl implements TransactionalMessageService {
public boolean deletePrepareMessage(MessageExt msgExt) {
if (this.transactionalMessageBridge.putOpMessage(msgExt, TransactionalMessageUtil.REMOVETAG)) {
return true;
} else {
return false;
}
}
@Override
public OperationResult commitMessage(EndTransactionRequestHeader requestHeader) {
return getHalfMessageByOffset(requestHeader.getCommitLogOffset());
}
@Override
public OperationResult rollbackMessage(EndTransactionRequestHeader requestHeader) {
return getHalfMessageByOffset(requestHeader.getCommitLogOffset());
}
private OperationResult getHalfMessageByOffset(long commitLogOffset) {
OperationResult response = new OperationResult();
MessageExt messageExt = this.transactionalMessageBridge.lookMessageByOffset(commitLogOffset);
if (messageExt != null) {
response.setPrepareMessage(messageExt);
response.setResponseCode(ResponseCode.SUCCESS);
} else {
response.setResponseCode(ResponseCode.SYSTEM_ERROR);
response.setResponseRemark("Find prepared transaction message failed");
}
return response;
}
}
- getHalfMessageByOffset负责查找事务消息。
- deletePrepareMessage负责记录二阶段事务的操作记录。
public class TransactionalMessageBridge {
private final ConcurrentHashMap<MessageQueue, MessageQueue> opQueueMap = new ConcurrentHashMap<>();
public boolean putOpMessage(MessageExt messageExt, String opType) {
MessageQueue messageQueue = new MessageQueue(messageExt.getTopic(),
this.brokerController.getBrokerConfig().getBrokerName(), messageExt.getQueueId());
if (TransactionalMessageUtil.REMOVETAG.equals(opType)) {
return addRemoveTagInTransactionOp(messageExt, messageQueue);
}
return true;
}
private boolean addRemoveTagInTransactionOp(MessageExt messageExt, MessageQueue messageQueue) {
// 创建topic为RMQ_SYS_TRANS_OP_HALF_TOPIC,tags为REMOVETAG的Message
Message message = new Message(TransactionalMessageUtil.buildOpTopic(), TransactionalMessageUtil.REMOVETAG,
String.valueOf(messageExt.getQueueOffset()).getBytes(TransactionalMessageUtil.charset));
writeOp(message, messageQueue);
return true;
}
private void writeOp(Message message, MessageQueue mq) {
MessageQueue opQueue;
if (opQueueMap.containsKey(mq)) {
opQueue = opQueueMap.get(mq);
} else {
opQueue = getOpQueueByHalf(mq);
MessageQueue oldQueue = opQueueMap.putIfAbsent(mq, opQueue);
if (oldQueue != null) {
opQueue = oldQueue;
}
}
if (opQueue == null) {
opQueue = new MessageQueue(TransactionalMessageUtil.buildOpTopic(), mq.getBrokerName(), mq.getQueueId());
}
// 保存至op的ConsumeQueue当中
putMessage(makeOpMessageInner(message, opQueue));
}
private MessageExtBrokerInner makeOpMessageInner(Message message, MessageQueue messageQueue) {
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(message.getTopic());
msgInner.setBody(message.getBody());
msgInner.setQueueId(messageQueue.getQueueId());
msgInner.setTags(message.getTags());
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(msgInner.getTags()));
msgInner.setSysFlag(0);
MessageAccessor.setProperties(msgInner, message.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(message.getProperties()));
msgInner.setBornTimestamp(System.currentTimeMillis());
msgInner.setBornHost(this.storeHost);
msgInner.setStoreHost(this.storeHost);
msgInner.setWaitStoreMsgOK(false);
MessageClientIDSetter.setUniqID(msgInner);
return msgInner;
}
}
- 二阶段的op操作日志记录在topic为RMQ_SYS_TRANS_OP_HALF_TOPIC的commitLog和consumeQueue当中。
public class TransactionalMessageCheckService extends ServiceThread {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
private BrokerController brokerController;
public TransactionalMessageCheckService(BrokerController brokerController) {
this.brokerController = brokerController;
}
@Override
public String getServiceName() {
return TransactionalMessageCheckService.class.getSimpleName();
}
@Override
public void run() {
log.info("Start transaction check service thread!");
long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
while (!this.isStopped()) {
this.waitForRunning(checkInterval);
}
log.info("End transaction check service thread!");
}
@Override
protected void onWaitEnd() {
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
// 超过15次的回查事务状态失败后,默认是丢弃此消息
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
long begin = System.currentTimeMillis();
log.info("Begin to check prepare message, begin time:{}", begin);
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}
}
public class TransactionalMessageServiceImpl implements TransactionalMessageService {
@Override
public void check(long transactionTimeout, int transactionCheckMax,
AbstractTransactionalMessageCheckListener listener) {
try {
String topic = MixAll.RMQ_SYS_TRANS_HALF_TOPIC;
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
if (msgQueues == null || msgQueues.size() == 0) {
log.warn("The queue of topic is empty :" + topic);
return;
}
log.debug("Check topic={}, queues={}", topic, msgQueues);
for (MessageQueue messageQueue : msgQueues) {
long startTime = System.currentTimeMillis();
// 获取messageQueue对应的opQueue
MessageQueue opQueue = getOpQueue(messageQueue);
// 获取RMQ_SYS_TRANS_HALF_TOPIC对应的messageQueue的偏移
long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
// 获取RMQ_SYS_TRANS_OP_HALF_TOPIC对应的messageQueue的偏移
long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
if (halfOffset < 0 || opOffset < 0) {
log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
halfOffset, opOffset);
continue;
}
List<Long> doneOpOffset = new ArrayList<>();
HashMap<Long, Long> removeMap = new HashMap<>();
PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
if (null == pullResult) {
log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
messageQueue, halfOffset, opOffset);
continue;
}
// single thread
int getMessageNullCount = 1;
long newOffset = halfOffset;
long i = halfOffset;
while (true) {
if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
break;
}
if (removeMap.containsKey(i)) {
log.info("Half offset {} has been committed/rolled back", i);
Long removedOpOffset = removeMap.remove(i);
doneOpOffset.add(removedOpOffset);
} else {
GetResult getResult = getHalfMsg(messageQueue, i);
MessageExt msgExt = getResult.getMsg();
if (msgExt == null) {
if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
break;
}
if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
messageQueue, getMessageNullCount, getResult.getPullResult());
break;
} else {
log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
i, messageQueue, getMessageNullCount, getResult.getPullResult());
i = getResult.getPullResult().getNextBeginOffset();
newOffset = i;
continue;
}
}
if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
listener.resolveDiscardMsg(msgExt);
newOffset = i + 1;
i++;
continue;
}
if (msgExt.getStoreTimestamp() >= startTime) {
log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
new Date(msgExt.getStoreTimestamp()));
break;
}
long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
long checkImmunityTime = transactionTimeout;
String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
if (null != checkImmunityTimeStr) {
checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
if (valueOfCurrentMinusBorn < checkImmunityTime) {
if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
newOffset = i + 1;
i++;
continue;
}
}
} else {
if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
checkImmunityTime, new Date(msgExt.getBornTimestamp()));
break;
}
}
List<MessageExt> opMsg = pullResult.getMsgFoundList();
boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
|| (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
|| (valueOfCurrentMinusBorn <= -1);
if (isNeedCheck) {
if (!putBackHalfMsgQueue(msgExt, i)) {
continue;
}
listener.resolveHalfMsg(msgExt);
} else {
pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
messageQueue, pullResult);
continue;
}
}
newOffset = i + 1;
i++;
}
if (newOffset != halfOffset) {
transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
}
long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
if (newOpOffset != opOffset) {
transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
}
}
} catch (Exception e) {
e.printStackTrace();
log.error("Check error", e);
}
}
}
- 核心是对于RMQ_SYS_TRANS_HALF_TOPIC和RMQ_SYS_TRANS_OP_HALF_TOPIC的consumeQueue的对比。