第2章 安装kafka
本章介绍Apache Kafka broker的入门知识,包括如何搭建Apache ZooKeeper集群, Kafka使用ZooKeeper为broker存储元数据。 本章还将涵盖Kafka部署的基本配置项,以及选择运行broker的硬件的一些建议。 最后,介绍如何安装多个Kafka broker作为集群的一部分,以及在生产环境中使用Kafka时应该知道的事情。
2.1 环境配置
在使用Apache Kafka之前,有一些先决条件(配置环境),以确保它正常运行。 下面几节将指导您完成这个过程。
2.1.1 选择操作系统
Apache Kafka是一个Java应用程序,可以在许多操作系统上运行。 虽然Kafka能够运行在许多操作系统上,包括Windows, macOS, Linux等,Linux是一般用例的推荐操作系统。 本章的安装步骤将重点介绍在Linux环境中安装和使用Kafka。 关于在Windows和macOS上安装Kafka的详细信息请参见附录A。
2.1.2 安装Java
在安装ZooKeeper或Kafka之前,需要配置一个Java环境。 Kafka和ZooKeeper可以兼容所有基于openjdk的Java实现,包括Oracle JDK。 Kafka的最新版本同时支持Java 8和Java 11。 安装的确切版本可以是操作系统提供的版本,也可以是直接从网络下载的版本(例如,从Oracle网站下载的Oracle版本JDK)。 尽管ZooKeeper和Kafka将与Java运行时(JRE)一起工作,但在开发工具和应用程序时,建议使用完整的Java开发工具包(JDK)。 建议安装Java环境的最新发布补丁版本,因为旧版本可能存在安全漏洞。 本章示例安装步骤假设已经安装了JDK版本11 update 10(在/usr/java/jdk-11.0.10目录)。
2.1.3 安装ZooKeeper
Apache Kafka使用Apache ZooKeeper存储Kafka集群的元数据,以及消费者客户端的详细信息,如图2-1所示。 ZooKeeper是一种集中式服务,提供维护配置信息、命名、分布式同步、组服务等功能。 本书不会深入关于ZooKeeper的细节,但只会对操作Kafka所需要的内容进行解释。 虽然可以使用Kafka发行版中包含的脚本来运行ZooKeeper服务器,但安装完整版本的ZooKeeper是很简单的。
图2-1 Kafka和Zookeeper
Kafka已经在ZooKeeper 3.5版本上进行了广泛的测试,并定期更新到最新版本。 在本书中,我们将使用ZooKeeper 3.5.9,可以从ZooKeeper网站下载。
独立服务器 (Standalone server)
ZooKeeper附带了一个基本的配置示例文件( /usr/local/zookeeper/config/zoo_sample.cfg),它在大多数情况下都能很好地工作。然而,在本书中,为了演示的目的,将手动创建一个配置文件,其中会使用一些基本设置。下面的示例将ZooKeeper安装在/usr/local/zookeeper中,基本配置在/usr/local/zookeeper中,数据存储在/var/lib/zookeeper中:
# tar -zxf apache-zookeeper-3.5.9-bin.tar.gz
# mv apache-zookeeper-3.5.9-bin /usr/local/zookeeper
# mkdir -p /var/lib/zookeeper
# cp > /usr/local/zookeeper/conf/zoo.cfg << EOF
>tickTime=2000
>dataDir=/var/lib/zookeeper
>clientPort=2181
> EOF
# export JAVA_HOME=/usr/java/jdk-11.0.10
# /usr/local/zookeeper/bin/zkServer.sh start
JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
#
现在可以通过连接到客户端端口并发送四个字母的命令srvr来验证ZooKeeper是否在独立模式下正确运行。 这将返回运行服务器的基本ZooKeeper信息:
# telnet localhost 2181
Trying127.0.0.1...
Connected to localhost.
Escape character is'^]'.
srvr
Zookeeper version:3.5.9-83df9301aa5c2a5d284a9940177808c01bc35cef, built on01/06/202119:49 GMT
Latency min/avg/max:0/0/0
Received:1
Sent:0
Connections:1
Outstanding:0
Zxid: 0x0
Mode: standalone
Node count:5
Connection closed by foreign host.
#
Zookeeper集合集群
ZooKeeper是按照集群架构设计的,以确保高可用性。 由于使用了平衡算法,建议集群包含奇数个服务器(例如,3、5,等等),因为大多数集群成员(quorum)必须工作,以便ZooKeeper响应请求。 这意味着在三个节点的集群中,可以在缺少一个节点的情况下运行。 对于五节点的集群,可以在缺少两个节点的情况下继续运行。
调整Zookeeper集群大小
考虑在一个五节点的集群中运行ZooKeeper。 要对集群进行配置更改(包括交换节点),需要一次重新加载一个节点。 如果集群不能容忍一个以上的节点宕机,那么进行维护工作就会带来额外的风险。 也不建议运行超过7个节点,因为一致协议的性质会导致性能开始下降。
此外,如果由于太多的客户端连接而导致5或7个节点无法满足负载,可以考虑添加额外的observer节点来帮助平衡只读流量。
要在一个集群中配置ZooKeeper服务器,它们必须有一个包含所有服务器的公共配置,并且每个服务器都需要在data目录中指定服务器ID号的myid文件。 如果集群中服务器的主机名是zoo1.example.com、zoo2.example.com和zoo3.example.com,配置文件可能如下所示:
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
initLimit=20
syncLimit=5
server.1=zoo1.example.com:2888:3888
server.2=zoo2.example.com:2888:3888
server.3=zoo3.example.com:2888:3888
在这个配置中,initLimit是follower与leader连接的超时时间。syncLimit值是不同步的followers与leader的同步超时时间。这两个值的单位都是tickTime,即init Limit为20 × 2,000毫秒,即40秒。上述配置还列出了集群中的每个服务器。服务器指定使用如下格式:server.X=hostname:peerPort:leaderPort,参数说明如下:
X:服务器的ID号。必须是一个整数,但它不需要是基于零或顺序的。
hostname:服务器的主机名或IP地址。
peerPort:集群中的服务器相互通信的TCP端口。
leaderPort:leader选举的TCP端口。
客户端只需通过clientPort连接到集群,但是集群成员必须能够通过所有三个端口相互通信。
除了共有的配置文件之外,每个服务器必须在dataDir目录中有一个名为myid的文件。此文件必须包含服务器的ID号,该ID号必须与配置文件匹配。完成这些步骤后,服务器将启动并在一个集群中彼此通信。
在一台机器上测试zookeeper集群
通过将配置中的所有主机名指定为localhost,并为每个实例指定唯一的peerPort和leaderPort端口,可以在一台机器上测试和运行ZooKeeper集群。此外,需要为每个实例创建单独的zoo.cfg,并为每个实例定义唯一的dataDir和clientPort。这只用于测试目的,不推荐用于生产系统。
2.2 安装Kafka Broker
配置好Java和ZooKeeper后,就可以开始安装Apache Kafka了。当前版本可以从Kafka网站下载。截至撰写本文时,该版本是2.8.0,运行在Scala版本2.13.0下。本章中的示例使用的是2.7.0版本。
下面的例子中的Kafka安装在/usr/local/ Kafka,使用之前启动的ZooKeeper服务器,并将消息日志段存储在/tmp/Kafka-logs:
# tar -zxf kafka_2.13-2.7.0.tgz
# mv kafka_2.13-2.7.0 /usr/local/kafka
# mkdir /tmp/kafka-logs
# export JAVA_HOME=/usr/java/jdk-11.0.10
# /usr/local/kafka/bin/kafka-server-start.sh -daemon
/usr/local/kafka/config/server.properties
#
一旦Kafka broker启动,可以通过对集群执行一些简单的操作来验证它是否工作:创建一个test主题(topic),生产一些消息,并消费这些消息。
创建并验证一个主题:
# /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --replication-factor 1 --partitions 1 --topic test
Created topic"test".
# /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition:0Leader:0Replicas:0Isr:0
#
生产消息到test主题(使用Ctrl-C停止生产者):
# /usr/local/kafka/bin/kafka-console-producer.sh --bootstrap-server
localhost:9092--topictest
Test Message1
Test Message2
^C
#
消费来自test主题的消息:
# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server
localhost:9092--topictest--from-beginning
Test Message1
Test Message2
^C
Processed a total of2messages
#
已弃用kafka cli工具中的zookeeper连接
如果你熟悉Kafka工具的旧版本,可能习惯使用--zookeeper连接字符串。 这在所有情况下都已被弃用。 当前的最佳实践是使用更新的--bootstrap-server选项并直接连接到Kafka broker。 如果在集群中运行,则可以提供集群中任何broker的 host:port。