查询topic的offset的范围
用下面命令可以查询到topic:test broker:suna:9092的offset的最小值:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list suna:9092 -topic test --time -2
输出
test:0:1288
查询offset的最大值:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list suna:9092 -topic test --time -1
输出
test:0:7885
从上面的输出可以看出topic:test只有一个partition:0 offset范围为:[1288,7885]
设置consumer group的offset
启动zookeeper client
/zookeeper/bin/zkCli.sh
通过下面命令设置consumer group:testgroup topic:test partition:0的offset为1288:
set /consumers/testgroup/offsets/test/0 1288
注意如果你的kafka设置了zookeeper root,比如为/kafka,那么命令应该改为:
set /kafka/consumers/testgroup/offsets/test/0 1288
重启相关的应用程序,就可以从设置的offset开始读数据了。
手动更新Kafka存在Zookeeper中的偏移量。我们有时候需要手动将某个主题的偏移量设置成某个值,这时候我们就需要更新Zookeeper中的数据了。Kafka内置为我们提供了修改偏移量的类:kafka.tools.UpdateOffsetsInZK,我们可以通过它修改Zookeeper中某个主题的偏移量,具体操作如下:
[iteblog@www.iteblog.com ~]$ bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK
USAGE: kafka.tools.UpdateOffsetsInZK$ [earliest | latest] consumer.properties topic
在不输入参数的情况下,我们可以得知kafka.tools.UpdateOffsetsInZK类需要输入的参数。我们的consumer.properties文件配置内容如下:
zookeeper.connect=www.iteblog.com:2181
# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
#consumer group id
group.id=group
这个工具只能把Zookeeper中偏移量设置成earliest或者latest,如下:
[iteblog@www.iteblog.com ~]$ bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK
earliest config/consumer.properties iteblog
updating partition 0 with new offset: 276022922
updating partition 1 with new offset: 234360148
updating partition 2 with new offset: 157237157
updating partition 3 with new offset: 106968019
updating partition 4 with new offset: 80696130
updating partition 5 with new offset: 317144986
updating partition 6 with new offset: 299182459
updating partition 7 with new offset: 197012246
updating partition 8 with new offset: 230433681
updating partition 9 with new offset: 120971431
updating partition 10 with new offset: 51200673
updated the offset for 11 partitions