一、kakfa整体架构
二、术语科普
producer & consumer
producer 是生产者,负责消息生产,上游程序中按照标准的消息格式组装(按照每个消息事件的字段定义)发送到指定的topic。producer生产消息的时候,不会因为consumer处理能力不够,而阻塞producer的生产。consumer会从指定的topic 拉取消息,然后处理消费,并提交offset(消息处理偏移量,消费掉的消息并不会主动删除,而是kafka系统根据保存周期自动消除)。
Topic
topic是消费分类存储的队列,可以按照消息类型来分topic存储。
replication
replication是topic复制副本个数,用于解决数据丢失,防止leader topic宕机后,其他副本可以快代替。
broker
broker是缓存代理,Kafka集群中的一台或多台服务器统称broker,用来保存producer发送的消息。Broker没有副本机制,一旦broker宕机,该broker的消息将都不可用。
partition
partition是topic的物理分组,在创建topic的时候,可以指定partition 数量。每个partition是逻辑有序的,保证每个消息都是顺序插入的,而且每个消息的offset在不同partition的是唯一不同的
offset
偏移量。kafka为每条在分区的消息保存一个偏移量offset,这也是消费者在分区的位置。比如一个偏移量是5的消费者,表示已经消费了从0-4偏移量的消息,下一个要消费的消息的偏移量是5。每次消息处理完后,要么主动提交offset,要么自动提交,把offset偏移到下一位,如处理offset=6消息。在kafka配置中,如果enable_auto_commit=True和auto_commit_interval_ms=xx,那表示每xx 毫秒自动提交偏移量
group
分组。是指在消费同一topic的不同consumer。每个consumer都有唯一的groupId,同一groupId 属于同一个group。不同groupId的consumer相互不影响。对于一个topic,同一个group的consumer数量不能超过 partition数量。比如,Topic A 有 16个partition,某一个group下有2个consumer,那2个consumer分别消费8个partition,而这个group的consumer数量最多不能超过16个。
三、kafka的配置
kafka的配置主要分四类,分别是zookeeper、server、consumer、producer。其他的配置可以忽略。
zookeeper配置
zk的配置比较简单,也可以默认不改.dataDir是zk存储节点配置的目录地址,clientPort是zk启动的端口,默认2181,maxClientCnxns是限制ip的连接此处,设置0表示无连接次数,一般情况根据业务部署情况,配置合理的值。
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
server配置
############################# Server Basics #############################
# 每一个broker在集群中的唯一表示,要求是正数。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响consumers的消息情况
broker.id=0
############################# Socket Server Settings #############################
#默认kafka连接ip和端口
#listeners=PLAINTEXT://:9092
#broker处理消息的最大线程数,一般情况下不需要去修改
num.network.threads=3
# broker处理磁盘IO的线程数,数值应该大于你的硬盘数,默认不修改
num.io.threads=8
# socket的发送缓冲区,根据可以服务性能可调
socket.send.buffer.bytes=102400
# socket的接受缓冲区,根据可以服务性能可调
socket.receive.buffer.bytes=102400
# socket请求的最大数值,防止server 内存溢出,message.max.bytes必然要小于socket.request.max.bytes,否则会被topic创建时的指定参数覆盖
socket.request.max.bytes=104857600
############################# Log Basics #############################
# 日志目录
log.dirs=/tmp/kafka-logs
# 不指定partition数量情况下的默认数量
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Retention Policy #############################
# 默认保留 7天的消息记录
log.retention.hours=168
#消息总存储达到的最大数量,如果超过这个字节数,无论是否达到7天,都会删除
log.retention.bytes=1073741824
# topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖
log.segment.bytes=1073741824
# 日志清理策略选择有:delete和compact主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
#zookeeper配置
zookeeper.connect=localhost:2181
# zookeeper的连接超时
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
producer配置
############################# Producer Basics #############################
#broker 列表;格式: host1:port1,host2:port2 ...
bootstrap.servers=localhost:9092
# 数据存储的压缩格式,默认是none,其他: none, gzip, snappy, lz4, zstd
compression.type=none
# 指定分区处理类。默认kafka.producer.DefaultPartitioner
#partitioner.class=
# 写请求的超时
#request.timeout.ms=
consumer配置
#(必需)zookeeper连接服务器地址
zookeeper.connect=zk01:2181,ka02:2181,zk03:2181
#zookeeper的session的过期时间
zookeeper.session.timeout.ms=5000
# zookeeper连接超时
zookeeper.connectiontimeout.ms=10000
#指定多久消费者更新offset到zookeeper中
zookeeper.sync.time.ms=2000
#(必需)consumer group id
group.id=test-consumer-group
#自动向zookeeper提交offset信息,和下面的auto.commit.interval.ms结合起来使用,合理的自动提交间隔,可以防止rebalance
auto.commit.enable=true
#自动更新时间
auto.commit.interval.ms=1000
#当前consumer的标识
consumer.id=xxx
#消费者客户端编号,用于区分不同客户端,默认客户端程序自动产生
client.id=xxx
#最大取多少块缓存到消费者(默认10)
queued.max.message.chunks=50
#当有新的consumer加入到group时,将会reblance.
rebalance.max.retries=5
#获取消息的最大尺寸,broker不会向consumer输出大于此值得chunk
fetch.min.bytes=655360
#当消息尺寸不足时,server阻塞的时间,如果超时,立即发送给consumer
fetch.wait.max.ms=5000
socket.receive.buffer.bytes=655360
#重置reset的极致,smallest:自动设置reset到最小的offset.
auto.offset.reset=smallest