KAFKA权威指南之深入Kafka服务端

[TOC]

集群成员关系

Kafka使用 Zookeeper来维护集群成员的信息。每个 broker都有一个唯一标识符,这个 标识符可以在配置文件里指定 ,也可以自动生成。在 broker 启动的时候,它通过创建 临时节点把自己的 ID 注册到 Zookeeper。 Kafka 组件订阅 Zookeeper 的/brokers/ids 路径 (broker在 Zookeeper上的注册路径),当有 broker加入集群或退出集群时,这些组件就 可以获得通知。

如果你要启动另一个具有相同 ID的 broker,会得到一个错误一一新 broker会试着进行注 册,但不会成功,因为 Zookeeper里已经有一个具有相同 ID 的 broker。

在 broker停机、出现网络分区或长时间垃圾回收停顿时, broker会从 Zookeeper上断开连 接,此时 broker在启动时创建的临时节点会自动从 Zookeeper上移除。监听 broker列表的 Kafka 组件会被告知该 broker 已移除。

在关闭 broker时,它对应的节点也会消失,不过它的 ID会继续存在于其他数据结构中。 例如,主题的副本列表(下面会介绍)里就可能包含这些白。在完全关闭一个 broker之 后,如果使用相同的 m 启动另一个全新的 broker,它会立即加入集群,井拥有与旧 broker 相同的分区和主题。

控制器

控制器其实就是 一 个 broker,只不过它除了具有一般 broker 的功能之外,还负责分区 首领的选举。 集群里第 一 个启动的 broker 通过 在 Zooke巳per 里创建 一 个临时节点/ cont「olle「让自己成为控制器 。其 他 broker 在启动时也 会尝试创建这个节点,不过它们会收到一个“节点已存在”的异常,然后“意识”到控制 器节点已存在,也就是说集群里已经有一个控制器了。其他 broker在控制器节点上创建 Zookeeper watch 对象,这样它们就可以收到这个节点的变更通知。这种方式可以确保集群 里一次只有一个控制器存在。

如果控制器被关闭或者与 Zookeeper 断开连接, Zookeeper 上的临时节点就会消失 。集群里的其他 broker 通过 watch 对象得到控制器节点消失的通知,它们会 尝试让自己成为新的 控制器。第一个在 Zookeeper里成功创建控制器节点的 broker就会成为新的控制器,其他 节点会收到“节点已存在”的异常,然后在新的控制器节点上再次创建 watch 对象。每个 新选出的控制器通过 Zookeeper 的条件递增操作获得一个全新的、数值更大的 controller epoch。其他 broker在知道当前 cont「olle「 epoch 后,如果收到由控制 器发出的包含较旧 epoch 的消息,就会忽略它们。

当控制器发现一个 broker 已经离开集群(通过观察相关的 Zookeeper路径),它就知道,那 些失去首领的分区需要一个新首领(这些分区的首领刚好是在这个 broker上)。控制器遍 历这些分区,并确定谁应该成为新首领(简单来说就是分区副本列表里的下一个副本), 然后向所有包含新首领或现有跟随者的 broker发送请求。该请求消息包含了谁是新首领以 及谁是分区跟随者的信息。随后,新首领开始处理来自生产者和消费者的请求,而跟随者 开始从新首领那里复制消息。

当控制器发现一个 broker加入集群时,它会使用 broker B 来检查新加入的 broker是否包 含现有分区的副本。如果有,控制器就把变更通知发送给新加入的 broker和其他 broker, 新 broker上的副本开始从首领那里复制消息。

简而言之, Kafka 使用 Zookeeper 的临时节点来选举控制器, 并在节点加入集群或退出集 群时通知控制器。控制器负责在节点加入或离开集群时进行分区首领选举。控制器使用 epoch 来避免“脑裂” 。“脑裂”是指两个节点同时认为自己是 当前的控制器。

复制

复制功能是 Kafka架构的核心。在 Kafka的文档里, Kafka把自己描述成“一个分布式的、 可分区的、可复制的提交日志服务”。复制之所以这么关键,是因为它可以在个别节点 失 效时仍能保证 Kafka 的可用性和持久性。

Kafka 使用主题来组织数据,每个主题被分为若干个分区,每个分区有多个副本。那些副 本被保存在 broker上,每个 broker可以保存成百上千个属于不同主题和分区的副本。

首领副本
每个分区都有一个首领副本 。 为了保证一致性,所有生产者请求和消费者请求都会经过 这个副本。

跟随者副本
首领 以外的副本都是跟随者副本。跟随者副本不处理来自客户端的请求,它们唯 一 的任 务就是从首领那里复制消息,保持与首领一致的状态。如果首领发生崩渍,其中的一个 跟随者会被提升为新首领。

首领的另一个任务是搞清楚哪个跟随者的状态与自己是一致的。跟随者为了保持与首领的 状态一致、在有新消息到达时尝试从首领那里复制消息,不过有各种原因会导致同步失 败。 例如,网络拥塞导致复制变慢, broker发生崩横导致复制滞后,直到重启broker后复 制才会继续。

为了与首领保持同步,跟随者向首领发送获取数据的请求,这种请求与悄费者为了读取悄 息而发送的请求是一样的。首领将响应消息发给跟随者。请求消息里包含了跟随者想要获 取消息的偏移量,而且这些偏移量总是有序的。

一个跟随者副本先请求淌息 1,接着请求消息 2,然后请求消息 3,在收到这 3 个请求的响 应之前,它是不会发送第 4 个请求消息的。如果跟随者发送了请求消息 4,那么首领就知 道它已经收到了前面 3个请求的响应。 通过查看每个跟随者请求的最新偏移量,首领就会 知道每个 跟随者复制的进度。如果跟随者在 10s 内没有请求任何消息,或者虽然在请求消 息,但在 10s 内没有请求最新的数据,那么它就会被认为是不同步的。如果一个副本无陆 与首领保持一致,在首领发生失效时,它就不可能成为新首领一一毕竟它没有包含全部的 消息。

相反,持续请求得到 的最新悄息副本被称为 同步的副本 。在首领发生失效时,只有同步副 本才有可能被选为新首领。

跟随者的正常不活跃时间或在成为不同步副本之前的时间是通过 「epli.ca.lag.ti.l'le.l'lax.l'ls 参数来配置的。这个 时间间隔直接影响着首领选举期间的客户端行为和数据保留机制。

除了当前首领之外,每个分区都有一个首选首领 创建主题时选定的首领就是分区的首 选首领。之所以把它叫作首选首领,是因为在创建分区时,需要在 broker之间均衡首领。因此 ,我们希望首选首领在成为真正的 首领时, broker间的负载最终会得到均衡。默认情况下, Kafka的 auto.leade「·「ebalance. enable 被设为 t「ue,它会检查首选首领是不是当前首领 , 如果不是,并且该副本是同步 的,那么就会触发首领选举,让首选首领成为当前首领。

处理请求

broker 的大部分工作是处理客户端、分区副本和控制器发送给分区首领的请求。 Kafka提 供了一个 二进制协议(基于 TCP),指定了请求消息的格式以及 broker如何对请求作出 响应一一包括成功处理请求或在处理请求过程中遇到错误。客户端发起连接并发送请求, broker 处理请求井作出响应。 broker 按照请求到达的顺序来处理它们一一这种顺序保证让 Kafka具有了消息队列的特性,同时保证保存的消息也是有序的。

所有的请求消息都包含一个标准消息头:

  • Request type (也就是 API key)
  • Request version (broker 可以处理不同版本的客户端请求,井根据客户端版本作出 不同 的响应)
  • Correlation ID- 一个具有唯一性的数字, 用于标识请求消息,同时也会出现在响应消 息和错误日志里(用于诊断问题)
  • Client ID- 用于标识发送请求的客户端

我们不打算在这里描述该协议,因为在 Kafka 文档里已经有很详细的说明。不过,了解 broker如何处理请求还是有必要的一一后面在我们讨论 Kafka监控和各种配置选项时,你 就会了解到那些与队歹lj和线程有关的度量指标和配置参数。

broker会在它所监听的每一个端口上运行一个 Accepto「线程,这个钱程会创建一个连接, 并把它交给 P「ocesso「 线程去处理。 P「ocesso「 线程(也被叫作“网络线程”)的数量是可 配置的。网络线程负责从客户端获取请求悄息,把它们放进请求队列,然后从晌应队列获 取响应消息,把它们发送给客户端。图 5-1 为 Kafka 处理请求的内部流程。

请求消息被放到请求队列后, 10 线程会负责处理它们。下面是几种最常见的请求类型 。

生产请求
生产者发送的请求,它包含客户端要写入 broker的消息。

获取请求
在消费者和跟随者副本需要从 broker读取消息时发送的请求。

image.png

生产请求和获取请求都必须发送给分区的首领副本。如果 broker收到一个针对特定分区的 请求,而该分区 的首领在另 一个 broker上,那么发送请求的客户端会收到 一个“非分区 首领”的错误响应。当针对特定分区的获取请求被发送到一个不含有该分区首领的 broker 上,也会出现同样 的错误。 Kafka 客户端要自己负责把生产请求和获取请求发送到正确的 broker 上。

那么客户端怎么知道该往哪里发送请求呢?客户端使用了另一 种请求类型,也就是元数据 请求。这种请求包含了客户端感兴趣的主题列表。服务器端的响应消息里指明了这些主题 所包含 的分 区、每个分区都有哪些副本, 以及哪个副本是首领。元数据请求可以发送给任 意一个 broker,因为所有 broker都缓存了这些信息。

一般情况下,客户端会把这些信息缓存起来,并直接往目标 broker上发送生产请求和 获取请求。它们需要时不 时地通过发送元数据请求来刷新这些信息(刷新的时间间隔通 过 l'letadata.~刷 .age.Ms 参数来配置),从而知道元数据是否发生了变更一一-比如,在新 broker加入集群时,部分副本会被移动到新的 broker上(如图 5-2 所示)。另外,如果客户 端收到“非首领”错误,它会在尝试重发请求之前先刷新元数据,因为这个错误说明了客 户端正在使用过期的元数据信息,之前的请求被发到了错误的 broker上。

image.png
生产请求

我们在第 3 章讨论如何配置生产者的时候,提到过 acks 这个配置参数一一该参数指定了需 要多少个 broker确认才可以认为一个消息写入是成功的。不同的配置对“写入成功”的界 定是不一样的,如果 acks=1,那么只要首领收到消息就认为写入成功;如果 acks=all,那 么需要所有同步副本收到消息才算写入成功;如果 acks吨,那么生产者在把消息发出去之 后,完全不需要等待 broker 的响应。

包含首领副本的 broker在收到生产请求时,会对请求做一些验证。

  • 发送数据的用户是否有主题写入权限?
  • 请求里包含的 acks 值是否有效(只允许出现。、 1 或 all) ?
  • 如果 acks=all, 是否有足够多的罔步副本保证消息已经被安全写入? (我们可以对 broker进行配置,如果同步副本的数量不足, broker可以拒绝处理新消息)

之后,悄息被写入本地磁盘。在 Linux 系统上,消息会被写到文件系统缓存里,并不保证 它们何时会被刷新到磁盘上。 Kafl<a不会一直等待数据被写到磁盘上一一它依赖复制功能 来保证消息的持久性。

在消息被写入分区 的首领之后, broker开始检查 acks 配置参数一一如果 acks 被设为 0或 1, 那么 broker立即返回响应;如果 acks 被设为 all,那么请求会被保存在一个叫作炼狱的缓冲 区里,直到首领发现所有跟随者副本都复制了消息,晌应才会被返回给客户端。

获取请求
broker处理获取请求的方式与处理生产请求的方式很相似。客户端发送请求,向 broker请 求主题分区里具有特定偏移量的消息,好像在说 : “请把主题 Test 分区 0 偏移量从 53 开始 的消息以及主题 Test分区 3偏移量从 64开始的消息发给我。”客户端还可以指定 broker最 多可以从一个分区里返回多少数据。这个限制是非常重要的,因为客户端需要为 broker返 回的数据分配足够的内存。如果没有这个限制, broker返回的大量数据有可能艳尽客 户端 的内存。

我们之前讨论过,请求需要先到达指定的分区首领上,然后客户端通过查询元数据来确保 请求的路由是正确的。首领在收到请求时,它会先检查请求是否有效一一比如,指定的偏 移量在分区上是否存在?如果客户端请求的是已经被删除的数据,或者请求的偏移量不 存 在,那么 broker将返回一个错误。

如果请求的偏移量存在, broker 将按照客户端指定的数量上限从分区里读取消息,再把消 息返回给客户端。 Kafka使用零复制技术向客户端发送消息一一也就是说, Kafka直接把消 息从文件(或者更确切地说是 Linux 文件系统缓存)里发送到网络通道,而不需要经过任 何中间缓冲区。这是 Kafka 与其他大部分数据库系统不一样的地方,其他数据库在将数据 发送给客户端之前会先把它们保存在本地缓存里。这项技术避免了字节复制,也不需要管 理内存缓冲区,从而获得更好的性能。

客户端除 了可以设置 broker 返回数据的上限,也可以设置下限。例如,如果把下限设置为 10阻,就好像是在告诉 broker:“等到有 lOKB 数据的时候再把它们发送给我。”在主题消 息流量不是很大的情况下,这样可以减少 CPU 和网络开销。客户端发送一个请求, broker 等到有 足够的数据时才把它们返回给客户端,然后客户端再发出请求,而不是让客户端每 隔几毫秒就发送一次请求,每次只能得到很少的数据甚至没有数据。(女口图 5-3 所示。)对 比这两种情况,它们最终读取的数据总量是一样的,但前者的来回传送次数更少,因此开 销也更小。

image.png

当然,我们不会让客户端一直等待 broker累积数据。在等待了一段时间之后,就可以把可用的数据拿 回处理,而不是一直等待下去。所以,客户端可以定义一个超时时间,告诉 broker:“如果你无告在 X 毫秒内累积满足要求的数据量,那么就把当前这些数据返回 给我。,,

有意思 的是,并不是所有保存在分区首领上的数据都可以被客户端读取。大部分客户端只 能读取 已经被写入所有同步副本的悄息(跟随者副本也不行,尽管它们也是消费者 否 则复制功能就无陆工作)。分区首领知道每 个消息会被复制到哪个副本上,在消息还没有 被写入所有同步副本之前,是不会发送给消费者的一一尝试获取这些消息的请求会得到空 的响应而不是错误 。

因为还没有被足够多副本复制的消息被认为是“不安全”的一一如果首领发生崩愤,另 一 个副本成为新首领 ,那么这些消息就丢失了。如果我们允许消费者读取这些消息,可能就 会破坏一致性。试想, 一个悄费者读取并处理了这样的一个消息,而另一个消费者发现这 个消息其实并不存在。 所以,我们会等到所有同步副本复制了这些消息,才允许消费者读 取它们 (女日图 5-4 所示) 。 这也意味着,如果 brok巳r 间的消息复制因为某些原因变慢,那 么消息 到达消费者的时间也会随之变长(因为我们会先等待消息复制完毕)。延迟时间可 以通过参数来配置,它指定了副本在复制消息时可被允许的最大 延迟时间。

image.png

其他请求

到此为止,我们讨论了 Kafka最为常见的几种请求类型:元数据请求、生产请求和获取 请求。重要的是,我们讨论的是客户揣在网络上使用的通用 二进制协议。 Kafka 内置了由 开源社区贡献者实现和维护的 Java 客户端,同时也有用其他语言实现的客户端,如 C、 Python、 Go语言等。 Kafka网站上有它们的完整洁单,这些客户端就是使用这个二进制协 议与 broker通信的。

另外, broker之间也使用同样的通信协议。它们之间的请求发生在 Kafka内部,客户端不 应该使用这些请求。例如,当一个新首领被选举出来,控制器会发送 Leade「AndIs「请求给 新首领(这样它就可以开始接收来自客户端的请求)和跟随者(这样它们就知道要开始跟 随新首领)。

Kafka 协议可以处理 20 种不同类型的请求,而且会有更多的类 型加入进来。协议在持续慎化一一随着客户端功能的不断增加,我们需要改进协议来满足 需求。例如,之前的 Kafka 消费者使用 Zookeeper 来跟踪偏移量,在消费者启动的时候, 它通过检查保存在 Zooke巳per上的偏移量就可以知道从哪里开始处理悄息。因为各种原 因,我们决定不再使用 Zookeeper来保存偏移量,而是把偏移量保存在特定的 Kafka主题 上。为了达到这个目的,我们不得不往协议里增加几种请求类型: OffsetCol'l~ OffsetFetchReqL』est 和 Li.stOffsetsReqL』est。现在,在应用程序调用 COl'll'li.tOffset() 方撞 时,客户端不再把偏移量写入 Zookeep巳r,而是往 Kafka发送 OffsetCol'll'li.tRequest请求。

主题的创建仍然需要通过命令行工具来完成,命令行工具会直接更新 Zookeeper里的主题 列表, broker监听这些主题列表,在有新主题加入时,它们会收到通知。我们正在改进 Kafka,增加了 E「eateTopi.cRequest 请求类型,这样客户端(包括那些不支持 Zookeeper客 户端的编程语言)就可以直接向 brok巳r请求创建新主题了。

除了往协议里增加新的请求类型外,我们也会通过修改已有的 请求类型来给它们增加新功 能。例如,从 Kafka O旦0到 Kafka 0.10.0,我们希望能够让客户端知道谁是当前的控制器, 于是把控制器信息添加到元数据响应消息里。我们还在元数据请求消息和响应消息里添加 了 一个新的 version 字段。现在, 0.9.0版本的客户端发送的元数据请求里 version 为 0 (0.9.0版 本客户端的 version 不会是 1)。不管是 0.9.0版本的 broker,还是 0.10.0版本的 broker,它们 都知道应该返回 version为 0的响应, 也就是不包含控制器信息的响应。

0.9.0版本的客户端 不需要控制器的信息,而且也没必要知道如何去解析它。 0.10.0版本的客户端会发送 version 为 1 的元数据请求, 010.0版本的 broker会返回 version为 1 的响应,里面包含了控制器的 信息。如果 0.10.0版本的客户端发送 version为 l 的请求给 0.9.0版本的 broker,这个版本的 broker不知道该如何处理这个请求 ,就会返回一个错误。这就是为什么我们建议在升级客户 端之前先升级 broker,因为新的 broker知道如何处理旧的请求,反过来则不然。

我们在 O.lO.O版本的 Kafka里加入了 Api.Versi.onRequest-一客户端可以询问 broker支持哪 些版本的请求,然后使用正确的版本与 broker通信。如果能够正确使用这个新功能,客户 端就可以与旧版本的 broker通信,只要 broker支持这个版本的协议。

物理存储

Kafka 的基本存储单元是分区。分区无住在多 个 broker 间进行再细分,也无蓓在同 一 个 broker 的多个磁盘上进行再细分。 所以,分区的大小受到单个挂载点可用空间的限制( 一 个挂载点由单个磁盘或多个磁盘组成 ,如果配置了 JBOD,就是单个磁盘,如果配置了 RAID,就是多个磁盘)

在配置 Kafka的时候,管理员指定了一个用于存储分区的目录清单一一也就是 log.di.「5参 数的值(不要把它与存放错误日志的目录说淆了,日志目录是配置在 log4j.properties文件 里的)。该参数一般会包含每个挂载点 的目录。

接下来我们会介绍 Kafka是如何使用这些目录来存储数据的。首先,我们要知道数据是如 何被分配到集群的 broker上以及 broker 的目录里的。然后,我们还要知道 broker是如何管 理这些文件的,特别是如何进行数据保留 的。随后,我们会深入探讨文件和索引格式。最 后,我们会讨论日志压缩及其工作原理。日志压缩是 K挝ka 的一个高级特性,因为有了这 个特性, Kafka 可以用来长时间 地保存数据。

分区分配
在创建主题时, Kafka首先会决定如何在 broker间分配分区。假设你有 6个 broker,打算 创建一个包含 10个分区的主题,并且复制系数为 3。那么 Kafka就会有 30个分区副本, 它们可以被分配给 6个 broker。在进行分区分配时,我们要达到如下的目标。

  • 在 broker 间平均地分布分区副本。对于我们的例子来说,就是要保证每个 broker可以 分到 5个副本。
  • 确保每个分区的每个副本分布在不同的 broker上。假设分区 0的首领副本在 broker2上, 那么可以把跟随者副本放在 brok巳r 3 和 broker 4 上,但不能放在 broker 2 上,也不能两 个都放在 broker3上。
  • 如果为 broker指定了机架信息,那么尽可能把每个分区的副本分配到不同机架的 broker上。这样做是为了保证一个机架的不可用不会导致整体的分区不可用。

为了实现这个目标,我们先随机选择一个 broker (假设是 4),然后使用轮询的方式给每 个 broker分配分区来确定首领分区的位置。于是,首领分区 0 会在 broker 4 上,首领分区 l 会在 broker 5 上,首领分区 2 会在 broker O 上(只有 6 个 broker),并以此类推。然后,
我们从分区首领开始,依次分配跟随者副本。如果分区 0 的首领在 broker 4 上,那么它的 第一个跟随者副本会在 broker 5 上,第二个跟随者副本会在 broker O上。分区 l 的首领在 broker5上,那么它的第一个跟随者副本在 brokerO上,第二个跟随者副本在 brokerl上。

如果配置了机架信息,那么就不是按照数字顺序来选择 broker 了,而是按照交替机架的方式 来选择 broker。假设 brokerO、 broker l和 broker2放置在同一个机架上, broker3、 broker4 和 broker 5分别放置在其他不同的中几架上。我们不是按照从 0到 5的顺序来选择 broker,而 是按照 0, 3, 1, 4, 2, 5 的顺序来选择,这样每个相邻的 broker都在不同的机架上(如 图 5-5所示)。于是,如果分区 0的首领在 broker4上,那么第一个跟随者副本会在 broker2上, 这两个 broker在不同的机架上。如果第一个机架下线,还有其他副本仍然活跃着,所以分区 仍然可用。这对所有副本来说都是一样的,因此在机架下线时仍然能够保证可用性。

image.png

为分区和1副本选好合适的 broker之后,接下来要决定这些分区应该使用哪个目录。我们单 独为每个分区分配目录,规则很简单 : 计算每个目录里的分区数量,新的分区总是被添加 到数量最小的那个目录里。也就是说,如果添加了 一个新磁盘,所有新的分区都会被创建 到这个磁盘上。因为在完成分配工作之前,新磁盘的分区数量总是最少的 。

文件管理
保留数据是 Kafka 的一个基本特性, Kafka 不会一直保留数据,也不会等到所有消费者都 读取了消息之后才删除消息。相反, Kafka 管理员为每个主题配置了数据保留期限,规定 数据被删除之前可以保留多长时间,或者清理数据之前可以保留的数据量大小。

因为在一个大文件里查找和删除消息是很费时的,也很容易出错,所以我们把分区分成若 干个片段 。 默认情况下,每个片段包含 lGB 或一周的数据,以较小的那个为准。在 broker 往分区写入数据时,如果达到片段上限,就关闭当前文件,井打开一个新文件。

当前正在写入数据的片段叫作活跃片段。活动片段永远不会被删除,所以如果你要保留数 据 l 天 ,但片段里包含了 5 天的数据,那么这些数据会被保留 5 天,因为在片段被关闭之 前这些数据无桂被删除 。 如果你要保留数据一周,而且每天使用 一个新片段,那么你就会 看到,每天在使用一个新片段的同时会删除一个最老的片段一一-所以大部分时间该分区会 有 7个片段存在。

文件格式
我们把 Kafka 的消息和偏移量保存在文件里。保存在磁盘上的数据格式与从生产者发送过 来或者发送给消费者的悄息格式是一样的。因为使用了相同的消息格式进行磁盘存储和网 络传输 , Kafka 可以使用 零复 制技术给消费者发送消息,同时避免了对生产者已经压缩过 的消息进行解压和再压缩。

除了键、值和偏移量外, 1肖息里还包含了消息大小、校验和、消息格式版本号、压缩算能 (Snappy、 GZip 或 LZ4)和时间戳(在 0.10.0 版本里引入的)。时间戳可以是生产者发送消息的时间,也可以是消息到达 broker的时间,这个是可配置的。

如果生产者发送的是压缩过的消息,那么同一个批次的消息会被压缩在一起,被当作“包装消息”进行发送(如图 5-6所示)。于是 , broker就会收到一个这样的消息,然后再把它 发应给消费者。消费者在解压这个消息之后,会看到整个批次的消息,它们都有自己的时 间戳和偏移量。

也就是说,如果在生产者端使用了压缩功能(极力推荐),那么发送的批次越大,就意味着在网络传输和磁盘存储方面会获得越好的压缩性能,同时意味着如果修改了消费者使用 的消息格式(例如,在消息里增加了时间戳),那么网络传输和磁盘存储的格式也要随之修改,而且 broker要知道如何处理包含了两种消息格式的文件。

索引
消费者可以从 Kafka 的任意可用偏移量位置开始读取消息。假设消费者要读取从偏移量 100 开始的 lMB 消息,那么 broker必须立即定位到偏移量 100 (可能是在分区的任意一个片段 里),然后开始从这个位置读取消息。为了帮助 broker更快地定位到指定的偏移量, Kafka 为每个分区维护了 一个索引。索引把偏移量映射到片段文件和偏移量在文件里的位置。

索引也被分成片段,所以在删除消息时,也可以删除相应的索引 。 Kafka 不维护索引的 校验和。如果索引出现损坏, Kafka 会通过重新读取消息并录制偏移量和位置来重新 生 成索引。如果有必要,管理员可以删除索引,这样做是绝对安全的, Kafka 会自动 重新 生成这些索引。

清理
一般情况下, Kafka 会根据设置的时间保留数据,把超过时效的旧数据删除掉 。不过,试 想一下这样的场景,如果你使用 Kafka保存客户的收货地址,那么保存客户的最新地址比 保存客户上周甚至去年的地址要有意义得多,这样你就不用担心会用错旧地址,而且短时 间内客户也不会修改新地址。另外一个场景, 一个应用程序使用 Kafka保存它的状态, 每 次状态发生变化,它就把状态写入 Kafka。在应用程序从崩愤中恢复时,它从 Kafka读取 消息来恢复最近的状态。在这种情况下,应用程序只关心它在崩愤前的那个状态,而不关 心运行过程中的那些状态。

Kafka 通过改变主题的保留策略来满足这些使用场景。早于保留时间的旧事件会被删除 , 为每个键保留最新的值,从而达到清理的效果。很显然,只有当应用程序生成的事件 里 包含了键值对时,为这些主题设置 col'lpact策略才有意义。如果主题包含 null键, 清理 就会失败。

清理的工作原理
每个日志片段可以分为以下两个部分。

干净的部分
这些消息之前被清理过,每个键只有一个对应的值,这个值是上一次清理时保留下来的。
污浊的部分
这些消息是在上一次清理之后写入的。

image.png

如果在 Kafka 启动时启用了清理功能(通过配置 log.cleane「.enabled 参数),每个 broker 会启动一个清理管理器线程和多个清理线程,它们负责执行清理任务。这些线程会选择污 浊率(污浊消息占分区总大小的比例)较高的分区进行清理。

为了清理分区 ,情理线程会读取分区的污浊部分,井在内存里创建 一 个 map。 map 里的 每个元素包含了消息键的散列值和消息的偏移量,键的散列值是 16B,加上偏移量总共是 24B。 如果要清理一个 lGB 的日志片段,并假设每个消息大小为 1阻,那么这个片段就包 含一百万 个悄息,而我们只需要用 24MB 的 map 就可以清理这个片段。(如果有重复的键, 可以重 用散列项,从而使用更少的内存。)这是非常高效的!

管理员在配置 Kafka时可以对 map使用的内存大小进行配置。每个线程都有自己的 map, 而这个参数指的是所有线程可使用的内存总大小。如果你为 map 分配了 lGB 内存,并使 用了 5 个清理线程,那么每个钱程可以使用 200MB 内存来创建自己的 map。 Kafka 井不要 求分区的整个污烛部分来适应这个 map的大小,但要求至少有一个完整的片段必须符合。

如果不符合,那么 Kafka就会报错,管理员要么分配更多 的内存,要么减少清理线程数量。如果只有少部分片段可以完全符合, Kafka将从最旧的片段开始清理,等待下一次清 理剩余 的部分 。

清理线程在创建好偏移盐 map后,开始从干净的片段处读取消息,从最旧的消息开始,把 它们的 内容与 map 里的内容进行比对。它会检查消息的键是否存在于 map 中,如果不存在, 那么说 明消息的值是最新的,就把消息复制到替换片段上 。 如果键已存在,消息会被忽略, 因为在分区 的后部已经有一个具有相同键的消息存在。在复制完所有的消息之后,我们就将 替换片段与原始片段进行交换,然后开始清理下一个片段。完成整个清理过程之后,每个键 对应一个不同的消息一寸主些消息的值都是最新的。清理前后的分区片段如图 5-8所示。

image.png

被删除的事件
如果只为每个键保留最近的一个消息,那么当需要删除某个特定键所对应的所有消息时 , 我们该怎么办?这种情况是有可能发生的,比如一个用户不再使用我们的服务,那么 完全 可以把与这个用户相关的所有信息从系统中删除。

为了彻底把一个键从系统里删除,应用程序必须发送一个包含该键且值为 null 的消息 。清 理线程发现该悄息时,会先进行常规的清理,只保留值为 null 的消息。该悄息(被称为墓 碑消息)会被保留一段时间,时间长短是可配置的。在这期间,消费者可以看到这个基碑 悄息,井且发现它的值已经被删除。于是,如果消费者往数据库里复制 Kafka 的数据, 当 它看到这个墓碑消息时,就知道应该要把相关的用户信息从数据库里删除 。在这个时间 段 过后,清理线程会移除这个墓碑消息,这个键也将从 Kafka 分区里消失 。重要的是,要 留 给消费者足够多的时间,让他看到墓碑消息,因为如果消费者离线几个小时并错过了 墓碑 消息,就看不到这个键,也就不知道它已经从 Kafka里删除,从而也就不会去删除数据库 里的相关数据了。

何时会清理主题
就像 delete 策略不会删除当前活跃的片段 一样, coMpact 策略也不会对当前片段进行清理 。只有旧片段里的消息才会被清理。

在 0.10.0和更早的版本里, Kafka会在包含脏记录的主题数量达到 50%时进行清理。 这样 做的目的是避免太过频繁的清理(因为清理会影响主题的读写性能),同时也避免存在太 多脏记录(因为它们会占用磁盘空间)。液费 50% 的磁盘空间给主题存放脏记录,然后进 行一 次清理,这是个合理的折中,管理员也可以对它进行调整。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,012评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,628评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,653评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,485评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,574评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,590评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,596评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,340评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,794评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,102评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,276评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,940评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,583评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,201评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,441评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,173评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,136评论 2 352

推荐阅读更多精彩内容