1. kafka架构图
2. 角色分析
1. Broker
kafka作为一个消息中间件,用于存储和转发消息,可以把它想象成一个中介,股票经纪人就叫做broker。默认端口是9092,生产者和消费者都需要跟这个Broker建立连接才可以实现消息的收发。
2. 消息
客户端之间传输的数据称之为消息, 或者说是记录(record)。请记住,对于kafka来说,不管是消费者还是生产者都是客户端。 在客户端的代码中,Record可以是一个key-value键值对,生产者对应的封装类是ProducerRecord, 消费者对应的封装类是ConsumerRecord。消息在传输的过程中需要序列化,所有需要我们在代码中执行序列化工具。消息在服务端中存储的格式(RecordBatch和Record)。
3. 生产者
我们将发送消息的一方称之为生产者,接收消息的乙方称之为消费者,为了提升消息发送的速率,生产者并不是组条发送消息到broker中,而是批量发送的。多少条发送一次,由配置中的一个参数决定。
props.put("batch.size", 16384);
4. 消费者
一般来说,消费者获取消息存在两种方式,一种是pull, 一种是push。kafka采用的是pull模式。WHY?
消费者可以控制自己一次消费多少条消息
max.poll.record=500 #默认是500条
5.
生产者和消费者之间每条消息之间是如何关联起来的呢?也就是消费者怎么就知道自己需要消费什么消息?
队列的存在就是解决这个问题的。在kafka里面这个队列就是topic,。
生产者和Topic,Topic和消费者的关系都是多对多(不建议这么做)。
当生产者发送消息时,没有对应的Topic,这个时候会自动创建Topic。可以通过参数控制
auto.enable.topics.enable=true #默认时true
6. partition和Cluster
分区其实是一种数据库分片的思想。试想一下,如果一个topic中消息过多,会产生什么样的问题。
- 不方便横向扩展,通过扩展机器而不是升级硬件扩展。
- 并发负载,所有的客户端都操作同一个topic,在高并发的场景下,性能瓶颈
kafka分区概念---partition。一个topic可以划分成多个分区,分区在创建topic的时候指定,每个topic至少有一个分区。
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic xiong
如果没有指定分区数,默认分区是1个,可通过下述参数修改
num.partitions=1
partition负载的实现。举例说明,Topic有三个分区,生产者发送了9条消息,第一个分区存储了1 4 7, 第二个分区存储了2 5 8,第三个分区存储了3 6 9。这种情况下其实就是负载的一种体现
每个partition都会有一个物理目录。kafka的配置文件下可以配置日志的存储路径,默认存储在/tmp/kafka-logs下,假设topic=xiongTopic, 每个分区的存储目录就是xiongTopic-0、xiongTopic-1.....
7. 副本机制
如果partition的数据只存储了一份,在发生网络或者硬件故障的时候,该分区的数据会无法访问或者无法恢复了。kafka在0.8版本之后增加了副本机制, 每个partiotion可以有若干个副本,。一般我们说的副本包括其中的主节点。
由replication-factor指定一个Topic的副本数:
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partition --topic testxiong
服务端有个参数控制默认的副本数
offsets.topic.replication.factor=3
leader用粉红色标识,follower用绿色标识,leader是由选举得出。
生产者消费者消息传递都是通过leader来操作,follower的数据是通过leader同步过来的。
8. Segment
kafka的数据是放在后缀为.log的文件中,试想一下,kafka的数据在同一个partition中是顺序写入的,我们不断的追加数据,那保存数据的文件就会越来越大,这个时候检索的效率就会越来越低。
所以,kafka这块干脆对partition再次进行了切分,切分出来的单位就就做段(segment),实际上kafka数据的存储是分段的。我们可以在kafka的存储目录下看到这三个文件都是成对出现的:
这其中是一个数据文件,2个索引文件。segment的默认存储大小是1G,可以通过一下参数进行控制。
log.segment.bytes=1073741824
9. Consumer Group
在kafka中,消费者是以消费者组的形式对消息进行接收。每个消费者组都会由一个group id与对应的topic进行绑定。
- 消费者组中,消费者数量比partition数量少的情况下,一个消费者同时消费多个partition。
- 消费者组中,消费者数量比partition数量多的情况下,存在消费者空闲。
这两种情况都不是效率最高的情况,只有消费者数量和partition数量保持一致才是最好的选择。如果想要消费同一个partition,就需要另一个消费者组来进行。
10. Comsumer Offset
我们前面谈到,在Kafka中消息是顺序写入的,并且消费的消息是不会被删除的。那么,如果消费者突然挂掉,或者进行下次读写时,如何知道自己已经读取了哪些信息,该从何处继续读取消息呢?
既然消息是有序的,那我们就可以给消息进行编号,来唯一标识一条消息。
这里的编号我们就称之为offset,偏移量。offset记录着下一条将要发送给consumer的消息序号。offset的保存是保存在服务端的,并不是保存在ZK上面。
3. Kafka Java开发
生产者:
public class SimpleProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.182.128:9092");
// 设置key value序列化的工具
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//设置消息接收确认模式 0 发出就立刻确认, 1 leader接收到就确认 all 所有follower同步完成再确认
props.put("acks","1");
// 异常重试次数
props.put("retries", 3);
// 设置批量发送数据一次,数据大小,默认16k
props.put("batch.size",16384);
// 设置批量发送等待时间
props.put("linger.ms", 5);
// 设置客户端缓冲区大小,默认是32M,满了以后也会出发消息发送
props.put("buffer.memory", 33554432);
// 获取元数据时生产者的阻塞时间,超时后抛出异常
props.put("max.block.ms", 3000);
Producer<String, String> producer = new KafkaProducer<String, String>(props);
for (int i=0; i < 100; i ++) {
producer.send(new ProducerRecord<String, String>("mytopic", Integer.toString(i), Integer.toString(i)));
}
producer.close();
}
}
消费者
public class SimpleConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.182.128:9092");
props.put("group.id", "xiong-group");
// 是否自动提交偏移量,只有commit之后才更新消费者组的offset
props.put("enable.auto.commit", "true");
// 消费者自动提交的时间间隔
props.put("auto.commit.interval.ms", "1000");
// 从最早的数据开始消费earliest | latest | none
props.put("auto.offset.reset", "earliest");
// 设置key value反序列化的工具
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
//订阅队列
consumer.subscribe(Arrays.asList("mytopic"));
try{
while(true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset=%d, key=%s, value=%s, partition=%s%n",
record.offset(), record.key(), record.value(), record.partition());
}
}
} finally {
consumer.close();
}
}
}
查询消费者相关偏移量数据:
./kafka-consumer-groups.sh --bootstrap-server 192.168.182.128:9092 --describe --group xiong-group
4. 消息幂等性
什么叫做消息幂等性?
简单来说就是,消息发送一次的结果和发送多次的结果是一样的。
有时候消息消费失败的情况下,我们可能会采用消息重发的机制。但是生产者有时候是不知道消息是不是真的消费失败时,这时候消息的重发可能会产生消息重复的情况。
kafka实现消息的幂等性是在broker中实现的,而不是消费者端实现,大大的解放了消费者的双手。
如何实现消息的去重?
去重是需要依赖生产者消息的唯一标识的,不然我们没法知道是否是同一条消息,kafka中可以通过如下配置来产生唯一标识,将producer升级成幂等性的producer。
props.put("enable.idempotence", true);
实现机制:
- PID(Producer ID), 幂等性的生产者每个客户端都有一个唯一的编号。
- sequence number,幂等性的生产者发送的每条消息都会带sequence number, Server端就是通过这个值来判断消息是否重复。如果server端发现sequence number的值比服务端记录的值要小,那证明这个消息是重复的消息。(同一分区消息顺序写入,之前如果存在sequence number较小的在后面写入,那证明之前肯定已经有相同的消息已经发送过来过了)。
作用范围:
- sequence number并不是全局有序,不能保证所有时间上的幂等。只能保证单分区上的幂等。
- 单会话上的幂等,这里的会话是指producer进程的一次运行。当producer重启以后就不能保证了。
5. 生产者事务
生产者与事务有关的方法如下:(kafka 0.11版本以后才支持事务)
对象 | 描述 |
---|---|
initTransactions() | 初始化事务 |
beginTransaction() | 开启事务 |
commitTransaction() | 提交事务 |
abortTransaction() | 中止事务 |
sendOffsetsToTransaction() | sendOffsetsToTransaction方法是消费者和生产者在同一段代码使用的(从上游接收消息发送给下游),在提交的时候把消费消息的offset发送给consumer Corordinator. |
代码示例:
//事务的前提是消费者的幂等性
props.put("enable.idempotence", true);
//设置事务id,唯一
props.put("transactional.id", UUID.randomUUID().toString());
Producer<String, String> producer = new KafkaProducer<String, String>(props);
producer.initTransactions();
try{
producer.beginTransaction();
for (int i=0; i < 100; i ++) {
producer.send(new ProducerRecord<String, String>("mytopic", Integer.toString(i), Integer.toString(i)));
if (i == 20) {
Integer j = 1/0; //制造异常
}
}
producer.send(new ProducerRecord<String, String>("mytopic", Integer.toString(100), Integer.toString(100)));
producer.commitTransaction();
} catch (KafkaException e) {
producer.abortTransaction();
}
producer.close();
kafka分布式事务的实现:
- 生产者的消息会分区,所以这里的事务属于分布式事务。kafka采用的是2PC提交。如果大家都可以commit就提交,否则就abort;
- 2PC的情况下,需要一个协调者,在Kafka中这个角色叫做Transaction Coordinator。
- 事务管理必须有事务日志来记录事务的状态,以便在Coordinator以外挂掉以后继续处理原来的事务。事务日志的存储类似于消费者offset的存储,kafka使用了一个特殊topic--transaction_state来记录事务的状态信息。
- 如果生产者挂了,事务要在重启以后继续处理就需要有一个唯一的事务id来找到对应的事务,这个就是transaction.id。配置了transaction.id,此时生产者必须是幂等性的生产者。事务id相同的生产者可以继续处理原来的事务。
步骤描述:
A: 生产者通过initTransactions Api向coordinator注册事务id。
B: Corrdinator记录事务日志
C: 生产者将消息写入目标分区
D: 分区域Coordinator的交互,当事务完成以后消息的状态应该是已提交。这时候消费者才能消费