RocketMQ源码解读——同一消费组下不同消费者订阅关系不同时的消费问题

RocketMQ源码解读——同一消费组下不同消费者订阅关系不同时
@(rocketmq源码解读)

先解释一下题目,我们假设有一个Producer和两个Consumer,Producer向TOPICA和TOPICB发送消息,两个Consumer分别订阅两个topic。我们看下这时候会出现的问题,以及根据源码分析一下为什么出现问题。

现象
现象其实还是比较隐蔽的,broker上会打印:the consumer's subscription not exist,group ...的日志(Consumer端也会打印类似的日志)。

还会有一些subscription changed, group: ...类似的日志,并且如果仔细的话还会发现,其中一个消费者消费消息时,另外一个就不会消费。

源码分析
我们看一下为什么会导致这样的问题,一开始生看或者debug都是很难下手,这时候可能就需要使用必杀技(一般不外传那种)——问。

问天问地,谷歌百度必应。我直接问了一个大神——芋艿。大神说这种情况会出问题,具体原因他也记不清了,导致这种现象的问题应该是消费关系不停地相互覆盖。

好了,听到这句话我们就有入口了,至少知道应该从Broker上找起。

顺藤摸瓜找到了原因,下面一起看一下源码。

首先我们知道,消费者的两种实现(推和拉)中都维护一个MQClientInstance,这个类非常重要,在启动消费者的时候,都会去启动这个类,我们看下启动的代码,其中有这么一部分:

// Start various schedule tasks
this.startScheduledTask();
复制代码
这里启动了好多定时任务,我们追进去看一下:

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
    try {
        MQClientInstance.this.cleanOfflineBroker();
        //定时发送心跳
        MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
    } catch (Exception e) {
        log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
    }
}

}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
复制代码
这里我们看到,消费者会定时发送心跳给Broker,我们继续追进去,最后找到sendHeartbeatToAllBroker方法:

//给所有的broker发送心跳
if (!this.brokerAddrTable.isEmpty()) {
long times = this.sendHeartbeatTimesTotal.getAndIncrement();
Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, HashMap<Long, String>> entry = it.next();
String brokerName = entry.getKey();
HashMap<Long, String> oneTable = entry.getValue();
if (oneTable != null) {
for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
Long id = entry1.getKey();
String addr = entry1.getValue();
if (addr != null) {
if (consumerEmpty) {
if (id != MixAll.MASTER_ID)
continue;
}

                try {
                    //真正发送心跳的部分
                    int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
                    if (!this.brokerVersionTable.containsKey(brokerName)) {
                        this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
                    }
                    this.brokerVersionTable.get(brokerName).put(addr, version);
                    if (times % 20 == 0) {
                        log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
                        log.info(heartbeatData.toString());
                    }
                } catch (Exception e) {
                    if (this.isBrokerInNameServer(addr)) {
                        log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr);
                    } else {
                        log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
                            id, addr);
                    }
                }
            }
        }
    }
}

}
复制代码
这里会向所有的Broker发送心跳,我们根据我们的例子,这时候Broker是一台,我们再去Broker上看一下Broker如何处理心跳消息,我们根据发送的是HEART_BEAT类型的消息,可以在Broker上看到,这类消息使用ClientManageProcessor处理,我们看下处理心跳的部分(heartBeat方法):

//循环所有发送过来的数据
for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
//根据消费组的名字获取broker上记录的消费消息
SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(data.getGroupName());
boolean isNotifyConsumerIdsChangedEnable = true;
if (null != subscriptionGroupConfig) {
isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
int topicSysFlag = 0;
if (data.isUnitMode()) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}
String newTopic = MixAll.getRetryTopic(data.getGroupName());
this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
newTopic,
subscriptionGroupConfig.getRetryQueueNums(),
PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
}
//注册消费者
boolean changed = this.brokerController.getConsumerManager().registerConsumer(
data.getGroupName(),
clientChannelInfo,
data.getConsumeType(),
data.getMessageModel(),
data.getConsumeFromWhere(),
data.getSubscriptionDataSet(),
isNotifyConsumerIdsChangedEnable
);

if (changed) {
    log.info("registerConsumer info changed {} {}",
        data.toString(),
        RemotingHelper.parseChannelRemoteAddr(ctx.channel())
    );
}

}
复制代码
我们可以看到,broker会根据consumer放过来的消息,获取自己这边记录的消费者订阅的信息,注意,获取时是按照消费组获取的,我们看下registerConsumer:

//根据消费组获取消费者信息
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null == consumerGroupInfo) {
ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
//注意这里,这里consumerTable的键就是group
ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
consumerGroupInfo = prev != null ? prev : tmp;
}
boolean r1 =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);
boolean r2 = consumerGroupInfo.updateSubscription(subList);
if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
return r1 || r2;
复制代码
我们注意ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);这里,这句话告诉我们consumerTable中存放的消费者信息是按照消费组来的,那么一个组的消费信息如果不一样,按照我们的例子中,则订阅了TOPICA的消费者心跳信息告诉Broker:我们组订阅的是TOPICA!然后Broker就记录下来了。过了一会订阅了TOPICB的消费者心跳信息高速Broker:我们订阅的是TOPICB!

这里就导致了订阅消息相互覆盖,那么拉取消息时,肯定有一个消费者没法拉到消息,因为Broker上查询不到订阅信息。

至此我们就知道了导致上述现象的原因。

https://blog.csdn.net/weixin_33922670/article/details/87988121?utm_medium=distribute.pc_relevant.none-task-blog-baidujs_title-15&spm=1001.2101.3001.4242

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,284评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,115评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,614评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,671评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,699评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,562评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,309评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,223评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,668评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,859评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,981评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,705评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,310评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,904评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,023评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,146评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,933评论 2 355

推荐阅读更多精彩内容