1. Kafka的介绍
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
2. 搭建环境
JDK 10+[图片上传中...(1.png-2f5f56-1625274266941-0)]
Zookeeper
Kafka 2.x
.NET 5
Visual Studio 2019
Confluent.Kafka
3. Kafka集群
集群的结构
如下图:
- Zookeeper :对kafka选举做集群的节点的选举,可以看作是个数据库,可做分布式锁,可实现强一致性。
以下是选举流程
kafka每个节点服务运行后,首先向zk中注册 watcher ,注册成功后watcher就与该节点之间产生心跳,运行一段时间后当主节点宕机后,其对应的zk中的节点注册也会消失。同时激活watcher,读取剩下的所有节点确定宕机的节点的分区与消费信息,然后从剩下的节点中做选举。其中选举有三种情况:
正常选举:剩下的节点向ZK 发送指令LeadersandISR,写的快的节点为新的leader。
剩下一个节点 ,这个节点直接成为leader。
所有节点都宕机,等待其中一个恢复中。
4. 相关术语
- Producer(生产者):生产数据对应客户端。
- Consumer(消费者):负责处理kafka服务里面消息。
- Consumer Group/Consumers(消费者组):就是kafka独特处理轮询还是广播。
轮询:消费者每一个处理一条。
广播:一条信息,多个消费者同时处理。 - Broker(服务):就是kafka服务,一个Broker可以创建多个topic。
- Topic(主题):写入broker的主题,一个kafka集群里面可以有多个Topic,为了区分业务和模块使用。
- Partition(分区):把一个topic的信息分成几个区,利用多个节点把多个分区,放在不同节点上面,实现负载均衡,kafka内部实现的。
- Replica(副本):防止主节点宕机数据丢失了,保证高可用。
- Offset(偏移量):就是消息的主键,生产者写入数据后返回的偏移量,消费者消费数据知道数据消费的位置,防止重复消费。
5. Kafka集群消息管理
以下是示意图:
- 生产者生成的消息对Key做Hash 后做相应的规则区分放到 分区0/1 中的Leader中,Leader 会内部把数据备份到其他broker的备份中,这样的交叉备份的好处就是当其中一个broker 宕机后,不会导致数据的丢失。
6. Kafka的搭建
6.1 单机版(Docker 搭建)
搭建使用docker-compose.yml
version: '2'
services:
zoo1:
image: wurstmeister/zookeeper
restart: unless-stopped
hostname: zoo1
ports:
- "6181:2181"
container_name: zookeeper_kafka
# kafka version: 1.1.0
# scala version: 2.12
kafka1:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 81.70.91.63
# zoo1:2181 也可以改成:81.70.91.63:6181
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CREATE_TOPICS: "stream-in:1:1,stream-out:1:1"
depends_on:
- zoo1
container_name: kafka
运行:
到docker-compose.yml 所在的目录下运行:docker-compose up -d
6.2 集群版(Docker 搭建)
首先会对zookeeper 做集群
以下是 zk.yml 文件
version: '3.4'
services:
zoo1:
image: zookeeper:3.4
restart: always
hostname: zoo1
container_name: zoo1
ports:
- 2184:2181
volumes:
- "/szw/volume/zkcluster/zoo1/data:/data"
- "/szw/volume/zkcluster/zoo1/datalog:/datalog"
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
networks:
kafka:
ipv4_address: 172.19.0.11
zoo2:
image: zookeeper:3.4
restart: always
hostname: zoo2
container_name: zoo2
ports:
- 2185:2181
volumes:
- "/szw/volume/zkcluster/zoo2/data:/data"
- "/szw/volume/zkcluster/zoo2/datalog:/datalog"
environment:
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=0.0.0.0:2888:3888 server.3=zoo3:2888:3888
networks:
kafka:
ipv4_address: 172.19.0.12
zoo3:
image: zookeeper:3.4
restart: always
hostname: zoo3
container_name: zoo3
ports:
- 2186:2181
volumes:
- "/szw/volume/zkcluster/zoo3/data:/data"
- "/szw/volume/zkcluster/zoo3/datalog:/datalog"
environment:
ZOO_MY_ID: 3
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=0.0.0.0:2888:3888
networks:
kafka:
ipv4_address: 172.19.0.13
networks:
kafka:
external:
name: kafka
Kafka 做集群
以下是:kafka.yml 文件
version: '3.4'
services:
kafka1:
image: wurstmeister/kafka
restart: always
hostname: kafka1
container_name: kafka1
privileged: true
ports:
- 9092:9092
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka1
KAFKA_LISTENERS: PLAINTEXT://kafka1:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://81.70.91.63:9092
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
volumes:
- /szw/volume/kfkluster/kafka1/logs:/kafka
external_links:
- zoo1
- zoo2
- zoo3
networks:
kafka:
ipv4_address: 172.19.0.14
kafka2:
image: wurstmeister/kafka
restart: always
hostname: kafka2
container_name: kafka2
privileged: true
ports:
- 9093:9093
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka2
KAFKA_LISTENERS: PLAINTEXT://kafka2:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://81.70.91.63:9093
KAFKA_ADVERTISED_PORT: 9093
KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
volumes:
- /szw/volume/kfkluster/kafka2/logs:/kafka
external_links:
- zoo1
- zoo2
- zoo3
networks:
kafka:
ipv4_address: 172.19.0.15
kafka3:
image: wurstmeister/kafka
restart: always
hostname: kafka3
container_name: kafka3
privileged: true
ports:
- 9094:9094
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka3
KAFKA_LISTENERS: PLAINTEXT://kafka3:9094
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://81.70.91.63:9094
KAFKA_ADVERTISED_PORT: 9094
KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
volumes:
- /szw/volume/kfkluster/kafka3/logs:/kafka
external_links:
- zoo1
- zoo2
- zoo3
networks:
kafka:
ipv4_address: 172.19.0.16
networks:
kafka:
external:
name: kafka
界面管理工具:用来监控集群管理
以下是:kafkamanage.yml 文件
version: "3.4"
services:
kafka-manager:
image: sheepkiller/kafka-manager:latest
restart: always
container_name: kafka-manager
hostname: kafka-manager
ports:
- 9000:9000
environment:
ZK_HOSTS: zoo1:2181,zoo2:2181,zoo3:2181
KAFKA_BROKERS: kafka1:9092,kafka2:9092,kafka3:9092
APPLICATION_SECRET: letmein
KM_ARGS: -Djava.net.preferIPv4Stack=true
networks:
kafka:
ipv4_address: 172.19.0.17
networks:
kafka:
external:
name: kafka
启动
#docker-compose 默认会创建网络,不需要手动执行
#如果执行错误,则需要删除其他的network
#docker network ls
#查看详细的 network
#docker network inspect name/id
#
docker network create --driver bridge --subnet 172.19.0.0/16 --gateway 172.19.0.1 kafka
docker-compose -f zk.yml -f kafka.yml -f kafkamanage.yml up -d
安装成功:
访问 kafkamanage 管理工具的地址:(http://81.70.91.63:9000/)
注意:添加cluster 和 Zookeeper host 注意默认数值提示,保存。
添加成功后:
7. 代码对接
7.1 生产者
Nuget:安装Confluent.Kafka包
对接代码:
public static async Task Produce(string brokerlist, string topicname, string content)
{
string brokerList = brokerlist;
string topicName = topicname;
var config = new ProducerConfig
{
BootstrapServers = brokerList,
Acks = Acks.All,
// 幂等性,保证不会丢数据。
EnableIdempotence = true,
//信息发送完,多久数据发送到broker里面。
LingerMs = 10000,
BatchNumMessages = 2,//字节数
// 只要上面的两个要求符合一个,则后台线程立刻马上把数据推送给broker
// 可以看到发送的偏移量,如果没有偏移量,则就是没有写成功
MessageSendMaxRetries = 3,//补偿重试,发送失败了则重试
// Partitioner = Partitioner.Random
};
using (var producer = new ProducerBuilder<string, string>(config).Build())
{
try
{
var deliveryReport = await producer.
ProduceAsync(
topicName, new Message<string, string> { Key = (new Random().Next(1, 10)).ToString(), Value = content });
Console.WriteLine($"delivered to: {deliveryReport.TopicPartitionOffset}");
}
catch (ProduceException<string, string> e)
{
Console.WriteLine($"failed to deliver message: {e.Message} [{e.Error.Code}]");
}
}
}
注意:
- 生产端写ack ,消费端不需要。
- ACK 保证数据不丢失但是会影响到我们性能。越高级别数据不丢失,则写入的性能越差。
- 建议使用异步,性能比较好。
- ProduceAsync 中的Key:Key 注意是做负载均衡,比如,有三个节点,一个topic,创建了三个分区。一个节点一个分区,如果写入的数据的时候,没有写key,会导致,所有的数据存放到一个分区上面。如果用了分区,必须要写key .根据自己的业务,可以提前配置好。key的随机数,可以根据业务,搞一个权重,如果节点的资源不一样,合理利用资源。
- 数据写入如果默认一个分区,则是有顺序,如果是多个分区,则不能保证数据的顺序。
8. 数据的可靠性
数据的可靠性就是保证数据能写入Broker,并且在Broker宕机后重新选举出来的Leader也不会导致数据的丢失,这样就关系到了ACK的返回机制。
以下是Kafka的数据写入流程。
对于ACK的返回策略有两种:1. 半数以上的Follower完成同步返回ACK ,2. 全部的Follower完成同步返回ACK。
以下是两种ACK的优缺点对比
方案 | 优点 | 缺点 |
---|---|---|
半数以上完成同步发送ACK | 延迟低 | 选举Leader,容忍N台节点故障,需要2N+1个副本(数据的大量冗余) |
全部完成发送ACK | 选举Leader,容忍N台节点故障,需要N+1个副本 | 延迟高 |
最后Kafka选用了方案2 ,全部完成发送ACK。
9. ISR(同步副本集)
ISR: 同步的副本集合,维护当前还在存活办事的副本,由于数据可靠性的选择,解决防止当一个副本出现问题时候,不能正常的返回ack。
维护ISR的原理:维护是由ZK 完成的,一般的判断断线是:心跳与数据备份量。
如果是根据数据的备份量:leader和副本数据相差一定条数,则就认为副本节点断开,然后从isr移除,当数据备份跟的上来,然后又重新加入到ISR集合。
心跳:一定的时间没有进行心跳。 超过配置时间,则认为断开连接,从ISR中移除当心跳跟的上,在进入ISR集合。
kafka是个高吞吐的消息队列, 发送数据的时候,有批量发送的功能,每次发数据的可以发送大量的数据,这个是可配的。 所以如果根据条数,则副本节点会经常性的从ISR移除和加入。 因为这种考虑,kafka的开发者,选择使用根据时间来判断。
其中控制批量发送的条数就是: BatchNumMessages(可通过生产者代码对接中找到):当内存的数据条数达到了,立刻马上发送到Broker
10. ACKS
系统提供的ACK 设置有三种:-1,0, 1。默认设置是0。
10.1 ACKS为0
Broker接收到数据立刻返回到生产者ACK,并且在数据做持久化之前。如下图:
优点:性能最高。
缺点:丢失数据概率也最大。当Leader接收到数据,但是还没有持久化时宕机,会导致数据的丢失。
使用场景:日志系统,IOT设备状态信息上传。
10.2 ACKS为1
Broker接收到数据并且做完持久化落盘后返回到生产者ACK。如下图:
优点:性能中等。
缺点:有丢失数据概率也最大。当Leader接收到数据,。持久化后,没有做Follower Leader宕机。新的Follower 没有更新下Leader最新数据,然后选举成了Leader。只有通过人工介入找回数据。
使用场景:暂没发现。
10.3 ACKS为-1
Leader和所有的Follower全部落盘成功后返回ACK。如下图:
优点:数据不会丢失。
缺点:
- 导致新的问题爆出,幂等性问题。导致幂等性问题的原因为:当数据都备份完成,要返回ACK时候leader宕机,新的副本替代成为Leader,生产者因为没有收到ACK,所以补偿重试,再次发送信息导致数据重复。
- 性能比较低下,原因为:1. 数据的备份。2. 如果加入幂等性,服务端会验证数据的唯一性。
- 使用场景:使用不多。
11. 幂等性
var config = new ProducerConfig
{
BootstrapServers = brokerList,
Acks = Acks.All,
//开启幂等性的配置
EnableIdempotence = true,
}
开启幂等性:发送的数据的时候,服务端会验证信息的唯一性,如果之前发送过,就不在接受,然后只会保留一条相同的信息。
幂等性是由:消息ID,客户端ID,分区Key 共同的组成。
12. 生产者事务
如果发送的消息的Topic,是在多个分区,需要用事务的模式来保证多个分区的幂等性。
示例如下:
string brokerList = "192.168.1.2:9092,192.168.1.3:9093,192.168.1.4:9094";
// 不同的topic的testtransactionalId就不同
string topicName = "test";
// 不一样的topic,transactionalId就写的不一样。。
string transactionalId = "transtest1";
var config = new ProducerConfig
{
BootstrapServers = brokerList,
EnableIdempotence = true,
Acks = Acks.All,
TransactionalId = transactionalId,
};
using (var producer = new ProducerBuilder<string, string>(config).Build())
{
try
{
//初始化事务
producer.InitTransactions(DefaultTimeout);
var currentState = ProducerState.InitState;
producer.BeginTransaction();
for (int i = 100; i < 110; i++)
{
var content = i.ToString();
producer.Produce(
topicName, new Message<string, string> { Key = content, Value = content });
}
//提交
producer.CommitTransaction(DefaultTimeout);
}
catch (Exception ex)
{
//回滚
producer.AbortTransaction(DefaultTimeout);
Console.WriteLine(ex.Message);
}
}
如果:写入数据有一个 节点的Leader 失败,就会自动的通知其他的Leader 做书的回滚。
13. 高效的原理
13.1 批量发送
-
生产者发送数据是批量的
生产者在进行数据的写入的时候,会有两个线程在维护,a.写数据的线程,b.后台线程。a.线程负责把数据写入到生产者维护的缓存区中。b.线程负责把缓存区的数据写入到Kafka 中。
10.png 而控制b线程进入Kafka的变量就是:
LingerMs = 10000, //时间毫秒为单位
BatchNumMessages = 2,//字节数
以上条件有一个就满足则立马写入Kafka节点中。消费端消费数据的时候是批量的
13.2 顺序读写
大数据处理一般都是做的顺序读写:
增:顺序写入就可以。
修改:也是顺序的写入,后台线程会去处理,合并修改的数据。
删除:也是顺序的写入,后台线程会去处理,合并删除的数据。
所以,这类数据处理适合处理大量写入的数据,少了修改和删除的数据,因为这样会降低数据处理性能,并且修改,删除的数据处理也会有延时。
13.3 零拷贝
传统的数据处理是:三次数据的拷贝:磁盘->内核->用户进程->内核 ,目的就是为了保证资源的安全性。以下是示意图:
是由Linux 系统实现的一
种快捷的方式。减少了内核到用户,用户到内核的两次拷贝。如下图:
14. 消费者
消费者:有两种方式,一种是推送,一种是拉去。
14.1 推送
kafka主动去推数据,如果遇到高并发的时候,可能消费端还没有把之前的数据处理完,然后强推了大量的数据过来,有可能造成我们消费端的挂机
14.2 拉取
消费端主动的去拉取,可能存在数据延迟消费,不会造成我们消费端的宕机,同样的存在一个微循环,不停的拉取数据。
总结:kafka 选取的是拉取的模式去消费数据。对比同样的MQ ,rabbitmq 则,既可以使用拉取,也可以使用推送(推送的时候可以设置限流的方式)的方式去消费。
14.3 消费的偏移量Offset
0.9版本之前的编译量是由Zookeeper保存的维护的。
0.9版本之后是由自己维护(topic: __consumer_offsets)的。
14.4 代码的对接
public static void Run_Consume(string brokerList, List<string> topics, string group)
{
var config = new ConsumerConfig
{
BootstrapServers = brokerList,
GroupId = group,
// 有些属性可以写,但是没有用到
//Acks = Acks.All,
//消费方式自动提交
EnableAutoCommit = false,
//消费模式
AutoOffsetReset = AutoOffsetReset.Earliest,
//EnablePartitionEof = true,
//PartitionAssignmentStrategy = PartitionAssignmentStrategy.Range,
//FetchMaxBytes =,
//FetchWaitMaxMs=1,
//代表数据超过了6000没有处理完业务,则把数据给其他消费端
//一定要注意。SessionTimeoutMs值一定要小于MaxPollIntervalMs
SessionTimeoutMs = 6000,
MaxPollIntervalMs = 10000,
};
const int commitPeriod = 1;
//提交偏移量的时候,也可以批量去提交
using (var consumer = new ConsumerBuilder<Ignore, string>(config).SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}")).SetPartitionsAssignedHandler((c, partitions) =>
{
//自定义存储偏移量
//1.每次消费完成,把相应的分区id和offset写入到mysql数据库存储
//2.从指定分区和偏移量开始拉取数据
//分配的时候调用
Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}]");
#region 指定分区消费
// 之前可以自动均衡,现在不可以了
//List<TopicPartitionOffset> topics = new List<TopicPartitionOffset>();
//// 我当前读取所有的分区里面的从10开始
//foreach (var item in partitions)
//{
// topics.Add(new TopicPartitionOffset(item.Topic, item.Partition, new Offset(10)));
//}
//return topics;
#endregion
}) .SetPartitionsRevokedHandler((c, partitions) =>
{
//新加入消费者的时候调用
Console.WriteLine($"Revoking assignment: [{string.Join(", ", partitions)}]");
}).Build())
{
//消费者会影响在平衡分区,当同一个组新加入消费者时,分区会在分配
consumer.Subscribe(topics);
try
{
// 死循环 拉取模式
while (true)
{
try
{
var consumeResult = consumer.Consume();
if (consumeResult.IsPartitionEOF)
{
continue;
}
Console.WriteLine($": {consumeResult.TopicPartitionOffset}::{consumeResult.Message.Value}");
if (consumeResult.Offset % commitPeriod == 0)
{
try
{
//提交偏移量,数据自己已经处理完成了
consumer.Commit(consumeResult);
}
catch (KafkaException e)
{
Console.WriteLine($"Commit error: {e.Error.Reason}");
}
}
}
catch (ConsumeException e)
{
Console.WriteLine($"Consume error: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
Console.WriteLine("Closing consumer.");
consumer.Close();
}
}
}
//调用方式
Consumer("192.168.1.10:9092,192.168.1.11:9093,192.168.1.12:9094", "test", "groupname");
- 自动ACK:Acks = Acks.All 标记是无效的
- 消费提交:EnableAutoCommit = false,自动提交服务端数据已经消费,服务端标记本数据为。一般设置为:false。如果设置成True自动提交,在接收到数据后处理过程出现异常,会导致无法重复消费这个数据,丢失。如设置成false,可能导致数据的重复消费,比如手动提交时候服务器断开。重新连接后重复消费,解决办法就是:发送消息加上一个唯一ID,消费了就加入到Redis 中,下次来了判断数据是否消费过,没消费就重新消费,消费了就提交。
- 消费模式:AutoOffsetReset:AutoOffsetReset.Latest 即:0,表示消费者消费启动之后的数据。启动之前的服务端还没消费的数据消费不到。AutoOffsetReset.Earliest = 1,每次都从头开始消费没有消费过的数据(推荐模式)。AutoOffsetReset.Error = 2.。报错后无法消费。
- 消费端:组和组之间是广播模式,组内是根据分区数量。多个客户端去消费,如果是组相同则自动做负载均衡,开启的消费的相同组客户端最大数量等于分区的数量,开启多出来的客户端消费不到数据。重新连接新的组则从新的消费该组没有消费的数据与别的组消费数据无关。 原则上是:topic的数量=broker的数量,broker数量=分区数量,分区数量=一个组内的消费者数量。
15. 异常情况消费
当Leader offset 为8 宕机 Follow1 offset 为7 Follow2 offset 为6 Follow2选择为Leader 。消费数据的时候是能从8 消费的,但是Follow2 最为主节点时候,没有8 。这种情况Kafka是这样处理的。
Kafka使用的 LEO和HW的机制去处理的。
LEO:指的是每个副本最大的 offset。
HW:指的是消费者能见到的最大的 offset,ISR队列中最小的 LEO。
这样就保证了消费者看到的是全部备份完的的偏移量了。
16. 高级消费
如果当前超过时间没有消费完成,则返回给另一个分区去消费,以下是设置参数。
SessionTimeoutMs = 6000,
MaxPollIntervalMs = 10000,
注意: MaxPollIntervalMs的值必须大于等于SessionTimeoutMs
上面类似心跳,如果消费水平太慢,则会引起重新分配
17. 文件储存机制
文件的架构:通过小文件的不断合并最后转成了一个大文件。结构如图:
保存文件的结构:
000000000000000000000000.log
000000000000000000000000.index
000000000000000000000700.log
000000000000000000000700.index
000000000000000000002000.log
000000000000000000002000.index
注意:log文件和index 文件是成对出现的,000000000000000000000000.log 保存的是 0-699 的数据。
数据的查找:通过文件的名字,使用的二分发做的查找。如下图:
注意:除了爆露出来的偏移量之外,Kafka 内部还key值对应与log文件中。
18. 日志的压缩策略
kafka定期将相同key的消息进行合并,只保留最新的value值。
保存的每一条数据,会记录是增加,删除,还是修改。
19. 自定义存储
自定义存储(解决重复消费)
- 每次消费完成,把相应的分区和offset写入到mysql数据库。
- 从指定分区和偏移量开始拉取数据。
.SetPartitionsAssignedHandler((c, partitions) =>
{ //获取mysql存储结果,从当前开始获取
foreach (var item in partitions)
{
topics.Add(new TopicPartitionOffset(item.Topic, item.Partition, new Offset(10)));
}
})
consumer.Assign(topics.Select(topic => new TopicPartitionOffset(topic, 1, Offset.Beginning)).ToList());
20.注意:
- 消费者 一般使用workservice。
- 消费端只关心topic和偏移量,其余不关心。
- 保留7天,kafka可以配置。默认7天,消息积压有处理。
- 消费数据先消费Leader 的。