Kafka设计之初就旨在提供3个方面的功能特性。
- 为生产者和消费者提供一套简单API。
- 降低网络传输和磁盘存储开销。
- 具有高伸缩性架构。
Kafka经历了几次大的版本变迁。每次发布大的版本时都会加入新的功能
Kafka从0.11.0.0版本开始不再支持Scala 2.10,故用户在下载0.11.0.x版本的Kafka时不会再看到kafka_2.10-0.11.0.0字样的下载包。
新版本producer
在Kafka世界中,通常把producer和consumer通称为客户端(即clients),这是与服务器端(即broker)相对应的。
在Kafka 0.9.0.0版本中,社区正式使用Java版本的producer替换了原Scala版本的producer。新版本的producer的主要入口类是org.apache.kafka.clients.producer.KafkaProducer,而非原来的kafka.producer.Producer。
新版本producer重写了之前服务器端代码提供的很多数据结构,摆脱了对服务器端代码库的依赖,同时新版本的producer也不再依赖于ZooKeeper,甚至不需要和ZooKeeper集群进行直接交互,降低了系统的维护成本,也简化了部署producer应用的开销成本
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();
上面的代码中比较关键的是KafkaProducer.send方法,它是实现发送逻辑的主要入口方法。新版本producer整体工作流程图如图所示。
新版本的producer大致就是将用户待发送的消息封装成一个ProducerRecord对象,然后使用KafkaProducer.send方法进行发送。实际上,KafkaProducer拿到消息后对其进行序列化,然后结合本地缓存的元数据信息确立目标分区,最后写入内存缓冲区。同时,KafkaProducer中还有一个专门的Sender I/O线程负责将缓冲区中的消息分批次发送给Kafka broker。
比起旧版本的producer,新版本在设计理念上有以下几个特点(或者说是优势)。
发送过程被划分到两个不同的线程:用户主线程和Sender I/O线程,逻辑更容易把控。
完全是异步发送消息,并提供回调机制(callback)用于判断发送成功与否。
分批机制(batching),每个批次中包括多个发送请求,提升整体吞吐量。
更加合理的分区策略:对于没有指定key的消息而言,旧版本producer分区策略是默认在一段时间内将消息发送到固定分区,这容易造成数据倾斜(skewed);新版本采用轮询方式,消息发送将更加均匀化。
底层统一使用基于Java Selector的网络客户端,结合Java的Future实现更加健壮和优雅的生命周期管理。
当然,新版本producer的设计优势还有很多,比如监控指标更加完善等。以上5点只罗列出了最重要的设计特性。
新版本producer的API设计得也足够简单易用,只需要记住几个常用的方法就可以了,如下图所示。
- send:实现消息发送的主逻辑方法。
- close:关闭producer。
- metrics:获取producer的实时监控指标数据,比如发送消息的速率等。
新版本consumer
新版本consumer也是使用Java语言编写的,也不再需要依赖ZooKeeper的帮助。新版本consumer的入口类是org.apache.kafka.clients.consumer.KafkaConsumer。
在旧版本consumer中,消费位移(offset)的保存与管理都是依托于ZooKeeper来完成的。当数据量很大且消费很频繁时,ZooKeeper的读/写性能往往容易成为系统瓶颈。这是旧版本consumer为人诟病的缺陷之一。而在新版本consumer中,位移的管理与保存不再依靠ZooKeeper了,自然这个瓶颈就消失了。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
上面代码中比较关键的是KafkaConsumer.poll方法。它是实现消息消费的主逻辑入口方法。新版本consumer在设计时摒弃了旧版本多线程消费不同分区的思想,采用了类似于Linux epoll的轮询机制,使得consumer只使用一个线程就可以管理连向不同broker的多个Socket,既减少了线程间的开销成本,同时也简化了系统的设计。
比起旧版本consumer,新版本在设计上的突出优势如下。
单线程设计——单个consumer线程可以管理多个分区的消费Socket连接,极大地简化了实现。虽然0.10.1.0版本额外引入了一个后台心跳线程(background heartbeat thread),不过双线程的设计依然比旧版本consumer鱼龙混杂的多线程设计要简单得多。
位移提交与保存交由Kafka来处理——位移不再保存在ZooKeeper中,而是单独保存在Kafka的一个内部topic中,这种设计既避免了ZooKeeper频繁读/写的性能瓶颈,同时也依托Kafka的备份机制天然地实现了位移的高可用管理。
消费者组的集中式管理——上面提到了ZooKeeper要管理位移,其实它还负责管理整个消费者组(consumer group)的成员。这进一步加重了对于ZooKeeper的依赖。新版本consumer改进了这种设计,实现了一个集中式协调者(coordinator)的角色。所有组成员的管理都交由该coordinator负责,因此对于group的管理将更加可控。
比起旧版本而言,新版本在API设计上提供了更加丰富的功能,具体API方法如下图所示。
- poll:最重要的方法。它是实现读取消息的核心方法。
- subscribe:订阅方法,指定consumer要消费哪些topic的哪些分区。
- commitSync/commitAsync:手动提交位移方法。新版本consumer允许用户手动提交位移,并提供了同步/异步两种方式。
- seek/seekToBeginning/seekToEnd:设置位移方法。除了提交位移,consumer还可以直接消费特定位移处的消息。
和producer不同的是,目前新旧consumer共存于最新版本的Kafka中。虽然社区曾计划投票决定在0.11.0.0这个大版本上正式放弃对于旧版本consumer的支持,但是目前使用旧版本consumer的用户依然不在少数,故即使在最新的1.0.0版本中旧版本依然没有被移除,可以预见这种共存的局面还将维持一段时间。
旧版本功能简介
前面谈到的都是新版本客户端,包括了producer和consumer。新版本客户端必然是社区以后极力推荐使用的,但不可否认的是,旧版本依然有着广泛的用户基础,特别是对于那些早期使用Kafka的公司来说,他们大多数使用的是Kafka 0.8.x这个版本。就拿其中最广泛应用的0.8.2.2这个版本而言,这个版本的Kafka刚刚推出Java版producer,而Java consumer甚至还没有开发。所以,我们还是有必要简要了解一下旧版本客户端,毕竟很多核心设计思想都是一脉相承的。
旧版本producer
这里频频提到的旧版本就是指由Scala语言编写的producer,在比较新的Kafka官网介绍中用户已经找不到对于它的介绍了,但是对于依然使用0.8.x版本Kafka的用户而言,Scala producer依然可能活跃在他们的线上系统中。
Scala producer的入口类是kafka.producer.Producer,一段典型的代码如下:
Properties props = new Properties();
props.put("metadata.broker.list","localhost:9092, localhost:9093,
localhost:9094");
props.put("serializer.class","kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String> (config);
KeyedMessage<String, String> msg = new KeyedMessage<String, String>("my-topic", "hello, world.");
Producer.send(msg);
上面代码中的主要逻辑是由Producer.send方法实现的。该方法默认是同步机制的,即每条消息要等待服务器端发送响应给客户端,明确告知消息发送结果之后,才能开始下一条消息的发送,因此旧版本producer的吞吐量性能是很差的。当然它也提供了一个参数用于实现异步的消息发送逻辑,但是凡事有利就有弊,旧版本producer异步发送会有丢失消息的可能性,对于那些对数据有较强持久化要求的用户来说,异步并不是一个可选项。
新旧版本producer的工作流程原理有很大的相似性,只是新版本producer在各个方面都要优于旧版本。因此社区才会在Kafka 0.9.0.0版本中正式将旧版本“下架”。
对于API而言,旧版本的producer提供的功能也非常有限,如图所示。
由图可知,旧版本只提供了send和close两个方法,另外还提供了一个sync参数用于控制该producer是同步发送消息还是异步发送。因此整套API的设计实际上是非常“简陋”的。
旧版本consumer
不同于producer,旧版本consumer,即Scala consumer,其实并没有那么“旧”,也没有那么弱。如前所述,依然有很多用户在生产环境中使用着旧版本consumer。和新旧版本producer之间有巨大的性能差异不同,新旧版本consumer的性能差异似乎也没有那么大。换句话说,旧版本consumer没有那么“不堪”。这也是社区迟迟没有“下架”它的原因之一。
说起旧版本consumer,就不能不提高阶消费者(high-level consumer)和低阶消费者(low-level consumer)。没错,它们是专属于旧版本而言的。切记新版本是没有high-level和low-level之分的!high-level consumer其实就是指消费者组,而low-level consumer是指单个consumer,即standalone consumer。单个consumer是没有什么消费者组的概念的,每个consumer都单独进行自己的工作,与其他consumer不产生任何关联;而消费者组就是大家作为一个团队一起工作,彼此之间会“相互照应”。
low-level consumer
low-level consumer的底层实现就是SimpleConsumer类。一旦应用此类,Kafka会认为用户有自行管理消费者的需求,从而不会为用户提供任何组管理方面的功能(包括负载均衡和故障转移等),而用户需要自己解决这方面的问题。因此使用SimpleConsumer可以说是既灵活又麻烦。鉴于这些特点,很多需要灵活定制实现的第三方框架往往会采用这种low-level consumer,比如Apache Storm的Kafka插件storm-kafka就使用了SimpleConsumer来实现KafkaSpout。
low-level consumer的API设计得非常简单,具体类图如图2.7所示。
在SimpleConsumer的API设计中还有send方法,难道consumer还需要发送消息吗?其实,这里的send不是指发送消息,而是指发送具体的请求。事实上,尽管旧版本consumer已经不推荐用户使用了,但Kafka服务器底层依然有一部分代码在使用SimpleConsumer负责向其他broker发送特定类型的请求,即使用这里的send方法进行发送,所以读者不要把它和producer的send方法搞混淆了。
high-level consumer
如果说low-level consumer既麻烦又灵活,那么high-level consumer便是既省事又死板。如图2.8所示,high-level consumer主要的方法是createMessageStreams——该方法负责创建一个或多个KafkaStream,用于真正的消息消费。如果是在多台机器上,用户只需要简单地启动多个配置有相同组ID(group.id)的consumer进程;若是在同一台机器上,createMessageStreams方法也允许用户直接指定线程数来创建多consumer实例。不管是哪种方法,这些consumer实例都会自动组成一个消费者组来共同承担消费任务,假如任意时刻有consumer进程或实例宕机,该消费者组都会帮用户自动处理,根本不需要人工干预。如此看来,使用high-level consumer是很省事的,但为什么说它是死板的呢?high-level consumer不似SimpleConsumer那样灵活,可以从分区的任意位置开始消费。它只能从上次保存的位移处开始顺序读取消息,使用起来无法实现高度定制化的消费策略,故而说它是死板的。
如何选择kafka版本
根据功能场景
如果用户要在生产环境中应用流式处理组件Kafka Streams,那么就必须使用Kafka 0.10.0.0(含)之后的版本。若是从零开始搭建或处于技术选型阶段,笔者推荐使用最新版本的Kafka,即1.0.0,毕竟这个版本中修复了很多关于Streams的bug,并且完善了Kafka Streams的各种API接口。
如果要在生产环境中启用Kafka Security,那么至少要使用0.9.0.0及以后的Kafka版本,但最好能使用0.10.0.1之后的版本。
当然,如果将Kafka用于传统的消息引擎服务,甚至是分布式存储之用,那么对于这种需求而言在版本的选择上并没有太多的限制,只需要0.8.x以后的版本就行,因为到0.8版本才加入了集群间的备份机制。没有备份的消息引擎系统不能算是一个完备的解决方案。
根据客户端使用场景
所谓客户端使用场景无非两种:自行研发客户端和第三方框架提供客户端。如果是自行研发客户端,笔者推荐使用新版本的客户端,至少要是0.10.1.0版本的Kafka,因为自这个版本开始新版本consumer才算比较稳定。如果你之前的生产环境部署的是该版本之前的Kafka,那么还是建议使用旧版本consumer为妙。
如果是第三方框架直接提供的客户端,比如Storm或Spark,那么一定要注意这些框架官网上的说明。我们以Storm为例:Storm目前提供了两套Kafka实际数据传输的插件storm-kafka和storm-kafka-client其中,storm-kafka使用了旧版本consumer进行开发,storm-kafka-client则使用了新版本consumer。因此你需要根据使用的具体插件来确定Kafka的版本。比如你的生产环境中部署了Storm且使用了storm-kafka-client中的KafkaSpout实现,那么你的Kafka版本就必须是0.9.0.0或更高的版本。
建议使用比较新的Kafka版本,例如0.10.1.0或更高版本。另外根据客户端的稳定程度,笔者总结了一份详细的主流版本-客户端版本推荐关系表,如表2.2所示。
Kafka与Confluent
Confluent的开源版本与Apache社区的Kafka并无太大的区别,用户甚至完全可以使用Confluent Open Source来替代Apache Kafka,它的下载地址是https://www.confluent.io/download/。
下一章 kafa线上环境部署