kafka小记

1.基本概念

  • broker
    kafka由一台或多台机器组成,每一台机器都是一个broker

  • topic
    每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)

  • partition
    Parition是物理上的概念,每个Topic包含一个或多个Partition.

  • Segment
    partition物理上由多个segment组成

  • offset
    每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息.

  • producer
    负责发布消息到Kafka broker

  • consumer
    消息消费者,向Kafka broker读取消息的客户端。

  • Consumer Group
    每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)

2. kafka拓扑结构

3.小结

  1. 每个topic对应一个或多个partition,每个partion都是一个单独的文件夹
  2. 消费者消费完消息之后并不会真的从物理上删除这条数据,这条数据依旧会被保留,删除的时间根据配置文件决定
  3. 使用Consumer high level API时,同一Topic的一条消息只能被同一个Consumer Group内的一个Consumer消费,但多个Consumer Group可同时消费这一消息

4.常用命令

# kafka集群启动
nohup /home/kafka/bin/kafka-server-start.sh -daemon /home/kafka/config/server.properties 1>/export/logs/kafka/stdout.log 2>/export/logs/kafka/stderr.log &

# 关闭kafka集群
sudo ./kafka-server-stop.sh

# 创建topic
bin/kafka-topics.sh --create --zookeeper emr-header-1:2181 --replication-factor 3 --partitions 3 --topic test

# 查看topic信息
./kafka-topics.sh --zookeeper emr-header-1:2181 --topic test --describe

# 获取topic的最大位移
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list emr-header-1:9092 --topic test2 --time -1

# 生产者
./kafka-console-producer.sh --broker-list emr-header-1:9092 --topic test

# 消费者
./kafka-console-consumer.sh --zookeeper emr-header-1:2181 --topic test --from-beginning


# 删除topic
# 法一
./bin/kafka-topics.sh  --delete --zookeeper emr-header-1:2181  --topic test

# 法二
# 登录zookeeper客户端
/usr/lib/zookeeper-current/bin/zkCli.sh
# 找到topic所在的目录
ls /brokers/topics/
ls /admin/delete_topics/
# 删除topic
rmr /brokers/topics/名称
rmr /admin/delete_topics/名称
删除log存储位置对应的partition

5.python操作Kafka

生产者

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import json
from kafka import KafkaProducer

def test_producer():
    producer = KafkaProducer(bootstrap_servers='localhost:9092',
                             value_serializer=lambda v: json.dumps(v).encode('utf-8'), acks='all', api_version=(0, 10))
    for i in range(10):
        producer.send('test2', str(i), partition=i % 3)
        producer.flush()

    producer.close()
    print 'ok'


if __name__ == '__main__':
    test_producer()

消费者

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from kafka import KafkaConsumer

def test_consumer():
    consumer = KafkaConsumer('test2', bootstrap_servers=['localhost:9092'], group_id='test_group', api_version=(0, 10))
    while True:
        for message in consumer:
            print message.value

        import time
        time.sleep(1)


if __name__ == '__main__':
    test_consumer()

6.优秀的文章

【美团】Kafka文件存储机制那些事

【infoq】Kafka背景及架构介绍

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容