分布式系统中,系统由多个子系统组成,数据需要在子系统中高性能、低延迟的流转。Kafka是"发布-订阅"消息系统,是一个分布式的、分区的日志服务,用于处理活跃的流式数据
特点
- 高吞吐量:同时为发布和订阅提供高吞吐量,读速度为100MB/s,写速度为50MB/s
- 持久化:消息持久化在磁盘上,同时replication防止数据丢失
- 分布式:所有的producer、broker和consumer都支持多个
- 消息处理状态由consumer端使用offset维护,而不是由broker维护
Kafka中Zookeeper的作用
Zookeeper是1个分布式系统中的协调服务,主要用于集群中不同节点间的相互通信,解决节点一致性问题
Kafka中,Zookeeper用于存储Consumer Group信息和偏移量offset,分区再平衡,实时监控节点信息,Broker leader选举,Producer和Consumer用Zookeeper来发现Broker列表
(Producer和Consumer需要知道哪些Broker是可用的,没有zk,每个Producer和Consumer客户端在生产和消费之前都需要检查Broker是否可用,效率太低)
Zookeeper不保存消息,消息保存在Broker上。从Kafka0.9开始,Consumer group和offset信息不保存在zk上,也保存在Broker上
- Broker
(1)Broker启动时,向zk注册。在zk中,Broker是1个临时节点znode,当Broker与zk断开连接时,znode被删除
/brokers/ids/brokerid
(2)Broker启动时,向zk注册持有的topic和partition,是1个临时节点
/brokers/topics/topic_name/partition_index
(3)集群中的Broker在启动时,会进行leader选举。Broker会尝试在zk上创建/controller临时节点,成功创建的Broker会成为集群的leader,其它Broker在leader确定时,会收到"节点已存在"的消息,并在leader创建的/controller临时节点上设置Watch。当leader挂掉后,leader创建的临时节点会被zk删除,其它Broker会收到Watch通知,并尝试创建/controller临时节点,创建成功的Broker成为新的leader
(4)Broker leader负责Partition leader的选举
当1个分区的leader所在的Broker挂掉时,Broker会遍历分区的所有副本,在副本中选择1个新的分区leader,简单说,就是在isr列表中选择第1个副本作为新的分区leader。选出新的分区leader后,Broker leader向所有Broker通知这1变动,新的分区leader开始处理读写请求,分区follower开始同步新的分区leader的消息
- Producer
使用zk发现Broker列表,和Topic下的Partition的leader建立连接
- Consumer
发现Broker列表,和Topic下的Partition的leader建立连接,注册Consumer信息
Producer和Consumer需要知道哪些Broker是可用的,没有zk,每个Producer和Consumer客户端在生产和消费之前都需要检查Broker是否可用,效率太低
相关概念
- 生产者Producer,向Kafka集群发送消息
- 消费者Consumer,与Kafka集群中的Broker实例建立长连接,不断拉取消息,然后进行处理
- 主题Topic,通过对消息指定主题,来对消息分类
(1)1个主题Topic被认为是1类消息,每个Topic被分为多个分区Partition,Partition在存储层面是1个增量的文件,发布到partition的消息会被追加到文件的末尾,每条消息在文件中的位置称为偏移量offset,是1long型的数字,唯一标记1条消息
(2)创建分区时,需要指定分区数partition和复制系数(默认都是1)
分区数越大,吞吐量越大,需要的资源也就越多,Kafka集群在接收到Producer生产的消息时,会根据均衡策略(轮询,Key-Hash,Random随机)将消息存储到不同的partition。通过partition存储消息,既可以存储更多的数据(partition分布在多个Broker上,避免文件达到单机磁盘上限),也可以提高读写效率(多个Broker处理读写操作的效率,肯定大于1个Broker)
复制系数的目的是冗余备份,分为leader和follower,leader负责处理所有的读写请求,follower会定期同步leader的数据,当leader挂掉,Kafka会选择1个follower成为leader
创建Topic时,Kafka集群会决定如何将partition及其副本分配到Broker上。规则是
(a)不同Partition的leader分配到不同的Broker上,相同Partition的leader和follower分配到不同的Broker上
例如,有5个Broker,Toipc有10个Partition,复制系数为3,则需要在5个Broker上分配30个副本
(a)随机选择1个Broker,假如是Broker4,将Partition1的leader分配给Broker4,之后使用轮询的方式将其他leader分配给Broker,Partition2的leader分配给Broker5,Partition3的leader分配给Broker1
(b)分配完Partition leader,会分配follower,需要保证1个Partition的leader和follower分配在不同的Broker
(3)被消费的消息不是立刻被删除,文件会根据Broker的配置,保留一段时间后才删除。默认为7天
- 偏移量offset,Consumer持有的元数据,offset是Consumer当前消费消息在Kafka文件中的位置
分组消费
Consumer需要做1些高延迟的操作,例如数据写入DB、使用数据进行耗时计算。此时,单个Consumer无法跟上Producer生产数据的速度,使用分组消费模式,增加Consumer的个数,提高处理能力
分区再平衡 Rebalance
分组消费模式下,1个Consumer只能消费1个Topic下的1个分区,Consumer和Partition是有对应关系的,当出现1些情况,例如
(1)Consumer加入Consumer Group
(2)Consumer退出Consumer Group或取消订阅
(3)Topic下的partition个数增加
Consumer Group中的协调者Coordinator使用分区再平衡机制,来调整Consumer和Partition的对应关系Coordinator
Kafka0.9之后,Consumer的offset不再存放在zk上,而是存在Broker上,存放的位置是使用Math.abs()对groupId.hashCode()取绝对值,再对offsets.topic.num.partitions(配置文件中的offset分区总数,默认50)取余
Kafka0.9后,每个Consumer Group都有1个coordinator,用来管理Consumer Group和offset。coordinator就是当前Consumer Group存放offset的这个partition的leader所在的Broker。coordinator 负责与Consumer Group中的所有Consumer进行协调通信
(1)定期发送心跳,监控Consumer存活情况
(2)Consumer离开Consumer Group时,会告诉coordinator
(3)将parititon的分配情况,通知组内所有Consumer
Kafka提供了2套API,High-Level Consumer API和SimpleConsumer API
High-Level Consumer API提供了对Consumer Group各种操作的封装,在消费消息时,不需要关注offset的提交,会自动提交;若使用多线程
(1)如果Consumer线程>partition,会有线程收不到消息
(2)如果Consumer线程<partition,会有线程收到多个partition的消息
(3)如果1个线程消费多个partition,无法保证收到消息的顺序,而1个partition中的消息时有序的
如果Consumer希望从头开始消费Partition的全量数据
(1)使用新的Consumer Group,即"group.id"
(2)指定"auto.offset.reset"为earliest
[KafkaConsumer is not safe for multi-threaded access]
SimpleConsumer API可以多Kafka进行更精确的控制,但是需要自己编写代码,处理offset的提交,leader的变更等
生产者
消息发送流程:
(1)创建KafkaProducer对象,初始化组件,例如缓冲区,发送消息线程
(2)创建ProducerRecord消息对象,设置消息Topic,key和value,将key和value序列化为byte[]
(3)分区器为消息选择合适的分区partition,默认使用轮询,可实现Partitioner接口的partition方法自定义分区
(4)消息会根据partition发送到不同的暂存区暂存
(5)后台消息发送线程Sender,从暂存区中获取消息,发送给Broker
(6)如果消息成功发送到Broker,返回RecordMetadata对象,包含消息目标分区,offset和时间戳;如果发生失败,且设置了重试次数,进行重试,否则返回失败
创建配置对象Properties
(1)Broker地址 bootstrap.servers (localhost:9101)
(2)消息写入成功策略 acks all (0,Producer不等待确认;1,等待分区leader写入成功;all,等待isr所有副本写入成功)
(3)重试次数 retries 0
(4)缓存大小 batch.size 默认16kb,Producer会将消息缓存,当消息达到一定大小时,才一起发送
(5)每次发送消息时,延迟一定时间 linger.ms 默认0
(6)Producer可用内存大小 buffer.memory 默认32M
(7)Key和Value的序列化方式创建KafkaProducer对象
Producer<String,String> producer = new KafkaProducer<>(props);使用KafkaProducer对象的send(),发送1个代表消息的ProducerRecord对象。send()是异步的,返回1个放在Future中的RecordMetadata对象,如果使用Future.get()获取RecordMetadata对象,会阻塞直到有结果返回;可以在send()上设置回调方法Callback实现无阻塞
消费者
创建配置对象Properties
(1)Broker地址 bootstrap.servers
(2)组id group.id
(3)开启自动提交 enable.auto.commit true
(4)offset重置方式 auto.offset.reset 默认latest
earliest 之前提交过offset,从offset开始消费;未提交过,从头开始消费
latest 之前提交过offset,从offset开始消费;未提交过,等待分区产生新的消息,从新消息开始消费
none 之前提交过offset,从offset开始消费;Consumer Group涉及的分区,有1个未提交过offset,报错
(5)自动提交频率 auto.commit.interval.ms 默认5s
(6)Key和Value的序列化方式创建KafkaConsumer对象,订阅主题
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("sonia"));Kafka High-Level Consumer API在KafkaConsumer对象的poll()中封装了
(1)Consumer Group管理,调用poll()时,获得coordinator和分区partition分布情况
(2)获得分区重平衡后,Consumer和Partition的对应信息
(3)向Broker发送心跳,否则超时Broker会认为Consumer死亡,Consumer消费的partition会被分配给其他的Consumer
(4)获得消息
使用时,将poll()放在while(true)循环中,不断执行poll(),不断的从Broker pull消息;同时向Broker发送心跳,维持与Broker之间的连接,实时感知分区再平衡后的Consumer与Partition的关系,并作出调整
[提交offset的问题]