kafka发展历史

Kafka设计之初就旨在提供3个方面的功能特性。

  • 为生产者和消费者提供一套简单API。
  • 降低网络传输和磁盘存储开销。
  • 具有高伸缩性架构。

Kafka经历了几次大的版本变迁。每次发布大的版本时都会加入新的功能

Kafka从0.11.0.0版本开始不再支持Scala 2.10,故用户在下载0.11.0.x版本的Kafka时不会再看到kafka_2.10-0.11.0.0字样的下载包。

新版本producer

在Kafka世界中,通常把producer和consumer通称为客户端(即clients),这是与服务器端(即broker)相对应的。

在Kafka 0.9.0.0版本中,社区正式使用Java版本的producer替换了原Scala版本的producer。新版本的producer的主要入口类是org.apache.kafka.clients.producer.KafkaProducer,而非原来的kafka.producer.Producer。

新版本producer重写了之前服务器端代码提供的很多数据结构,摆脱了对服务器端代码库的依赖,同时新版本的producer也不再依赖于ZooKeeper,甚至不需要和ZooKeeper集群进行直接交互,降低了系统的维护成本,也简化了部署producer应用的开销成本

Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("acks", "all");
 props.put("retries", 0);
 props.put("batch.size", 16384);
 props.put("linger.ms", 1);
 props.put("buffer.memory", 33554432);
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

 Producer<String, String> producer = new KafkaProducer<>(props);
 for(int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

producer.close();

上面的代码中比较关键的是KafkaProducer.send方法,它是实现发送逻辑的主要入口方法。新版本producer整体工作流程图如图所示。

新版本的producer大致就是将用户待发送的消息封装成一个ProducerRecord对象,然后使用KafkaProducer.send方法进行发送。实际上,KafkaProducer拿到消息后对其进行序列化,然后结合本地缓存的元数据信息确立目标分区,最后写入内存缓冲区。同时,KafkaProducer中还有一个专门的Sender I/O线程负责将缓冲区中的消息分批次发送给Kafka broker。

比起旧版本的producer,新版本在设计理念上有以下几个特点(或者说是优势)。

  • 发送过程被划分到两个不同的线程:用户主线程和Sender I/O线程,逻辑更容易把控。

  • 完全是异步发送消息,并提供回调机制(callback)用于判断发送成功与否。

  • 分批机制(batching),每个批次中包括多个发送请求,提升整体吞吐量。

  • 更加合理的分区策略:对于没有指定key的消息而言,旧版本producer分区策略是默认在一段时间内将消息发送到固定分区,这容易造成数据倾斜(skewed);新版本采用轮询方式,消息发送将更加均匀化。

  • 底层统一使用基于Java Selector的网络客户端,结合Java的Future实现更加健壮和优雅的生命周期管理。

当然,新版本producer的设计优势还有很多,比如监控指标更加完善等。以上5点只罗列出了最重要的设计特性。

新版本producer的API设计得也足够简单易用,只需要记住几个常用的方法就可以了,如下图所示。


  • send:实现消息发送的主逻辑方法。
  • close:关闭producer。
  • metrics:获取producer的实时监控指标数据,比如发送消息的速率等。

新版本consumer

新版本consumer也是使用Java语言编写的,也不再需要依赖ZooKeeper的帮助。新版本consumer的入口类是org.apache.kafka.clients.consumer.KafkaConsumer。

在旧版本consumer中,消费位移(offset)的保存与管理都是依托于ZooKeeper来完成的。当数据量很大且消费很频繁时,ZooKeeper的读/写性能往往容易成为系统瓶颈。这是旧版本consumer为人诟病的缺陷之一。而在新版本consumer中,位移的管理与保存不再依靠ZooKeeper了,自然这个瓶颈就消失了。

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

上面代码中比较关键的是KafkaConsumer.poll方法。它是实现消息消费的主逻辑入口方法。新版本consumer在设计时摒弃了旧版本多线程消费不同分区的思想,采用了类似于Linux epoll的轮询机制,使得consumer只使用一个线程就可以管理连向不同broker的多个Socket,既减少了线程间的开销成本,同时也简化了系统的设计。

比起旧版本consumer,新版本在设计上的突出优势如下。

  • 单线程设计——单个consumer线程可以管理多个分区的消费Socket连接,极大地简化了实现。虽然0.10.1.0版本额外引入了一个后台心跳线程(background heartbeat thread),不过双线程的设计依然比旧版本consumer鱼龙混杂的多线程设计要简单得多。

  • 位移提交与保存交由Kafka来处理——位移不再保存在ZooKeeper中,而是单独保存在Kafka的一个内部topic中,这种设计既避免了ZooKeeper频繁读/写的性能瓶颈,同时也依托Kafka的备份机制天然地实现了位移的高可用管理。

  • 消费者组的集中式管理——上面提到了ZooKeeper要管理位移,其实它还负责管理整个消费者组(consumer group)的成员。这进一步加重了对于ZooKeeper的依赖。新版本consumer改进了这种设计,实现了一个集中式协调者(coordinator)的角色。所有组成员的管理都交由该coordinator负责,因此对于group的管理将更加可控。

比起旧版本而言,新版本在API设计上提供了更加丰富的功能,具体API方法如下图所示。

  • poll:最重要的方法。它是实现读取消息的核心方法。
  • subscribe:订阅方法,指定consumer要消费哪些topic的哪些分区。
  • commitSync/commitAsync:手动提交位移方法。新版本consumer允许用户手动提交位移,并提供了同步/异步两种方式。
  • seek/seekToBeginning/seekToEnd:设置位移方法。除了提交位移,consumer还可以直接消费特定位移处的消息。

和producer不同的是,目前新旧consumer共存于最新版本的Kafka中。虽然社区曾计划投票决定在0.11.0.0这个大版本上正式放弃对于旧版本consumer的支持,但是目前使用旧版本consumer的用户依然不在少数,故即使在最新的1.0.0版本中旧版本依然没有被移除,可以预见这种共存的局面还将维持一段时间。

旧版本功能简介

前面谈到的都是新版本客户端,包括了producer和consumer。新版本客户端必然是社区以后极力推荐使用的,但不可否认的是,旧版本依然有着广泛的用户基础,特别是对于那些早期使用Kafka的公司来说,他们大多数使用的是Kafka 0.8.x这个版本。就拿其中最广泛应用的0.8.2.2这个版本而言,这个版本的Kafka刚刚推出Java版producer,而Java consumer甚至还没有开发。所以,我们还是有必要简要了解一下旧版本客户端,毕竟很多核心设计思想都是一脉相承的。

旧版本producer

这里频频提到的旧版本就是指由Scala语言编写的producer,在比较新的Kafka官网介绍中用户已经找不到对于它的介绍了,但是对于依然使用0.8.x版本Kafka的用户而言,Scala producer依然可能活跃在他们的线上系统中。

Scala producer的入口类是kafka.producer.Producer,一段典型的代码如下:

Properties props = new Properties();
props.put("metadata.broker.list","localhost:9092, localhost:9093,
localhost:9094");
props.put("serializer.class","kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String> (config);
KeyedMessage<String, String> msg = new KeyedMessage<String, String>("my-topic", "hello, world.");
Producer.send(msg);

上面代码中的主要逻辑是由Producer.send方法实现的。该方法默认是同步机制的,即每条消息要等待服务器端发送响应给客户端,明确告知消息发送结果之后,才能开始下一条消息的发送,因此旧版本producer的吞吐量性能是很差的。当然它也提供了一个参数用于实现异步的消息发送逻辑,但是凡事有利就有弊,旧版本producer异步发送会有丢失消息的可能性,对于那些对数据有较强持久化要求的用户来说,异步并不是一个可选项。


新旧版本producer的工作流程原理有很大的相似性,只是新版本producer在各个方面都要优于旧版本。因此社区才会在Kafka 0.9.0.0版本中正式将旧版本“下架”。

对于API而言,旧版本的producer提供的功能也非常有限,如图所示。


由图可知,旧版本只提供了send和close两个方法,另外还提供了一个sync参数用于控制该producer是同步发送消息还是异步发送。因此整套API的设计实际上是非常“简陋”的。

旧版本consumer

不同于producer,旧版本consumer,即Scala consumer,其实并没有那么“旧”,也没有那么弱。如前所述,依然有很多用户在生产环境中使用着旧版本consumer。和新旧版本producer之间有巨大的性能差异不同,新旧版本consumer的性能差异似乎也没有那么大。换句话说,旧版本consumer没有那么“不堪”。这也是社区迟迟没有“下架”它的原因之一。

说起旧版本consumer,就不能不提高阶消费者(high-level consumer)和低阶消费者(low-level consumer)。没错,它们是专属于旧版本而言的。切记新版本是没有high-level和low-level之分的!high-level consumer其实就是指消费者组,而low-level consumer是指单个consumer,即standalone consumer。单个consumer是没有什么消费者组的概念的,每个consumer都单独进行自己的工作,与其他consumer不产生任何关联;而消费者组就是大家作为一个团队一起工作,彼此之间会“相互照应”。

low-level consumer

low-level consumer的底层实现就是SimpleConsumer类。一旦应用此类,Kafka会认为用户有自行管理消费者的需求,从而不会为用户提供任何组管理方面的功能(包括负载均衡和故障转移等),而用户需要自己解决这方面的问题。因此使用SimpleConsumer可以说是既灵活又麻烦。鉴于这些特点,很多需要灵活定制实现的第三方框架往往会采用这种low-level consumer,比如Apache Storm的Kafka插件storm-kafka就使用了SimpleConsumer来实现KafkaSpout。

low-level consumer的API设计得非常简单,具体类图如图2.7所示。


在SimpleConsumer的API设计中还有send方法,难道consumer还需要发送消息吗?其实,这里的send不是指发送消息,而是指发送具体的请求。事实上,尽管旧版本consumer已经不推荐用户使用了,但Kafka服务器底层依然有一部分代码在使用SimpleConsumer负责向其他broker发送特定类型的请求,即使用这里的send方法进行发送,所以读者不要把它和producer的send方法搞混淆了。

high-level consumer

如果说low-level consumer既麻烦又灵活,那么high-level consumer便是既省事又死板。如图2.8所示,high-level consumer主要的方法是createMessageStreams——该方法负责创建一个或多个KafkaStream,用于真正的消息消费。如果是在多台机器上,用户只需要简单地启动多个配置有相同组ID(group.id)的consumer进程;若是在同一台机器上,createMessageStreams方法也允许用户直接指定线程数来创建多consumer实例。不管是哪种方法,这些consumer实例都会自动组成一个消费者组来共同承担消费任务,假如任意时刻有consumer进程或实例宕机,该消费者组都会帮用户自动处理,根本不需要人工干预。如此看来,使用high-level consumer是很省事的,但为什么说它是死板的呢?high-level consumer不似SimpleConsumer那样灵活,可以从分区的任意位置开始消费。它只能从上次保存的位移处开始顺序读取消息,使用起来无法实现高度定制化的消费策略,故而说它是死板的。


如何选择kafka版本

根据功能场景

如果用户要在生产环境中应用流式处理组件Kafka Streams,那么就必须使用Kafka 0.10.0.0(含)之后的版本。若是从零开始搭建或处于技术选型阶段,笔者推荐使用最新版本的Kafka,即1.0.0,毕竟这个版本中修复了很多关于Streams的bug,并且完善了Kafka Streams的各种API接口。
如果要在生产环境中启用Kafka Security,那么至少要使用0.9.0.0及以后的Kafka版本,但最好能使用0.10.0.1之后的版本。
当然,如果将Kafka用于传统的消息引擎服务,甚至是分布式存储之用,那么对于这种需求而言在版本的选择上并没有太多的限制,只需要0.8.x以后的版本就行,因为到0.8版本才加入了集群间的备份机制。没有备份的消息引擎系统不能算是一个完备的解决方案。

根据客户端使用场景

所谓客户端使用场景无非两种:自行研发客户端和第三方框架提供客户端。如果是自行研发客户端,笔者推荐使用新版本的客户端,至少要是0.10.1.0版本的Kafka,因为自这个版本开始新版本consumer才算比较稳定。如果你之前的生产环境部署的是该版本之前的Kafka,那么还是建议使用旧版本consumer为妙。

如果是第三方框架直接提供的客户端,比如Storm或Spark,那么一定要注意这些框架官网上的说明。我们以Storm为例:Storm目前提供了两套Kafka实际数据传输的插件storm-kafka和storm-kafka-client其中,storm-kafka使用了旧版本consumer进行开发,storm-kafka-client则使用了新版本consumer。因此你需要根据使用的具体插件来确定Kafka的版本。比如你的生产环境中部署了Storm且使用了storm-kafka-client中的KafkaSpout实现,那么你的Kafka版本就必须是0.9.0.0或更高的版本。

建议使用比较新的Kafka版本,例如0.10.1.0或更高版本。另外根据客户端的稳定程度,笔者总结了一份详细的主流版本-客户端版本推荐关系表,如表2.2所示。

Kafka与Confluent

Confluent的开源版本与Apache社区的Kafka并无太大的区别,用户甚至完全可以使用Confluent Open Source来替代Apache Kafka,它的下载地址是https://www.confluent.io/download/

下一章 kafa线上环境部署

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 210,978评论 6 490
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 89,954评论 2 384
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,623评论 0 345
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,324评论 1 282
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,390评论 5 384
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,741评论 1 289
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,892评论 3 405
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,655评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,104评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,451评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,569评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,254评论 4 328
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,834评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,725评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,950评论 1 264
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,260评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,446评论 2 348

推荐阅读更多精彩内容