sarama 使用遇到的问题

sarama是kafka go语言的的一个框架,star数也挺高的,这里记录下使用遇到的一些问题。

跟着官方给的例子学习,文件consumer_group_test.go给出了如何使用消费组的例子。文件里有以下一段代码:

// Iterate over consumer sessions.
    ctx := context.Background()
    for {
        topics := []string{"my-topic"}
        handler := exampleConsumerGroupHandler{}
        

        err := group.Consume(ctx, topics, handler)
        if err != nil {
            panic(err)
        }
    }

问题: 消费组不生效

for循环引发的问题
经过测试代码的for循环测试发现去掉也可以正常消费,于是觉得没有加上for循环。等去掉for循环后之后,诡异的问题出现了。消费组不生效了,比如一个消费组内部起了3个客户端,发现只有一个客户端生效会消费消息,很诡异。后面经过问题排查发现这个for循环不能够去掉,把for循环加上消费组才又生效了。

经过测试发现,rebalance,断网重连都会重新触发for循环代码又运行一次,也意味着 group.Consume(ctx, topics, handler)这个方法在没有遇到上面两种情况的时候是阻塞的,消息消费正常,所以去掉for循环也没关系。一旦遇到上面两种情况,方法就会返回,此时需要重新进入才可以正常消费消息。

参数sarama.OffsetNewest引发的问题

config.Consumer.Offsets.Initial = sarama.OffsetNewest
``
上面这个参数是控制消费组初始消费的位置,默认是OffsetNewest,即从最新的位置开始消费。如果把参数改为如下:

config.Consumer.Offsets.Initial = sarama.OffsetOldest
``
即从最早的位置开始消费。会发现也会遇到消费组内只有一个消费客户端在消费消息,其他消费组内的客户端不消费消息。诶,这个问题很诡异。。由于我们的业务只需要从最新位置开始消费消息即可,所以就不去深究原因了。。

参数sarama.BalanceStrategyRange引发的问题

config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange

上面这个参数是控制rebalance时的分区策略,默认是按照范围来分区,即BalanceStrategyRange。

此时如果把配置更改为如下:

config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin

上面是采用轮询的方式来分区。如果改成改参数,会报一下异常:

 The provider group protocol type is incompatible with the other members.

这是因为无法正确分区导致的问题,此时把分区策略改为 BalanceStrategyRange即可,就不会再有上面这个错误出现。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容