1.基本概念
broker
kafka由一台或多台机器组成,每一台机器都是一个brokertopic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)partition
Parition是物理上的概念,每个Topic包含一个或多个Partition.-
Segment
partition物理上由多个segment组成
offset
每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息.producer
负责发布消息到Kafka brokerconsumer
消息消费者,向Kafka broker读取消息的客户端。Consumer Group
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)
2. kafka拓扑结构
3.小结
- 每个topic对应一个或多个partition,每个partion都是一个单独的文件夹
- 消费者消费完消息之后并不会真的从物理上删除这条数据,这条数据依旧会被保留,删除的时间根据配置文件决定
- 使用Consumer high level API时,同一Topic的一条消息只能被同一个Consumer Group内的一个Consumer消费,但多个Consumer Group可同时消费这一消息
4.常用命令
# kafka集群启动
nohup /home/kafka/bin/kafka-server-start.sh -daemon /home/kafka/config/server.properties 1>/export/logs/kafka/stdout.log 2>/export/logs/kafka/stderr.log &
# 关闭kafka集群
sudo ./kafka-server-stop.sh
# 创建topic
bin/kafka-topics.sh --create --zookeeper emr-header-1:2181 --replication-factor 3 --partitions 3 --topic test
# 查看topic信息
./kafka-topics.sh --zookeeper emr-header-1:2181 --topic test --describe
# 获取topic的最大位移
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list emr-header-1:9092 --topic test2 --time -1
# 生产者
./kafka-console-producer.sh --broker-list emr-header-1:9092 --topic test
# 消费者
./kafka-console-consumer.sh --zookeeper emr-header-1:2181 --topic test --from-beginning
# 删除topic
# 法一
./bin/kafka-topics.sh --delete --zookeeper emr-header-1:2181 --topic test
# 法二
# 登录zookeeper客户端
/usr/lib/zookeeper-current/bin/zkCli.sh
# 找到topic所在的目录
ls /brokers/topics/
ls /admin/delete_topics/
# 删除topic
rmr /brokers/topics/名称
rmr /admin/delete_topics/名称
删除log存储位置对应的partition
5.python操作Kafka
生产者
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
from kafka import KafkaProducer
def test_producer():
producer = KafkaProducer(bootstrap_servers='localhost:9092',
value_serializer=lambda v: json.dumps(v).encode('utf-8'), acks='all', api_version=(0, 10))
for i in range(10):
producer.send('test2', str(i), partition=i % 3)
producer.flush()
producer.close()
print 'ok'
if __name__ == '__main__':
test_producer()
消费者
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from kafka import KafkaConsumer
def test_consumer():
consumer = KafkaConsumer('test2', bootstrap_servers=['localhost:9092'], group_id='test_group', api_version=(0, 10))
while True:
for message in consumer:
print message.value
import time
time.sleep(1)
if __name__ == '__main__':
test_consumer()