系列
- RocketMq 消息Tag过滤
- RocketMq 广播模式
- RocketMQ 同步调用的新特性
- RocketMq 事务消息
- RocketMq 堆积查询
- RocketMq 消息查询
- RocketMq 消费位点上报
开篇
这篇文章主要解析下RocketMq的Consumer侧的位移上报,每个consumer会周期性上报该consumer负责的topic的MessageQueue的消费位移。
consumer负责在消费位移的时候会保存消费位移在consumer侧。
consumer周期性的上报消费位移到broker当中。
consumer持久化消费进度
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
int ackIndex = context.getAckIndex();
// 省略消费消息的逻辑
// 持久化当前消费进度
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
}
- consumer在处理消费逻辑后持久化当前消费进度,持久化的维度为MessageQueue。
- 持久化指定MessageQueue的消费位移。
public class RemoteBrokerOffsetStore implements OffsetStore {
private final static InternalLogger log = ClientLogger.getLog();
private final MQClientInstance mQClientFactory;
private final String groupName;
private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
new ConcurrentHashMap<MessageQueue, AtomicLong>();
@Override
public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
if (mq != null) {
AtomicLong offsetOld = this.offsetTable.get(mq);
if (null == offsetOld) {
offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
}
if (null != offsetOld) {
if (increaseOnly) {
MixAll.compareAndIncreaseOnly(offsetOld, offset);
} else {
offsetOld.set(offset);
}
}
}
}
}
- 通过AtomicLong保存消费位移,通过cms保证原子性。
定时持久化任务
public class MQClientInstance {
private void startScheduledTask() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
}
private void persistAllConsumerOffset() {
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
impl.persistConsumerOffset();
}
}
}
- startScheduledTask每隔10s持久化,通过persistAllConsumerOffset实现。
public class DefaultMQPullConsumerImpl implements MQConsumerInner {
//ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
@Override
public void persistConsumerOffset() {
try {
this.isRunning();
Set<MessageQueue> mqs = new HashSet<MessageQueue>();
Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
mqs.addAll(allocateMq);
this.offsetStore.persistAll(mqs);
} catch (Exception e) {
log.error("group: " + this.defaultMQPullConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
}
}
}
- DefaultMQPullConsumerImpl持久化该client负责的MessageQueue。
public class RemoteBrokerOffsetStore implements OffsetStore {
@Override
public void persistAll(Set<MessageQueue> mqs) {
if (null == mqs || mqs.isEmpty())
return;
final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
MessageQueue mq = entry.getKey();
AtomicLong offset = entry.getValue();
if (offset != null) {
if (mqs.contains(mq)) {
try {
this.updateConsumeOffsetToBroker(mq, offset.get());
} catch (Exception e) {
log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
}
} else {
unusedMQ.add(mq);
}
}
}
if (!unusedMQ.isEmpty()) {
for (MessageQueue mq : unusedMQ) {
this.offsetTable.remove(mq);
log.info("remove unused mq, {}, {}", mq, this.groupName);
}
}
}
private void updateConsumeOffsetToBroker(MessageQueue mq, long offset) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException {
updateConsumeOffsetToBroker(mq, offset, true);
}
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
}
if (findBrokerResult != null) {
// UpdateConsumerOffsetRequestHeader设置Topic+groupName+queueId+offset
UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
requestHeader.setTopic(mq.getTopic());
requestHeader.setConsumerGroup(this.groupName);
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setCommitOffset(offset);
if (isOneway) {
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
} else {
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
}
} else {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
}
}
- persistAll执行核心的消费位移的持久化。
- updateConsumeOffsetToBroker负责把MessageQueue的位移上报broker。