一、说明
# 1、此文章的基础概念部分参考了如下文章(总结的比较到位)
https://www.cnblogs.com/yaochunhui/p/15506926.html
# 2、其余部分参考官网
https://kafka.apache.org/documentation
# 3、kafka资源包
https://archive.apache.org/dist/
二、kafka是什么
kafka是一个多分区、多副本且基于zookeeper协调的分布式消息系统。也是一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用。
三、kafka作用
# 1、消息系统
kafka具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时,Kafka 还提供了大多数消息系统难以实现的消息顺序性保障及回溯消费的功能。
# 2、存储系统
Kafka 把消息持久化到磁盘,相比于其他基于内存存储的系统而言,有效地降低了数据丢失的风险。也正是得益于Kafka 的消息持久化功能和多副本机制,我们可以把Kafka作为长期的数据存储系统来使用,只需要把对应的数据保留策略设置为“永久”或启用主题的日志压缩功能即可。
# 3、流式处理平台
Kafka 不仅为每个流行的流式处理框架提供了可靠的数据来源,还提供了一个完整的流式处理类库,比如窗口、连接、变换和聚合等各类操作。
四、kafka结构和术语
一个典型的 Kafka 体系架构包括若干 Producer、若干 Broker、若干 Consumer,以及一个ZooKeeper集群,如下图所示。其中ZooKeeper是Kafka用来负责集群元数据的管理、控制器的选举等操作的。Producer将消息发送到Broker,Broker负责将收到的消息存储到磁盘中,而Consumer负责从Broker订阅并消费消息。
# 1、Producer
生产者,也就是发送消息的一方。生产者负责创建消息,然后将其投递到Kafka中。
# 2、Consumer
消费者,也就是接收消息的一方。消费者连接到Kafka上并接收消息,进而进行相应的业务逻辑处理。
# 3、Consumer Group (CG)
消费者组,由多个 consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
# 4、Broker
服务代理节点。对于Kafka而言,Broker可以简单地看作一个独立的Kafka服务节点或Kafka服务实例。一个kafka集群由多个 broker 组成。一个 broker可以容纳多个 topic。
# 5、Topic
Kafka中的消息以主题为单位进行归类,生产者负责将消息发送到特定的主题(发送到Kafka集群中的每一条消息都要指定一个主题),而消费者负责订阅主题并进行消费
# 6、Partition
主题是一个逻辑上的概念,它还可以细分为多个分区,一个分区只属于单个主题,很多时候也会把分区称为主题分区(Topic-Partition)。同一主题下的不同分区包含的消息是不同的,分区在存储层面可以看作一个可追加的日志(Log)文件,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量(offset)。offset是消息在分区中的唯一标识,Kafka通过它来保证消息在分区内的顺序性,不过offset并不跨越分区,也就是说,Kafka保证的是分区有序而不是主题有序。
Kafka中的分区可以分布在不同的服务器(broker)上,也就是说,一个主题可以横跨多个broker,以此来提供比单个broker更强大的性能。
每一条消息被发送到broker之前,会根据分区规则选择存储到哪个具体的分区。如果分区规则设定得合理,所有的消息都可以均匀地分配到不同的分区中。如果一个主题只对应一个文件,那么这个文件所在的机器 I/O 将会成为这个主题的性能瓶颈,而分区解决了这个问题。 (7)Replica:Kafka 为分区引入了多副本(Replica)机制,通过增加副本数量可以提升容灾能力。同一分区的不同副本中保存的是相同的消息(在同一时刻,副本之间并非完全一样),副本之间是“一主多从”的关系,其中leader副本负责处理读写请求,follower副本只负责与leader副本的消息同步。副本处于不同的broker中,当leader副本出现故障时,从follower副本中重新选举新的leader副本对外提供服务。Kafka通过多副本机制实现了故障的自动转移,当Kafka集群中某个broker失效时仍然能保证服务可用。
Kafka 消费端也具备一定的容灾能力。Consumer 使用拉(Pull)模式从服务端拉取消息,并且保存消费的具体位置,当消费者宕机后恢复上线时可以根据之前保存的消费位置重新拉取需要的消息进行消费,这样就不会造成消息丢失。
五、kafka 集群搭建
5.1、环境
ip |
hostname |
备注 |
192.168.13.210 |
kafka-01 |
kafka broker、zookeeper |
192.168.13.213 |
kafka-02 |
kafka broker、zookeeper |
192.168.13.223 |
kafka-03 |
kafka broker、zookeeper |
5.2、获取资源包
# 如下步骤需要在所有节点上操作
cd ~ && mkdir efk && yum install wget -y
wget https://archive.apache.org/dist/kafka/2.6.2/kafka_2.12-2.6.2.tgz
tar -xf kafka_2.12-2.6.2.tgz
cp -r kafka_2.12-2.6.2 /opt/kafka
mkdir /data/{kafka,zookeeper,kafka-logs,zkdatalog} -p
5.3、配置文件
5.3.1 192.168.13.210(kafka-01)
[root@kafka-01 bin]# cat /data/zookeeper/myid
1
[root@kafka-01 bin]# cat /opt/kafka/config/zookeeper.properties
tickTime=2000
initLimit=5
syncLimit=2
dataDir=/data/zookeeper/
dataLogDir=/data/zkdatalog/
clientPort=2181
maxClientCnxns=0
admin.enableServer=false
server.1=192.168.13.210:2888:3888
server.2=192.168.13.213:2888:3888
server.3=192.168.13.223:2888:3888
[root@kafka-01 bin]# cat /opt/kafka/config/server.properties
# broker.id,同一个集群里面,每一个服务器都需要唯一的一个ID,非负整数,kafka节点通过id来识别broker节点。当该节点的ip地址发生变化,broker.id没有变化,则不会影响consumers的消息情况。
broker.id=0
# listeners,监听客户端请求的IP和端口,默认都是9092
listeners=PLAINTEXT://192.168.13.210:9092
host.name=192.168.13.210
port=9092
# broker处理网络(io)的线程数,一般情况下不需要去修改
num.network.threads=3
# broker处理磁盘io的线程数
num.io.threads=8
# socket收发的缓冲区大小
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
# socket请求的最大数值
socket.request.max.bytes=104857600
# kafka数据存放地址,多个地址的话用逗号分割
log.dirs=/data/kafka-logs
# 默认的partition数目
num.partitions=3
num.recovery.threads.per.data.dir=1
# 数据保存时间
log.retention.hours=168
# 文件分段的大小
# topic分区是以一堆segment文件存储的,这个参数用来控制每个segment的大小
log.segment.bytes=1073741824
# 文件大小检查的周期时间
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.13.210:2181,192.168.13.213:2181,192.168.13.223:2181
# zk连接超时
zookeeper.connection.timeout.ms=6000
# 我的kafka集群有三个broker,当其中一个broker宕机后,会影响消费,报错信息如下:
# 【Number of alive brokers '2' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)】
# 这个值默认是3(3个拷贝、replication),当发生宕机后,就无法满足3个cp,会影响消费,在这我设置为2,在有broker宕机的情况下不影响使用。
offsets.topic.replication.factor=2
5.3.2 192.168.13.213(kafka-02)
[root@kafka-02 ~]# cat /data/zookeeper/myid
2
[root@kafka-02 ~]# cat /opt/kafka/config/zookeeper.properties
tickTime=2000
initLimit=5
syncLimit=2
dataDir=/data/zookeeper/
dataLogDir=/data/zkdatalog/
clientPort=2181
maxClientCnxns=0
admin.enableServer=false
server.1=192.168.13.210:2888:3888
server.2=192.168.13.213:2888:3888
server.3=192.168.13.223:2888:3888
[root@kafka-02 ~]# cat /opt/kafka/config/server.properties
broker.id=1
listeners=PLAINTEXT://192.168.13.213:9092
host.name=192.168.13.213
port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka-logs
num.partitions=3
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.13.210:2181,192.168.13.213:2181,192.168.13.223:2181
zookeeper.connection.timeout.ms=6000
offsets.topic.replication.factor=2
5.3.3 192.168.13.223(kafka-03)
[root@kafka-03 bin]# cat /data/zookeeper/myid
3
[root@kafka-03 bin]# cat /opt/kafka/config/zookeeper.properties
tickTime=2000
initLimit=5
syncLimit=2
dataDir=/data/zookeeper/
dataLogDir=/data/zkdatalog/
clientPort=2181
maxClientCnxns=0
admin.enableServer=false
server.1=192.168.13.210:2888:3888
server.2=192.168.13.213:2888:3888
server.3=192.168.13.223:2888:3888
[root@kafka-03 bin]# cat /opt/kafka/config/server.properties
broker.id=2
listeners=PLAINTEXT://192.168.13.223:9092
host.name=192.168.13.223
port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka-logs
num.partitions=3
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.13.210:2181,192.168.13.213:2181,192.168.13.223:2181
zookeeper.connection.timeout.ms=6000
offsets.topic.replication.factor=2
六、集群测试
6.1、创建topic
rootkafka-topics.sh --create --zookeeper 192.168.13.210:2181 --replication-factor 3 --partitions 1 --topic fzh
6.2、发送消息
./kafka-console-producer.sh --topic fzh --bootstrap-server 192.168.13.210:9092
6.3、打开消息监听
./kafka-console-consumer.sh --topic fzh --from-beginning --bootstrap-server 192.168.13.223:9092