今天的计划看下两个消息中间件RocketMQ和Kafka的Rebalance
方式- -
首先说下Rebalance
是做啥...为啥需要rebalance并介绍一些参与rebalance的基本概念~
Kafka(RocketMQ)在Broker中会将一个topic划分为多个Partition
(ConsumeQueue
), 消息在生产后会被投递到某个Partition
(ConsumeQueue
)中.(PS: partition可以被分配不同broker)
而对于消费者,为了解决一条消息如何消费的问题, 引入了ConsumeGroup
并将Consumer
分配到某个consumeGroup中, MQ在处理消息时会保证消息,在一个Group中只会被一个Consumer
消费(In ClusterMode也是正常大家使用的方式).(所以N台机器如果在Group中只会被一个Consumer收到)
所以,MQ需要
- 将Partition(ConsumeQueue)分配给Consumer
- 并保证一个Partition(ConsumeQueue)只会被分配给一个Group中的一个Consumer(这样就做到一个消息只能被Group中一个Consumer消费了)
- 一个Consumer可以消费多个ConsumeQueue
- 在Consume变化时重新分配保证保证ConsumeQueue都有被处理
- 在Partition数量变化时重新分配保证Consume
- 同样在Broker部分挂机的情况下分配过程保证正确
而上面的过程就是今天要讨论的Rebalance
RocketMQ
粗看流程
RocketMQ的Rebalance逻辑实际是发生在Consume客户端的(当然也必须会从Broker或Nameserver获取一些信息), 处理思路简单说是这样:(注意RocketMQ里的ConsumeQueue可以理解为Kafka的Partition,所以这节里都用ConsumeQueue
表述)
reblance核心逻辑可以参看RebalanceImpl#topicSubscribeInfoTable
- 首先这个rebalance过程是被触发在每个consumer上
- 在客户端获取到当前topic在所有broker所有ConsumeQueue
- 在客户端获取到所有当前ConsumeGroup的Consumer列表(也就是知道和自己在一个Group的其他兄弟姐妹)
- 客户端触发Rebalance时会对所有ConsumeQueue基于QueueID(每个ConsumeQueue的固定的属性)进行排序, 对所有的ConsumeID也进行排序(每个Consumer的标示), 将排序结果.
- 然后使用当前的分配策略进行分配,分配结果就是分配给当前Consumer的ConsumeQueue列表
- 因为每个Consumer都会运行分配所以最终结果是所有consumer都各自拿到属于自己的ConsumeQueue
触发条件这个rebalance的条件有:
- 每20s定时刷新(准确说上次刷新后等20s,
@see RebalanceService#run
- 收到Broker告知的Consume变化通知时
@see ClientRemotingProcessor#notifyConsumerIdsChanged
- 每次Client启动时
@see DefaultMQPushConsumerImpl#start
上面简单的描述了分配过程,不过我们接下来会看下各个细节~
获取所有ConsumeQueue信息
这里我们从设置开始一起走到获取~
- ConsumeQueue会在创建Topic时指定Topic里Queue的数量(细化说有Write和Read这里不展开- -),最终创建结果会被存储到
NameServer
上(就如名字所说保存一些元数据的server) - 所以Consume会直接从
NameServer
获取关于当前有多少Queue的信息 - 获取Queue数量是从本地的``获取的代码位于
@see MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String)
这个函数入口有些多但情况就是会从从NameServer获取并在变化时更新在客户端缓存的对特定topic的Routine信息(这包括所有的broker,每个broker编号queueId为0->x) - 有个入口需要关注就是
MQClientInstance#startScheduledTask
会每10s刷主动刷一次 - 然后实际就从
RebalanceImpl#topicSubscribeInfoTable
这个缓存字段获取来rebalance了
问题:...等等一会儿看
获取Group中的其他Consume信息
这里我们倒过来从获取走到数据来源..
- group信息是保存在broker上的每次分配都会从broker拉取
@see MQClientAPIImpl#getConsumerIdListByGroup
- 这里获取Group中其他ConsumerId请求的是任意一个brokerGroup(一个topic创建时可以指定多个
brokerName
, 一个brokerName下可以有多台brokerNode,为了便于理解这里把相同brokerName的多个brokerNode假装叫做brokerGroup) - 之后会从brokerGroup中选择一台机器获取, 获取会优先获取Master节点, 如果Master没有会乱获取一台(实现是hashmap里iter的第一个- -因为后面看到因为每台consumer都连接所有broker所以理论上可以乱选)
- 在broker上每个Group现在的consumer信息是保存在内存中的
ConsumerManager#consumerTable
一个Map - 而更新的地方只有一个
ClientManageProcessor#heartBeat
也就是收到client心跳信息的时候 - 好了, 我们必须看下心跳咋上报的
MQClientInstance#sendHeartbeatToAllBroker
, 可以看到果然,只要当前client有consumer信息(即不是纯粹的produer角色的client)就会像所有brokerNode上报心跳, 注意不是brokerGroup是所有node(这里感觉建立心跳连接有些多???不过目前这模式好像没太好优化方法 再想想- -)
好了这里画个图总结下~
所有Conume都会向所有broker建立连接并心跳上报,所以所以任意一台broker都有当前group的所有节点信息(正常情况), 客户端想要获取当前group的所有consumer信息直接乱选一台活着的获取就好了
几个内置的分配策略
首先前面说过分配前提是已经获取到所有可用Queue和所有当前Group的Consumer,并都做了排序,各个Consumer各自执行分配, 分配逻辑实现是AllocateMessageQueueStrategy
的几个实现
- AllocateMessageQueueAveragely: 平均分配
- AllocateMessageQueueByMachineRoom: 看注释是什么alipay逻辑机房的逻辑???主要是对brokerName基于逻辑机房进行了筛选- -?不过能否用怎么用就...
- AllocateMessageQueueAveragelyByCircle: 环形分配
- AllocateMessageQueueByConfig: 配死(或通过其他机制动起来?)
总结下最后结果就是能获取到属于当前consumer的ConsumeQueue(代码里叫MessageQueue)
标记已Queue已被占用
上面我们看到整个标记过程都是在consumer本地就完成了,各个consumer间通过排序+一个一致的算法就完成了分配,并没有和其他consumer的交互。
然而这是有问题的,因为rebalance是各自执行,不排除某个时刻两个同一个Group的两个Consumer都怼到一个Queue上,而这个从设计上是绝对不允许的,所以这里需要一个机制保证永远不会出现同Group两个Consume怼到一个Queue上。
RocketMQ目前是选择在Broker上维护一个LockMap来实现(后面会讨论这个也许有问题??)
在RebalanceImpl#updateProcessQueueTableInRebalance
中, 如果是新分配的Queue, 会调用this.lock(mq)
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
if (isOrder && !this.lock(mq)) { // !!! here
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
继续往下跟代码(为了避免太长这里不贴代码了),会发现lock会向masterNode(brokerId=0)的节点发LockBatchRequestBody(只有Master?Master挂了的话- -?)
最后在masterNode内存中会通过RebalanceLockManager#mqLockTable
实现加锁占用(带超时默认1分钟超时类似租期), 如果master时这lock信息会丢失掉?当依赖定时rebalance可以恢复,不过那次rebalance如果有冲突之类的情况发生的话...? 好吧 后面再来看这些check特殊场景
如果加锁失败(别人已经占用或者锁请求失败)会不对这个queue不做处理。。然后等下次rebalance, 再来看别人是否释放锁和masterbroker是否恢复...
同样在RebalanceImpl#updateProcessQueueTableInRebalance
会将无需处理的队列从当前处理中remove掉~这部分逻辑跟下去位于RebalancePushImpl#removeUnnecessaryMessageQueue
,会等待当前正在执行的消费并等processQueue处理干净才尝试向maserNode发起unlock(看代码这里好像没处理masterBroker网路不通的情况如果unlock不成功直接算成功了???)
好了如果按照预期正常unlock,其他consumer可以lock并开始消费,或者等20s下次rebalance可以开始消费(如果本次因次序没竞争lock上)
Challenge
最后,我们来假设些场景,看看能否正常work
1. Consume加入
- 假设开始有1个
a
consumer消费3个队列q1``q2``q3
,启动后rebalance消费3个q并在lockMap中都占有 -
b
consumer这时加入,b
自己启动触发自己rebalance,a
收到b
加入的change事件后开始rebalace, -
b
获得q3
, 所以尝试lock,但a还占有着lock失败暂时不去消费q3
-
a
获取q1
,q2
, 所以removeq3
,并在处理当前消息等一会unlock - 定时rebalance运行,
b
成功lockq3
并开始消费
整体看没啥问题,虽然新加入的consumer要等一阵才能接手消费(有间隙消费小lag), 另外那个等一会儿unlock特殊情况下一会儿小概率会有问题
2. Consume离开(断线)
- 假设两个consumer
a
处理q1
,q2
,b
处理q3
-
b
因为网络原因断线,broker发出change事件触发在线的a
进行rebalance -
a
这时会分配接管q1``q2``q3
, 对新加入的q3
进行lock, 然后发现是lock不了的因为b
已经在lock了 - 这时候需要等60s后的rebalance,
a
才有机会解盘q3
的消息
感觉这部分等锁超时有些无奈- -,消费不会乱但会有消费lag增加
3. Consume同时并发加入
- 假设开始只有一个consumer
a
处理q1``q2``q3
- 之后consumer
b
和c
“同时”加入 - 首先各自启动后自己rebalance,
b
lockq2
,c
lockq3
可能会失败 - 然后
a
收到change消息开始rebalance, 这时可能看到b
和c
也可能只看到b
,rebalance处理change是使用wakeup不会重复唤醒(已醒着不会再来一次),所以本次rebalance是有可能认为只有b
只unlockq3
..不过没关系还有下次20s的rebalance那次还是可以怼正 - 同理
b
,c
靠20s运行一次的rebalance也是可以怼正
所以我们看到可以保证消费不会乱,不过代价是要过一阵新加入的consume才能真正开始接手消费(间隙小lag)
4. Topic调整Queue数量
上面提到过Client每10s会从NameServer刷一次TopicRoutine(MQClientInstance#startScheduledTask
), 所以Queue变化正常会在这里被收到并更新本地缓存。
然后,正常情况下等下次rebalance时就会用新的Queue信息进行重新分配,然后基于上面说的lock和定期重rebalance规则,最终可以保证ok且中途不乱
异常情况下,想到的几个NameServer数据不一致或交换routine刷新和rebalance次序,看好像最终也都能达到期望状态 - -
5. Broker挂了
上面提到过,lock信息是放到每个BrokerGroup中的master(id0)上的,所以如果Master挂了的话,lock会用永远不成功,可以理解为新Consume无法加入,老Consume无法退出,必须等待broker活过来,但之前在跑的的还可以正常运行(只要别离开了还没加入然后broker挂了- -这种情况部分queue会有lag)
(PS背景介绍: 在rocketmq的设计里brokerGroup的master挂了group不可以写入,但可以改写其他brokerGroup来完成写入HA,消费者HA可以通过brokerGroup里的slave消费之前堆在brokerGroup里的内容)
broker活过来后,因为是内存,所以下次触发rebalance会重新恢复lock的map。。
不过感觉有个极端情况。。就是master挂了,然后这时消费者有变化或者队列数目有调整。。。因为启动时内存为空等于没占锁,而实际之前consumer已经在跑,在还没来得及rebalance就发生了变更,这时可能出现同group里两个consumer同时消费一个queue????
RocketMQ小结
RocketMQ在master不挂的情况下rebalance可以保证消费不乱,虽然可能会有消息lag问题但感觉并不关键;而master挂且同时发生rebalance这个的确有些问题。。此外rebalance完全由客户端控制其他人有没有用上相互之前并不知道;并且各自拉namesrv可能会看到不一致的数据虽然最终通过定期重rebalance可以一致会导致不必要的rebalance的感觉- -
看似有些问题待解决,如果理解有误欢迎讨论~~哈哈哈
下面开始看下kafka0.9版本之后的方式,据说kafka在很久之前和rocketmq目前用的很像, 但后来改了..
Kafka
kafka还在看, 没按照预期搞完,见下偏文章哈~