原理
- kafka根据Topic进行归类,kafka由多个broker组成,kafka依赖zookeeper保存集群中的一些meta信息
-
一个Topic为一类消息,每个topic分成多个partition,底层存的是append log文件,新消息会追加到log文件尾部,每条文件在文件啊中的位置称为offset,long类型,唯一标记一条消息
架构图.png - kafka与JMS不同的是,即使被消费,消息也不会立即被删除,日志文件会根据broker配置,保留一段时间
- consumer需要对offset进行保存和修改,正常消费消息的时候,offset会线性向前移动
- 一个topic消息被分布在集群中多个server上,每个server负责读写,还可以配置replicas备份个数
- 有repicated方案,就需要对多个备份进行调度,每个partition都有一个leader,负责读写,失效会由follower接管,kafka会将leader均衡分布
- consumers如果有相同的group,这些消息会在consumer之间负载均衡
- consumers如果有不同的group,则是发布-订阅模式,消息会广播给所有消费者
- 有几个partition最好对应几个consumer
使用场景
- Messaging 常规的消息系统
- Websit activity tracking 网站活性跟踪
- Log Aggregation 日志收集中心
角色
- 持久性:kafka对日志进行append操作,对磁盘检索开支较小,broker还有buffer缓存功能
-
生产者:producer会和所有leader保持socket连接,leader的位置保存在zookeeper中
角色.png - 每个partition中保存的log文件都由offset唯一标记,offset由8个字节数组组成,每个partition有多个log file(segment),最小offset为起始消息的offset,每个segment列表信息会保存在zookeeper
- segment到达一定阀值就会创建一个新的文件,buffer中的消息条数到达时就会flush到log文件中
-
获取消息时需要知道offset和最大chunk size ,chunk size表示的是最大获取消息的总长度
log读写.png
消息传送机制
- at most once 最多一次,无论成败不会再次发送,
消费者fetch消息,保存offset,然后处理消息,如果出现异常则消息不能被fetch到 - at least once 消息至多发送一次,失败会重发直到成功位置
消息处理成功之后,没有保存到zookeeper,则下次fetch处理的可能获取的是上次处理过的消息 - exactly once 消息只发送一次
常用命令
# 创建主题
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# 查看topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
# 查看topic状态
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
配置详解
- earliest:有提交的offset时,从提交的offset进行消费;无提交的offset时从头开始消费
- lastest:有提交的offset时,从提交的offset进行消费;无提交的offset时,消费新产生分区下的数据
消费结论
- 新建一个同组名的消费者时,auto.offset.reset值含义:
earliest 每个分区是从头开始消费的。
none 没有为消费者组找到先前的offset值时,抛出异常 - 当创建一个新分组的消费者时,auto.offset.reset值为latest时,表示消费新的数据(从consumer创建开始,后生产的数据),之前产生的数据不消费。
- earliest 当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费。
latest 当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据。 - 组与组间的消费者是没有关系的。
topic中已有分组消费数据,新建其他分组ID的消费者时,之前分组提交的offset对新建的分组消费不起作用。
kafka监控(因为kafka新版本改动比较大,一些关键信息保存位置不同)
java -cp KafkaOffsetMonitor-assembly-0.2.1.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb \
> --zk zkServer:2181 \
> --port 8077 \
> --refresh 10.seconds \
> --retain 2.days