session.timeout.ms heartbeat.interval.ms参数的区别

注:本文是从https://www.cnblogs.com/hapjin/p/10926882.html处摘抄,可直接跳转至原页面。

从kafka官网截取

最近碰到一个问题,多个业务往向一个Kafka topic发送消息,有些业务的消费量很大,有些业务的消息量很小。因Kafka尚未较好地支持按优先级来消费消息,导致某些业务的消息消费延时的问题。一种简单的解决方案是再增加几个Topic,面对一些系统遗留问题,增加Topic带来的是生产者和消费者处理逻辑复杂性。一种方法是使用Kafka Standalone consumer,先使用consumer.partitionFor("TOPIC_NAME")获取topic下的所有分区信息,再使用 consumer.assign(partitions)显示地为consumer指定消费分区。另一种方法是基于consumer group 自定义Kafka consumer的分区分配策略,那这时候就得对Kafka目前已有的分区分配策略有所了解,并且明白什么时候、什么场景下触发rebalance?

Kafka consumer要消费消息,哪些的分区的消息交给哪个consumer消费呢?这是consumer的分区分配策略,默认有三个:rangeround-robinsticky

因为一个topic往往有多个分区,而我们又会在一个consumer group里面创建多个消费者消费这个topic,因此:就有了一个问题:哪些的分区的消息交给哪个consumer消费呢?这里涉及到三个概念:consumer groupconsumer,以及group coordinator。conusmer分区分配是通过组管理协议来实施的:具体如下:

在正常情况下 ,当有consumer进出consumer group时就会触发rebalance,所谓rebalance就是重新制订一个分区分配方案。而制订好了分区分配方案,就得及时告知各个consumer,这就与 heartbeat.interval.ms 参数有关了。
具体来说就是:

  • 每个consumer 都会根据 heartbeat.interval.ms 参数指定的时间周期性地向group coordinator发送 hearbeat
  • group coordinator 会给各个consumer响应,若发生了 rebalance,各个consumer收到的响应中会包含 REBALANCE_IN_PROGRESS 标识,这样各个consumer就知道已经发生了rebalance,同时 group coordinator也知道了各个consumer的存活情况。

那为什么要把 heartbeat.interval.mssession.timeout.ms 进行对比呢?
session.timeout.ms是指:group coordinator检测consumer发生崩溃所需的时间。一个consumer group里面的某个consumer挂掉了,最长需要 session.timeout.ms 秒检测出来。
举个示例:

session.timeout.ms=10
heartbeat.interval.ms=3

session.timeout.ms是个"逻辑"指标,它指定了一个阈值---10秒,在这个阈值内如果coordinator未收到consumer的任何消息,那coordinator就认为consumer挂了。而heartbeat.interval.ms是个"物理"指标,它告诉consumer要每3秒给coordinator发一个心跳包,heartbeat.interval.ms越小,发的心跳包越多,它是会影响发TCP包的数量的,产生了实际的影响,这也是我为什么将之称为"物理"指标的原因。

如果group coordinator在一个heartbeat.interval.ms周期内未收到consumer的心跳,就把该consumer移出group,这有点说不过去。就好像consumer犯了一个小错,就一棍子把它打死了。事实上,有可能网络延时,有可能consumer出现了一次长时间GC,影响了心跳包的到达,说不定下一个heartbeat就正常了。

heartbeat.interval.ms肯定是要小于session.timeout.ms的,如果consumer group发生了rebalance,通过心跳包里面的REBALANCE_IN_PROGRESSconsumer就能及时知道发生了rebalance,从而更新consumer可消费的分区。而如果超过了session.timeout.msgroup coordinator都认为consumer挂了,那也当然不用把 rebalance信息告诉该consumer了。

kafka0.10.1之后的版本中,将session.timeout.msmax.poll.interval.ms 解耦了。也就是说:new KafkaConsumer对象后,在while true循环中执行consumer.poll拉取消息这个过程中,其实背后是有2个线程的,即一个kafka consumer实例包含2个线程:一个是heartbeat 线程,另一个是processing线程,processing线程可理解为调用consumer.poll方法执行消息处理逻辑的线程,而heartbeat线程是一个后台线程,对程序员是"隐藏不见"的。如果消息处理逻辑很复杂,比如说需要处理5min,那么 max.poll.interval.ms可设置成比5min大一点的值。而heartbeat 线程则和上面提到的参数 heartbeat.interval.ms有关,heartbeat线程 每隔heartbeat.interval.mscoordinator发送一个心跳包,证明自己还活着。只要 heartbeat线程 在 session.timeout.ms 时间内 向 coordinator发送过心跳包,那么 group coordinator就认为当前的kafka consumer是活着的。

kafka0.10.1之前,发送心跳包和消息处理逻辑这2个过程是耦合在一起的,试想:如果一条消息处理时长要5min,而session.timeout.ms=3000ms,那么等 kafka consumer处理完消息,group coordinator早就将consumer 移出group了,因为只有一个线程,在消息处理过程中就无法向group coordinator发送心跳包,超过3000ms未发送心跳包,group coordinator就将该consumer移出group了。而将二者分开,一个processing线程负责执行消息处理逻辑,一个heartbeat线程负责发送心跳包,那么:就算一条消息需要处理5min,只要heartbeat线程在session.timeout.ms时间内向group coordinator发送了心跳包,那consumer可以继续处理消息,而不用担心被移出group了。另一个好处是:如果consumer出了问题,那么在 session.timeout.ms内就能检测出来,而不用等到 max.poll.interval.ms 时长后才能检测出来。

一次kafka consumer 不断地 rebalance 分析

明白了session.timeout.msmax.poll.interval.msheartbeat.interval.ms三个参数的意义后,现在来实际分析一下项目中经常碰到的 consumer rebalance 错误。
一般我们是在一个线程(用户线程)里面执行kafka consumerwhile true循环逻辑的,其实这里有2个线程:一个是用户线程,另一个是心跳线程。心跳线程,我想就是根据heartbeat.interval.ms参数配置的值周期性向coordinator发送心跳包以证明consumer还活着。
如果消息处理逻辑过重,也即用户线程需要执行很长的时间处理消息,然后再提交offset。咋一看,有一个后台心跳线程在不断地发送心跳啊,那为什么group coordinator怎么还老是将consumer移出group,然后导致不断地rebalance呢?
我想,问题应该是 max.poll.interval.ms这个参数引起的吧,因为在ERROR日志中,老是提示:消息处理逻辑花了太长的时间,要么减少max.poll.records值,要么增大session.timeout.ms的值。尽管有后台heartbeat 线程,但是如果consumer的消息处理逻辑时长超过了max.poll.interval.ms ,那么此consumer提交offset就会失败:

org.apache.kafka.clients.consumer.CommitFailedException: 
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.
This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, 
which typically implies that the poll loop is spending too much time message processing. 
You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() 
with max.poll.records.

此外,在用户线程中,一般会做一些失败的重试处理。比如通过线程池的 ThreadPoolExecutor#afterExecute()方法捕获到异常,再次提交Runnable任务重新订阅kafka topic。本来消费处理需要很长的时间,如果某个consumer处理超时:消息处理逻辑的时长大于max.poll.interval.ms (或者消息处理过程中发生了异常),被coordinator移出了consumer组,这时由于失败的重试处理,自动从线程池中拿出一个新线程作为消费者去订阅topic,那么意味着有新消费者加入group,就会引发 rebalance,而可悲的是:新的消费者还是来不及处理完所有消息,又被移出group。如此循环,就发生了不停地 rebalance 的现象。

参考资料

kafka kip-62

原文:https://www.cnblogs.com/hapjin/p/10926882.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,968评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,601评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,220评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,416评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,425评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,144评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,432评论 3 401
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,088评论 0 261
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,586评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,028评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,137评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,783评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,343评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,333评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,559评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,595评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,901评论 2 345

推荐阅读更多精彩内容