踩了个坑----一个kafka官方版本的bug,但是排查过程还是学了点儿东西。
背景
在项目中使用python3的kafka依赖,由于一些历史原因,测试环境是使用单机部署的。在测试环境会偶现一些一些问题,罗列一些比较重要的错误信息;
ERROR:kafka.conn: <BrokerConnection node_id=0 host=xxx port=xxx> error receiving network data closing socket
ERROR:kafka.consumer.Fetch:Fetch to node 0 failed: connectionError: [Errno 104] connection reset by peer
kafka.conn:connect attempt to <BrokerConnection node_id=0 host=xxx port=xxx> returned error 111. disconnecting
kafka.client:Node 0 connection failed -- refreshing metadata
经过报错提示,说明服务的消费者和远程服务连接中断了,接下来是无休止的尝试重连。但是这个单机kafka是好些项目共用的,只有python的服务下出现了问题。
思路
1.设置session_timeout_ms
首先我们先了解一下session_timeout_ms的用法:这是一个监听consumer是否可用的数值。其实在我们的consumer 的while Ture时,有一个线程是不断的向coordinator“汇报”自己的健康状况,如果coordinator超过这个数值没有得到响应,coordinator会将这个consumer给踢除掉,coordinator需要进行一次rebalance。
Rebalance则是一个重新规定负载均衡的过程,这个过程可能会存在消耗资源大量资源的问题:当coordinator管理的consumer 个数发生了变化,原来每个consumer“认领”的partition都要被重新分配,如果Topic数据较多的情况,可能会较为消耗资源。
为了防止这个consumer被踢出group,我将session_timeout_ms设置的更加大些,避免将此consumer踢出group。
2.排查python 问题
在网上找了许多资料,都不大符合我遇到的问题,但有一个issue与之较为相似。
里面说到了版本问题,于是我看了一下我的python3的kafka依赖版本,是1.3.5的,正好里面提到了有bug。
于是尝试更换更好版本的1.4.0 ,问题解决!