在kafka中有几个重要的组成:broker,producer,consumer(consumer group),zookeeper,topic。我们现在针对每个组件去单独介绍。
一、模块组成
kafka的架构如下图所示:
上图中包含了一个kafka集群的所有组件:
1)三台broker集群
2)三台zookeeper集群
3)一个生产者producer
4)一组消费者consumer group
5)一个单独的消费者consumer3
6)一个topic1,其中partition是2,表示每个topic有两个分区,replica是3,表示每个分区有三个副本,分布在每台broker上。
蓝色消息发送的流程:
1)producer发送蓝色消息到topic1;
2)假设发送到broker1的topic1中的partition1中,此时这个partition1自动成为了当前partition三个副本replicas的leader,则broker2和broker3的两个partition1自然的成为follower;
3)由当前的replica leader负责当前分区消息的读和写,另外两个分区follower会从leader同步消息。
4)consumer group的consumer1去消费partition1的leader中的消息,则consumer2是不能消费的;同一组内只有一个consumer可以消费消息。
红色消息发送的流程:
1)producer发送蓝色消息到topic1;
2)此时发送到broker2的topic1中的partition2中,此时这个partition2自动成为了当前partition三个副本replicas的leader,则broker1和broker3的两个partition2自然的成为follower;
3)由当前的replica leader负责当前分区消息的读和写,另外两个分区follower会从leader同步消息。
4)consumer3去消费partition2的leader中的消息。
再简单模拟了消息的分布和生产消费过程后,我们具体说明每个组成的功能:
1)producer:消息生产者,向kafka的broker发送消息的客户端。
2)consumer: :消息消费者,向 kafka broker拉取消息的客户端;
3)consumer group:消费者组,由多个 consumer 组成。 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
4)broker:一台kafka服务器就是一个broker,一个集群在有多个broker。一个broker内可存放逗哥topic。
5 )topic :可以理解为一个队列, 生产者和消费者面向的都是一个 topic。
6 )partition :一个非常大的topic可以分布到多个broker上,一个 topic分为多个partition,每个partition是一个有序的队列。
7)Replica:副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,使得kafka仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个follower。
8 )leader:每个分区多个副本的主节点,负责数据的读写,即生产者发送数据的对象,以及消费者消费数据的对象都是leader。
9 )follower:每个分区多个副本中的从节点,实时从leader中同步数据,保持和leader数据
同步。leader发生故障时,某个follower会成为新的leader。
二、kafka的存储机制
Kafka作为一个支持大数据量写入写出的消息队列,由于是基于Scala和Java实现的,而Scala和Java均需要在JVM上运行,所以如果是基于内存的方式,即JVM的堆来进行数据存储则需要开辟很大的堆来支持数据读写,从而会导致GC频繁影响性能。考虑到这些因素,kafka是使用磁盘存储数据的。
Kafka 中消息是以 topic 进行分类的,生产者生产消息,消费者消费消息,都是面向topic的。topic存储结构见下图:
由于生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制,将每个partition分为多个segment。每个 segment对应两个文件“.index”文件和“.log”文件。
partition文件夹命名规则:
topic 名称+分区序号,举例有一个topic名称文“kafka”,这个topic有三个分区,则每个文件夹命名如下:
kafka-0
kafka-1
kafka-2
index和log文件的命名规则:
1)partition文件夹中的第一个segment从0开始,以后每个segement文件以上一个segment文件的最后一条消息的offset+1命名(当前日志中的第一条消息的offset值命名)。
2)数值最大为64位long大小。19位数字字符长度,没有数字用0填充。
举例,有以下三对文件:
0000000000000000000.log
0000000000000000000.index
0000000000000002584.log
0000000000000002584.index
0000000000000006857.log
0000000000000006857.index
以第二个文件为例看下对应的数据结构:
这里面使用的是稀疏索引,需要注意下:
消息查找过程:
找message-2589,即offset为2589:
1)先定位segment文件,在0000000000000002584中。
2)计算查找的offset在日志文件的相对偏移量
offset - 文件名的数量 = 2589 - 2584 = 5;
在index文件查找第一个参数的值,若找到,则获取到偏移量,通过偏移量到log文件去找对应偏移量的数据即可;
本例中没有找到,则找到当前索引中偏移量的上线最接近的值,即3,偏移量文246;然后到log文件中从偏移量为246数据开始向下寻找。
三、生产者Producer
3.1 生产者组成
通过下图看下生产者发送消息的流程:
1)组装ProducerRecord,执行发送方法。
2)经过序列化器Seriallizer,将key和value经过序列化成为二进制数组。发送到分区器。
3)在分区器如果制定了partition,则直接返回对应的partition;否则分配器将基于key值来返回一个分区。
4)确定分区后,将这些消息放到指定topic和partition的批量消息中。由另外的线程负责发送批量消息。kafka produce都是批量请求,会积攒一批,然后一起发送,不是调send()就进行立刻进行网络发包。
5)broker接收到消息后,如果成功会返回一个RecordMetadata,失败且不重试的话,则会返回一个异常。
3.2 分区策略
3.2.1分区解决的问题
1)方便在集群中扩展,每个 Partition 可以通过调整以适应它所在的机器,而一个 topic又可以有多个 Partition 组成,因此整个集群就可以适应任意大小的数据了;
2)提高了并发,可以以partition为单位进行读写。
3.2.2分区的使用及其原则
kafka允许我们在发送消息的时候指定分区,我们需要将发送的消息封装成ProducerRecord:
然后调用KafkaTemplate中的send方法:
public ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record) {
return this.doSend(record);
}
从上面的多个构造方法中,我们看到可以传递不通的参数,其实不通的参数有不同的分区原则:
(1)指明partition的情况下,直接将指明的值直接作为partiton值;
(2)没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值;
(3)既没有partition值又没有key值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与topic可用的partition总数取余得到partition。
值,也就是常说的 round-robin 算法。
通过如下代码简单实践下:
三种接口:partition1和partition2、partition3。
/**
* 传partition
*
* @param topic
* @param partition
* @param key
* @param value
* @return void
* @author weirx
* @date: 2021/2/5
*/
@RequestMapping("/send/partition1")
public void sendPartition1(String topic, Integer partition, String key, String value) {
ProducerRecord producerRecord = new ProducerRecord(topic, partition, new Date().getTime(), key, value);
producer.sendPartition(producerRecord);
}
/**
* 无partition,有key
*
* @param topic
* @param key
* @param value
* @return void
* @author weirx
* @date: 2021/2/5
*/
@RequestMapping("/send/partition2")
public void sendPartition2(String topic, String key, String value) {
ProducerRecord producerRecord = new ProducerRecord(topic, key, value);
producer.sendPartition(producerRecord);
}
/**
* 没有partition,也没有key
*
* @param topic
* @param value
* @return void
* @author weirx
* @date: 2021/2/5
*/
@RequestMapping("/send/partition3")
public void sendPartition3(String topic, String value) {
ProducerRecord producerRecord = new ProducerRecord(topic, value);
producer.sendPartition(producerRecord);
}
按顺序分别测试,参数都如以下方式给:
# 都传
http://localhost:8085/test/kafka/send/partition1?topic=test-kafka&key=weirx&value=hello%20kafka&partition=9
# 无partition
http://localhost:8085/test/kafka/send/partition2?topic=test-kafka&key=weirx&value=hello%20kafka
# 无partition和key
http://localhost:8085/test/kafka/send/partition3?topic=test-kafka&value=hello%20kafka
分别得到结果是:
传partition的情况下消息确实存储在partition9:
2021-02-05 12:18:59.241 INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 9, leaderEpoch = 0, offset = 3, CreateTime = 1612498739238, serialized key size = 5, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = weirx, value = hello kafka)
2021-02-05 12:18:59.241 INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka
不传partition的情况,连发三次,都是在parttition3
2021-02-05 12:20:53.549 INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 3, leaderEpoch = 0, offset = 7, CreateTime = 1612498853547, serialized key size = 5, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = weirx, value = hello kafka)
2021-02-05 12:20:53.549 INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka
2021-02-05 12:20:59.532 INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 3, leaderEpoch = 0, offset = 8, CreateTime = 1612498859530, serialized key size = 5, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = weirx, value = hello kafka)
2021-02-05 12:20:59.532 INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka
2021-02-05 12:21:02.261 INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 3, leaderEpoch = 0, offset = 9, CreateTime = 1612498862258, serialized key size = 5, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = weirx, value = hello kafka)
2021-02-05 12:21:02.261 INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka
都不传的情况,连发11条,发现第一条和第十一条都是partition2,中间没有重复,符合上面的结论。
2021-02-05 12:22:59.484 INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 2, leaderEpoch = 2, offset = 2, CreateTime = 1612498979481, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-05 12:22:59.485 INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka
2021-02-05 12:23:00.988 INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 1, leaderEpoch = 2, offset = 2, CreateTime = 1612498980985, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-05 12:23:00.988 INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka
2021-02-05 12:23:01.666 INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 8, leaderEpoch = 2, offset = 2, CreateTime = 1612498981663, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-05 12:23:01.666 INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka
2021-02-05 12:23:02.307 INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 7, leaderEpoch = 2, offset = 2, CreateTime = 1612498982304, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-05 12:23:02.307 INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka
2021-02-05 12:23:02.882 INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 6, leaderEpoch = 0, offset = 2, CreateTime = 1612498982880, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-05 12:23:02.882 INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka
2021-02-05 12:23:03.402 INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 5, leaderEpoch = 2, offset = 3, CreateTime = 1612498983400, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-05 12:23:03.402 INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka
2021-02-05 12:23:04.026 INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 9, leaderEpoch = 0, offset = 4, CreateTime = 1612498984024, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-05 12:23:04.026 INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka
2021-02-05 12:23:04.627 INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 0, leaderEpoch = 0, offset = 3, CreateTime = 1612498984625, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-05 12:23:04.628 INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka
2021-02-05 12:23:05.475 INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 4, leaderEpoch = 2, offset = 3, CreateTime = 1612498985473, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-05 12:23:05.475 INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka
2021-02-05 12:23:06.154 INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 3, leaderEpoch = 0, offset = 10, CreateTime = 1612498986152, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-05 12:23:06.154 INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka
2021-02-05 12:23:09.396 INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 2, leaderEpoch = 2, offset = 3, CreateTime = 1612498989394, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-05 12:23:09.397 INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka
3.3 ack
ack(acknowledgement),是kafka的一种确认机制,保证生产者发送的消息能够可靠的的发送到broker。每个topic的partition收到消息后都需要向producer返回一个ack。如果收到表示消息发送成功,否则会再次发送。
其具体的ack机制如下所示:
1)左侧表示成功发送,send并且接收到ack,继续执行next send,也收到ack。
2)右侧表示send首次失败了,则去resend到其他的partition上,成功接收到ack。然后再去执行next send。
3.3.1 同步机制
通过上面的分析会产生一个问题:何时发送ack?
通常有两种方式:
1)超过半数的follower同步成功
2)所有follower同步成功。
kafka采用如上方案的第二种:全部follower通过,发送ack
采取以上的ack方式,又会引发出下一个问题:当一个follower节点由于某种故障,迟迟不能与leader进行同步,此时leader需要一直等待,直到其同步完成。
由于上面的问题,kafka提供了一种机制:ISR(in-sync Replicas)
ISR(in-sync Replicas):Leader会维护一个ISR(in-sync Replicas),内部是所有和leader进行同步的follower,当ISR的所有follower完成同步后,leader会发送ack给producer。如果follower长时间未向leader同步数据,则会将该follower踢出ISR。如果leader宕机了,则会从ISR重新选举leader。
除此之外还有OSR(Out-Sync Relipcas):不能和leader保持同步的集合。
3.3.2 ack的应答机制
kafka为用户提供三种可靠性级别。在springboot的yml文件中通过acks的配置:
kafka:
bootstrap-servers: 192.168.184.134:9092,192.168.184.135:9092,192.168.184.136:9092
producer:
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
# acks=1 : 只要集群的leader节点收到消息,生产者就会收到一个来自服务器成功响应。
# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
acks: all
以上机制酌情使用,都会存在问题:
0:broker故障,数据丢失。
1:leader落盘成功并返回ack,follower同步失败或leader故障。
all:follower同步数据完成,发送ack之前leader故障,则造成了重复数据。
3.3.3 故障处理机制
首先了解两个名词:LEO(Log End Offset)和HW(High Watermark)。
LEO:每个副本的最后一个offset。
HW:所有副本中最小的LEO。
当leader发生故障时:
但leader发生故障,会从ISR中选出一个新的leader;为了保证副本的一致性,其余的follower会截掉各自log文件中高于HW的部分,然后重新从leader同步数据。
注意:只保证了副本将数据的一致,不能保证数据不丢失或者不重复。
当follower发生故障时:
当follower发生故障时,会被踢出ISR,待其恢复时,follower会读取磁盘记录的上次的HW,并将log文件中高于HW的部分截掉,,从HW开始同步,追赶leader,当该follower的LEO大于等于HW时,即已经追上了leader,就可以重新加入ISR了。
3.4Exactly Once 语义
上面说过生产者可以设定可靠性的级别:
当设置为0时,可以保证每条消息只会发送一次,即At Most Once语义。保证不重复。但不保证数据不丢失。
当设置为all时,可以保证每条消息都会发送成功,即At Least Once语义。保证不丢失,但是不保证不重复。
那么有没有办法保证既不丢失也不重复呢?
这里就要提到Exactly Once语义。
在0.11版本的Kafka,引入了一项重大特性:幂等性。
所谓的幂等性就是指Producer不论向Server发送多少次重复数据,Server端都只会持久化一条。
幂等性结合At Least Once语义,就构成了Kafka的Exactly Once语义。
即:At Least Once + 幂等性 = Exactly Once。
要启用幂等性,只需要将Producer的参数中enable.idompotence设置为true即可。这个配置目前在新版本中无法通过配置文件直接配置,可以通过手动配置kafka的配置文件添加进去。
Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的Producer在初始化的时候会被分配一个 PID,发往同一Partition的消息会附带Sequence Number。而Broker 端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker只会持久化一条。
作用范围
1)PID 重启就会变化。只能实现单回话上的幂等性,这里的会话指的是Producer进程的一次运行。
2)同时不同的Partition也具有不同主键,所以幂等性无法保证跨分区跨会话的 Exactly Once。
如果需要跨会话、跨多个topic-partition的情况,需要使用Kafka的事务性来实现。关于事务的原理后面会讲解。
本章到此为止,下一章节讲解消费者相关的内容。