1.高版本的kafka,提供了直接删除n条消息的操作方法。
脚本内容地址:
https://github.com/apache/kafka/blob/trunk/bin/kafka-delete-records.sh
使用这个脚本, 配套的还有一个json文件。 新建一个json文件,内容如下,里面指定了partition和offset. 然后把这个文件保存为 offset.json
{"partitions": [{"topic": "mytest", "partition": 0, "offset": 90}], "version":1 }
这时候调用脚本,可以做到删除
./kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file ./offset.json
2.如果上述方法,提示错误:
Error: Could not find or load main class kafka.admin.DeleteRecordsCommand
则说明kafka版本过低,这时候可以使用另一种方法。
./kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --topic testTopic --config retention.ms=时间(微秒)
动态地更新消息保留时间,假如只保留一小时之内的消息 ,60 x 60 x 1000 = 360000 就设置为retention.ms=3600000
然后kafka需要轮询,之后会执行删除
注意:kafka执行完删除后 你需要再次调用这个脚本 把时间还原回去
这种方法,误差根据你服务器的配置来决定的。由kafka配置文件的log.retention.check.interval.ms参数控制。 并且因为有很多个节点,经常是某个节点删除了数据之后,其他的节点还没有轮询到删除操作。 总的来说精确度不是很高。