Kafka 中存在大量的延时操作,如:延时生产、延时拉取、延时删除等。Kafka 没有直接使用 JDK 里面实现的 Timer 或 DelayQueue 来实现延时功能。众所周知,Timer 和 DelayQueue 的任务添加/删除是采用了优先队列(小顶堆)算法实现的,这个算法的平均时间复杂度是 O(nlogn)。这个性能并不能满足 Kakfa 要求。因此,Kafka 使用时间轮实现了自己的延时定时器(SystemTimer),添加/删除操作的平均时间复杂度均为 O(1)。
1 单层时间轮
上图就是一个很经典的时间轮的设计,里面包含了实现时间轮最重要的部分:
-
表盘
-
基本时间跨度(tickMs)
每个时间格有自己的时间跨度
时间轮大小(wheelSize)
-
总体时间跨度(interval)
interval = tickMs × wheelSize
-
表盘指针(currentTime)
时间轮使用指针区分到期时间格(包含指针指向的部分)和未到期时间格。currentTime 是 tickMs 的整数倍。被指针指到的时间格的 TimerTaskList 全部任务都需要被处理。
-
时间格
如上图编码是9的时间格,那么这个时间格的对应的时间区间是 [startMs + 9, startMs + 10)。startMs 是指创建时间轮的createTime。
-
-
双向循环链表
我们真正需要处理的延时任务封装成一个个 TimerTaskEntry 构成一个双向循环链表。
好处:
- 这种设计将计时与处理任务分开,任务的处理不影响计时任务
- 任务列表 TimerTaskList 与时间格关联后,突然有属于这个时间格的任务可以直接加入链表,时间复杂度是O(1)
用上图的时间轮为例子,假设当前表盘指针指向 0,此时有一个延时2ms的 A 任务插入,那么 A 将会被添加到 2 的 TimerTaskList,随时间推移,指针来到了2,就会开启线程异步处理 2 的 TimerTaskList;假设这时又有一个 8ms 和 19ms 的任务插入,那么8ms的会被添加到 10 的 TimerTaskList,而19ms的会复用已经过期的 1 的 TimerTaskList。
2 多层时间轮
如果此时有一个 350ms 的任务需要插入呢?直接扩充 wheelSize?要知道 Kafka 中延时任务既有几万毫秒的定时任务,也有几十万毫秒的定时任务,盲目扩充 wheelSize 会造成空间浪费,而且过大的时间轮也会影响效率。因此,Kafka 引入了层级时间轮的设计,如下图
350ms的任务已经超过了第一层时间轮的 interval,而第二层的 interval 是 400ms,所以这个任务应该放在第二层时间轮。又因为 350 ÷ 20 = 17.5,所以该任务应该在第二层时间轮的 17 时间格的 TimerTaskList 中。当第二层时间轮的指针指向 17 时,才 340ms,还有 10ms。这时,我们会将任务降级到第一层的时间轮,假设当前第一层时间轮的指针指向2,那么该任务会添加到第一层时间轮的 12 时间格的 TimerTaskList。
注意:
1. 多层时间轮中,第一层的startMs仍然是系统创建时间,而其他层级的时间轮的startMs则是上一层的currentTime
2. 当前层级的currentTime一定是自己的tickMs的整数倍,如果不满足的话,则要进行修改 currentTime = 当前时间 - (当前时间 % tickMs)
3. 通过 overflowWheel 指向下一层级的时间轮
3 时间轮时间推移的实现
到这里,我们知道了时间轮是如何做到添加/删除任务的平均时间复杂度为O(1)的,但是它的计时(或者说时钟/时间推移)是怎么实现的呢?
Kafka 是直接使用了 JDK 的 DelayQueue 实现计时。因为 DalayQueue 计时是可以的,但是完全依赖它来实现延时任务,在添加/操作这块性能就不能达标,所以 Kafka 使用了 DalayQueue 计时,使用了 TimerTaskList 来完成任务的添加/删除。
4 延时操作
时间轮的具体应用
Kafka 中有许多的延时操作,比如当我们 ack 设置成 -1 时,Kafka 会等所有 ISR 都回复 ack 或者超时才会给客户端回复结果。那么 leader 是怎么知道消息是否在允许时间内完成消息同步的呢?又怎么知道同步是否超时的呢?请看下图
当我们有消息写入 leader 时,Kafka 就会创建一个延时操作,这个延时操作实现了“指定时间内,follower 没完成消息同步,告诉客户端超时”。
值得一提的是 延时操作 ≠ 定时任务,因为延时操作不一定需要到时间才能完成,可以提前完成。当消息同步没有超时完成时,正常返回就是提前完成的情况,如下图
这里就是外部事件触发完成延时操作的。当有消息写入 leader 之后,由于 ack=-1。因此,ISR 的 follower 会立马进行消息同步,每个 follower 同步好之后会告诉 leader。当所有 ISR 中的 follower 均回复 ack 时,leader 就会将 HW 改成 4 之后,并回复客户端结果。