kafka-python documentation
I. Consumer
1. Common APIs
# Create the connection
from kafka import KafkaConsumer, TopicPartition
consumer = KafkaConsumer(bootstrap_servers=['ip1:port', 'ip2:port'],
                         api_version=(0, 10), group_id='my_group')
# All partitions of the topic (returns a set of partition ids)
consumer.partitions_for_topic(topic)
# Build the TopicPartition objects
tps = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
# Assign the partitions to the consumer
consumer.assign(tps)
# Latest offset of each partition in Kafka (returns {TopicPartition: offset})
consumer.end_offsets(tps)
# Position each partition has been consumed up to for the current group_id
for tp in tps:
    consumer.position(tp)
# Consume data
for message in consumer:
    partition = message.partition
    offset = message.offset
    value = message.value
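# Reset offsets
# partition_offset holds the offset to resume consuming from for each partition,
# e.g. {0: 123, 1: 345} means partition 0 is consumed again starting at offset 123
for tp in tps:
    # Look up by partition id, since tps is built from a set and has no guaranteed order
    consumer.seek(tp, partition_offset[tp.partition])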
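The results of end_offsets and position from the listing above can be combined to estimate how far behind the consumer is on each partition. The sketch below is a minimal illustration, not part of kafka-python: compute_lag is a hypothetical helper name, and it assumes both arguments are dicts with the same keys (TopicPartition objects in real use, plain tuples in the example).

```python
def compute_lag(end_offsets, positions):
    """Return {key: number of messages not yet consumed} for each partition key."""
    lag = {}
    for tp, end in end_offsets.items():
        # Clamp at 0 in case the position is ahead of a stale end offset
        lag[tp] = max(end - positions[tp], 0)
    return lag


# With a live consumer (not executed here), the inputs would come from:
#   end = consumer.end_offsets(tps)
#   pos = {tp: consumer.position(tp) for tp in tps}

# Stand-in data using (topic, partition) tuples as keys
end = {("my_topic", 0): 150, ("my_topic", 1): 345}
pos = {("my_topic", 0): 123, ("my_topic", 1): 345}
print(compute_lag(end, pos))  # {('my_topic', 0): 27, ('my_topic', 1): 0}
```

A lag of 0 means the consumer has caught up with that partition.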
II. Producer
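A minimal producer sketch, mirroring the consumer calls above. It relies only on KafkaProducer.send, KafkaProducer.flush, and the get method of the future that send returns. send_and_confirm is a hypothetical helper name, and the broker addresses and topic in the comments are placeholders.

```python
def send_and_confirm(producer, topic, values, timeout=10):
    """Send each value to topic; return [(partition, offset), ...] once acknowledged."""
    # send() is asynchronous and returns a future for each record
    futures = [producer.send(topic, value=v) for v in values]
    # Block until all buffered records have been handed to the broker
    producer.flush()
    results = []
    for future in futures:
        # get() returns the record metadata, or raises if the send failed
        metadata = future.get(timeout=timeout)
        results.append((metadata.partition, metadata.offset))
    return results


# With a live broker (not executed here):
#   from kafka import KafkaProducer
#   producer = KafkaProducer(bootstrap_servers=['ip1:port', 'ip2:port'])
#   send_and_confirm(producer, 'my_topic', [b'hello', b'world'])
```

Without a value_serializer, the values passed to send must already be bytes.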
III. Other
3.1 JSON handling
Additional package:
pip install msgpack
import msgpack
Producer:
producer = KafkaProducer(value_serializer=msgpack.dumps)
producer.send('msgpack-topic', {'key': 'value'})
Consumer:
KafkaConsumer(value_deserializer=msgpack.unpackb)
The value received is then a dict.
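The msgpack example above needs an extra package. If plain JSON is preferred, a similar pair of callables can be built with only the standard library. This is a sketch under that assumption: json_serializer and json_deserializer are hypothetical names, and they would be passed to the same value_serializer and value_deserializer parameters used above.

```python
import json


def json_serializer(value):
    """Encode a JSON-compatible object as UTF-8 bytes for the producer."""
    return json.dumps(value).encode("utf-8")


def json_deserializer(raw):
    """Decode UTF-8 JSON bytes received by the consumer back into a Python object."""
    return json.loads(raw.decode("utf-8"))


# Round trip without a broker
payload = json_serializer({"key": "value"})
print(payload)                     # b'{"key": "value"}'
print(json_deserializer(payload))  # {'key': 'value'}

# With a live broker (not executed here):
#   producer = KafkaProducer(value_serializer=json_serializer)
#   consumer = KafkaConsumer(value_deserializer=json_deserializer)
```

As with msgpack, the value seen by the consumer is a dict when the producer sent a dict.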