《时间轮在 Netty , Kafka 中的设计与实现(上)》
4. 单层时间轮的设计缺陷
Netty 采用的是单层时间轮的设计,时钟间隔 tickDuration 为 100ms , 一共 512 个刻度。
workerThread 每隔 100ms 将时钟 tick 向前推进一格,并执行对应 bucket 中的延时任务。但如果长时间没有任务过期的话,那么时钟 tick 就会一直空转,造成不必要的性能损耗。
比如,我们在 tick = 0
这个时刻向时间轮分别添加延时 100ms , 延时 51100ms 的定时任务,它们会被添加到 HashedWheelBucket1, HashedWheelBucket511 中,剩下的 HashedWheelBucket 全部都是空的。
当 100ms 的延时任务被执行完之后,时钟 tick 将不得不一直空转,直到空转到 511 这个刻度,才会去执行 51100ms 的延时任务。当这两个延时任务被执行完之后,如果不在向时间轮添加新的任务,那么时间轮将会一直空转下去。
除了会有时钟 tick 空转的现象之外,单层时间轮还无法高效应对延时时间跨度比较大的定时任务。比如,定时任务的延时时间横跨多个时钟周期。Netty 时间轮的时钟周期为 51200ms ,假设我们在 tick = 1
这个时刻向时间轮添加三个延时任务,它们的延时时间分别为:51200ms , 2 * 51200ms , 3 * 51200ms 。
这三个延时任务都会被添加到 HashedWheelBucket1 中,但它们的 remainingRounds
都是大于 0 的。
private static final class HashedWheelTimeout {
// 执行该延时任务需要经过多少时钟周期
long remainingRounds;
}
虽然在当前时钟周期内,HashedWheelBucket1 中没有任何可以执行的延时任务,但 workerThread 仍然需要将 HashedWheelBucket1 中的延时任务全部遍历一遍。如果像这样的延时任务非常多,那么 workerThread 遍历 HashedWheelBucket 的操作也会产生不必要的性能开销。
综上所述,单层时间轮的设计缺陷总结下来主要有两个方面:
在长时间没有到期任务的情况下,单层时间轮会有时钟 tick 空转的现象。
面对海量延时时间跨度比较大的定时任务场景,除了时钟 tick 空转之外,还会产生不必要的 HashedWheelBucket 遍历开销。
而 Kafka 中的多层时间轮设计则完美地解决了上面的这两个缺陷,那怎么解决的呢 ? 下面就让我们带着这两个疑问来深入探究一下 Kafka 多层时间轮的设计与实现。
5. Kafka 多层时间轮的设计
如下图所示,Kafka 时间轮的设计模型可以说是和 Netty 时间轮的设计非常相似。
时间轮的表盘都是一个环形数组,Kafka 中的刻度为 20 ,Netty 中的刻度为 512 。
// Kafka
public class TimingWheel {
private final TimerTaskList[] buckets;
}
// Netty
public class HashedWheelTimer implements Timer {
private final HashedWheelBucket[] wheel;
}
Netty 使用的是 HashedWheelBucket 结构来描述时间轮中的刻度,它是一个带有头尾指针的双向链表。
private static final class HashedWheelBucket {
private HashedWheelTimeout head;
private HashedWheelTimeout tail;
}
Kafka 则使用 TimerTaskList 结构来描述时间轮中的刻度,它是一个带有头结点的双向循环链表。
class TimerTaskList implements Delayed {
// 头结点
private final TimerTaskEntry root;
TimerTaskList(
AtomicInteger taskCounter,
Time time
) {
this.root = new TimerTaskEntry(null, -1L);
this.root.next = root;
this.root.prev = root;
}
}
时间轮的延时任务在 Netty 中是用 HashedWheelTimeout 结构来表示。
private static final class HashedWheelTimeout implements Timeout, Runnable {
private final TimerTask task;
private final long deadline;
HashedWheelTimeout next;
HashedWheelTimeout prev;
}
而在 Kafka 中则是用 TimerTaskEntry 结构来表示,可以看出两种结构是非常相似的,只不过是名字变了一下而已。
public class TimerTaskEntry {
public final TimerTask timerTask;
public final long expirationMs;
TimerTaskEntry next;
TimerTaskEntry prev;
}
Netty 时间轮中的时钟间隔是用 tickDuration 来表示,默认为 100ms 。Kafka 中的时钟间隔是用 tickMs 来表示,默认为 1ms 。
// Netty
private final long tickDuration;
// Kafka
private final long tickMs;
Netty 中的时钟周期是 51200ms , Kafka 中的时钟周期是 20ms 。Netty 时间轮的转动是一个 tick 一个 tick 地向前转动,时间每经过一个 tickDuration,时钟 tick++ 。tick 是一个绝对值,从 0 开始递增,只增不减。
// Netty
private long tick;
// Kafka
private long currentTimeMs;
而 Kafka 时钟是由 currentTimeMs 来表示的,与 Netty 不同的是,currentTimeMs 是系统当前时间的一个时间戳,单位为毫秒。时间每经过 tickMs,currentTimeMs 就增加 tickMs。Kafka 时间轮的转动是通过更新 currentTimeMs 时间戳来驱动的。
以上就是 Kafka 与 Netty 在时间轮的设计上比较相似的地方,那么不同的地方在哪里呢 ?
我们都知道 Kafka 采用的是多层时间轮的设计,主要是为了应对海量延时时间跨度比较大的定时任务场景。 Kafka 中的时间轮是按需添加的,在初始的时候只有一个时间轮(第一层),如下图所示:
第一层时间轮的精度,也就是时钟间隔 tickMs 为 1ms , 一共 20 个刻度,时钟周期 interval 为 20ms 。和 Netty 一样,时间轮中每个 bucket (TimerTaskList) 负责存放到期时间在相同范围内的延时任务 TimerTaskEntry。
比如我们以时间轮的初始状态为例, Kafka 第一层时间轮的环形数组 buckets[0] 中,存放的是到期时间在[ 0 , 1 ) ms
内的延时任务,buckets[1] 中存放的是到期时间在 [ 1 , 2 ) ms
内的延时任务,buckets[19] 中存放的是到期时间在 [ 19 , 20) ms
内的延时任务。
假设现在我们要向时间轮中添加一个延时 28ms 执行的定时任务。很明显,它已经超过了第一层时间轮的时钟周期 —— 20ms,所以我们应该将这个定时任务添加到第二层时间轮中。
前面我们刚刚提过,Kafka 的多层时间轮是按需创建的,初始的时候只有一层时间轮,这时我们就需要创建第二层时间轮,每一层时间轮 TimingWheel 中有一个字段 overflowWheel 用于指向其上层时间轮。
public class TimingWheel {
// 上层时间轮
private volatile TimingWheel overflowWheel = null;
}
第二层时间轮的时钟间隔 tickMs 就是第一层时间轮的时钟周期 interval 为 20ms , 时钟刻度仍然是 20 , 所以第二层时间轮的时钟周期 interval 为 400ms。这个延时 28ms 执行的定时任务将会添加到第二层时间轮的 buckets[1] 中。
假设现在我们又向时间轮中添加一个延时 450ms 执行的定时任务,然而第二层时间轮的时钟周期却是 400ms , 无法满足这个延时时间,所以我们就需要创建第三层时间轮。将这个延时 450ms 的定时任务添加到第三层时间轮中。
同样的道理,第三层时间轮的时钟间隔 tickMs 又是第二层时间轮的时钟周期 interval 为 400ms , 时钟刻度仍然是 20 , 所以第三层时间轮的时钟周期 interval 为 8000ms。这个延时 450ms 执行的定时任务将会添加到第三层时间轮的 buckets[1] 中。
Kafka 并没有限制时间轮的层数,每当最高层时间轮的时钟周期 interval 无法满足延时任务的延时时间 delayMs 时,Kafka 就会按需创建一个更高层的时间轮,该时间轮的时钟间隔 tickMs 就是其下一层时间轮的时钟周期 interval。
Kafka 的多层时间轮设计与我们日常生活中的钟表非常的相似,笔者以三层时间轮进行类比说明:
第一层时间轮可以看做是只有秒针的一个表盘,秒表的时钟间隔 tickMs 就是 1s , 时钟周期 interval 是 60s。
第二层时间轮可以看做是只有分针的一个表盘,时钟间隔 tickMs 是 60s , 正好是秒表的时钟周期。分表的时钟周期 interval 是 60m 。
第三层时间轮可以看做是只有时针的一个表盘,时钟间隔 tickMs 是 60m,正好是分表的时钟周期。时表的时钟周期 interval 是 12h 。
秒针每走一个刻度就是一秒,秒针走完一个时钟周期,分针走一个刻度就是一分钟,当分针走完一个时钟周期,时针走一个刻度就是一个小时。
同样的道理, Kafka 第一层时间轮走一个刻度,就是 1ms , 走完一个时钟周期就是 20ms 。随后第二层时间轮走一个刻度,当第二层时间轮走完一个时钟周期(400ms) 之后,第三层时间轮走一个刻度。第三层时间轮走完一个时钟周期就是 8000ms 。
但是这样的设计只是解决了延时任务跨度比较大的问题,却无法解决 Netty 时间轮中存在的空推进现象。假设,我们现在一共有三层时间轮,但是却没有一个延时任务。这种情况下,每一层时间轮也还是会一下一下的向前推进,但是却执行不到任何延时任务。产生了更加严重的空推进问题。
那如何解决呢 ? 我们是不是应该换一种思路去想,如果时间轮中没有任何延时任务我们就不推进,仿佛时间静止了一样,因为在这种情况下继续推进时间轮没有任何意义。
如果时间轮中有延时任务,那我们也不会一个刻度一个刻度地推进,而是等到延时任务到期之后,才去推进时间轮。这样一来,空推进的问题不就解决了吗 ?
设计思路确定了,那么如何实现呢 ? 核心问题就是有没有一种数据结构,可以将延时任务按照过期时间由近及远地组织起来,如果没有任何任务过期,时间轮就阻塞在这个数据结构上,如果有任务过期,时间轮就从这个数据结构上唤醒,然后执行延时任务,并将时间轮的 currentTimeMs 直接推进到任务的过期时间点。
这种数据结构不就是笔者在本文 1.2 小节中介绍的 DelayQueue 吗 ?但是如果我们用 DelayQueue 来组织时间轮中的所有延时任务的话,那么又会遇到同样的问题,就是面对海量延时任务的添加,取消时,DelayQueue 的时间复杂度太高 —— O(logn)
。
我们可以进一步折中设计一下,仔细观察一下时间轮的设计特点,每一个 bucket (TimerTaskList) 存放的是过期时间在相同范围内的任务。
我们还是以时间轮的初始状态为例进行说明,每个 bucket 都有一个固定的过期时间范围 —— [ startTime , endTime ) , 延时任务的过期时间只要在这个范围之内,那么他们都会被组织在同一个 bucket 中。
延时任务有过期时间的概念,而 bucket 是用来组织延时任务的,那么我们反过来思考一下,bucket 自然也就具备了过期时间的概念。在 Kafka 中,bucket 的过期时间就是对应的 startTime。
class TimerTaskList implements Delayed {
// 过期时间
private final AtomicLong expiration;
}
当时间到达 startTime 这个时刻之后,Kafka 就会将对应 bucket 中的延时任务挨个拿出来进行处理,这就是多层时间轮整体的一个运转思路。
既然 bucket 有过期时间的概念,那么我们何不用 DelayQueue 只保存 bucket ,而时间轮中 bucket 的数量是很少的。比如,单层时间轮只有 20 个 bucket,双层时间轮也才 40 个 bucket,三层时间轮也只有 60 个 bucket。
用 DelayQueue 来组织时间轮中所有的 bucket , 时间复杂度 O(logn)
就可以忽略不计了,因为 bucket 的数量太少了。
这样一来,当时间轮中没有任何到期的 bucket 时,时间轮将会一直阻塞在 DelayQueue 上,不会向前推进,这样就避免了无意义的空推进现象。当 DelayQueue 中有 bucket 到期时,时间轮被唤醒,将 currentTimeMs 推进到过期的时间点 expiration , 然后将到期 bucket 中的延时任务挨个拿下来处理。
笔者还是以之前的例子进行说明,假设我们现在有三层时间轮,初始状态下,这三层时间轮的 currentTimeMs 均指向 0ms 这个时刻,时间轮中有两个延时任务,分别在 28ms , 450ms 之后执行。
其中延时 28ms 之后执行的任务被存放在第二层时间轮的 bucket[1] 中,它的到期时间是 20ms 。延时 450ms 之后执行的任务被存放在第三层时间轮的 bucket[1] 中,它的到期时间是 400ms。
在 20ms 之内,时间轮就一直在 DelayQueue 上阻塞,不会向前空推进。当时间达到 20ms 这个时刻之后,第二层时间轮 bucket[1] 到期,时间轮从 DelayQueue 上唤醒。
时间轮的推进首先会从第一层时间轮开始,将第一层时间轮的 currentTimeMs 向前推进到 20ms 这个时刻,接着将第二层时间轮的 currentTimeMs 也向前推进到 20ms 这个时刻。
第三层时间轮的 currentTimeMs 保持不变,仍然停留在 0ms 这个时刻。因为第三层时间轮的时钟间隔 tickMs 是 400ms , 20ms 还不足以让第三层时间轮向前推进一格。
时间轮向前推进完毕之后,接下来 Kafka 就会处理到期的 bucket,就是第二层时间轮的 bucket[1] ,其中只有一个任务,它是延时 28ms 之后执行。但现在才过去 20ms ,还有 8ms 才能执行这个延时任务。所以 Kafka 会先将这个任务从第二层时间轮的 bucket[1] 中摘下,重新按照延时 8ms 执行的延时任务插入到第一层时间轮的 bucket[8] 中,然后将 bucket[8] 放入到 DelayQueue 中。
此时 DelayQueue 中有两个 bucket,一个是第一层时间轮的 bucket[8],8ms 之后到期,另一个是之前第三层时间轮的 bucket[1],380ms 之后到期(因为时间已经过了 20ms)。
还是同样的推进逻辑,当时间过了 8ms 之后,第一层时间轮的 bucket[8] 到期,时间轮从 DelayQueue 中唤醒,还是从第一层时间轮开始推进,不过此时经过上一轮的推进,第一层时间轮的 currentTimeMs 现在停留在 20 ms 这个时刻,所以首先需要将第一层的 currentTimeMs 向前推进到 28ms 。
第二层时间轮当前的 currentTimeMs 也是停留在 20 ms 这个时刻,而第二层时间轮的时钟间隔是 20ms , 但此时只是过了 8ms ,所以第二层的 currentTimeMs 不会向前推进,仍然是 20ms 。同理第三层时间轮的 currentTimeMs 也不会向前推进,仍然停留在 0ms 这个时刻。
当新一轮的推进工作完成之后,Kafka 就会着手处理过期的 bucket[8](第一层),此时 bucket[8] 中的延时任务也已经到期,所以 Kafka 会将 bucket[8] 中的所有延时任务摘下挨个执行。此时第一层时间轮 currentTimeMs 指向 28ms 这个时刻,一开始加入时间轮的这个延时 28ms 的任务刚好被执行。
当时间来到了 400ms 这个时刻,第三层时间轮的 bucket[1] 到期,Kafka 再一次从 DelayQueue 中唤醒,首先将第一层时间轮的 currentTimeMs 推进到 400ms 这个时刻,然后将第二层时间轮的 currentTimeMs 也推进到 400ms 这个时刻。
由于 400ms 刚好满足第三层时间轮的时钟间隔 tickMs(400ms),所以第三层时间轮的 currentTimeMs 也推进到 400ms 这个位置。但 bucket[1] 中的延时任务是要延时 450ms 之后执行,现在时间才过了 400ms ,还有 50ms 才能执行该任务。
所以需要将该任务近一步降级到低层时间轮中,延时任务从高层时间轮降级到低层时间轮的逻辑是:
首先尝试将延时任务降级到第一层时间轮中,但第一层时间轮的时钟周期是 20ms ,无法满足延时 50ms 的任务。接着尝试降级到第二层时间轮中。
第二层时间轮的时钟周期是 400ms , 可以满足延时 50ms 的任务,于是将该任务重新添加到第二层时间轮的 bucket[2] 中。然后将 bucket[2] 添加到 DelayQueue 中。
此时 DelayQueue 中就只包含一个 bucket 了,这个 bucket 就是第二层时间轮的 bucket[2],它在 40ms 之后过期。此时三层时间轮的 currentTimeMs 都是指向 400ms 这个时刻。
当时间在一次经过 40ms 之后,bucket[2] 到期,第一层时间轮的 currentTimeMs 从 400ms 推进到 440ms ,第二层时间轮的 currentTimeMs 也从 400ms 推进到 440ms,指向 bucket[2] 。由于 40ms 不满足第三层时间轮的时钟间隔(400ms),所以第三层的 currentTimeMs 仍然停留在 400ms 。
然而 bucket[2] 中的任务是要在 50ms 之后才能执行,但现在才过了 40ms ,还有 10ms 才能执行该任务。所以需要近一步降级,而第一层时间轮的时钟周期(20ms)刚好能满足这个延时时间(10ms), 所以该任务会被重新插入到第一层时间轮的 bucket[10] 中。随后 bucket[10] 被加入到 DelayQueue 中。
当时间在一次经过 10ms 之后,DelayQueue 中的 bucket[10](第一层)到期,第一层时间轮的 currentTimeMs 从 440ms 推进到 450ms 这个时刻,指向 bucket[10] 。由于 10ms 不满足第二层时间轮的时钟间隔(20ms),所以第二层的 currentTimeMs 仍然停留在 440ms。同理第三层的 currentTimeMs 仍然停留在 400ms 。
当时间轮向前推进的工作结束之后,Kafka 就开始处理过期的 bucket[10],其中需要延时 10ms 执行的任务也已经过期,所以 Kafka 会将 bucket[10] 中的所有延时任务挨个摘下执行。那么一开始加入时间轮的这个需要延时 450ms 之后执行的任务就刚好被执行了。
那么到现在为止,笔者在第 4 小节最后提出的那两个问题就完美的解决了,总结一下就是 Kafka 通过引入多层时间轮的设计解决了海量延时时间跨度比较大的定时任务场景。通过引入 DelayQueue 解决了时间轮空推进的问题。
6. Kafka 多层时间轮相关设计模型的实现
经过上一小节的介绍,现在我们已经对 Kafka 多层时间轮的设计要点以及众多模型概念非常熟悉了,那么在此基础之上,在回头来看源码实现就很清晰明了了,下面笔者先从 Kafka 的时间轮模型 SystemTimer ,TimingWheel 开始介绍。
6.1 时间轮的创建
SystemTimer 用于组织管理多层时间轮,核心属性如下:
public class SystemTimer implements Timer {
// 用于执行延时任务的 Executor
private final ExecutorService taskExecutor;
// 组织时间轮中所有的 TimerTaskList
private final DelayQueue<TimerTaskList> delayQueue;
// 时间轮中管理的延时任务个数
private final AtomicInteger taskCounter;
// 指向第一层时间轮,初始状态下只有一层时间轮,后续按需创建
private final TimingWheel timingWheel;
}
taskExecutor 用于执行多层时间轮中的延时任务,它是一个单线程的 FixedThreadPool。
delayQueue 用于组织管理多层时间轮中的所有 TimerTaskList,按照 TimerTaskList 的到期时间 expiration ,由近及远的排列。主要用于防止时间轮空推进的现象。
taskCounter 用于统计多层时间轮中总共管理的延时任务个数。
timingWheel 用于指向第一层时间轮,在初始状态下,只有一层时间轮,后续会根据延时任务的时间跨度按需创建多层时间轮。
public class SystemTimer implements Timer {
public SystemTimer(String executorName) {
this(executorName, 1, 20, Time.SYSTEM.hiResClockMs());
}
public SystemTimer(
String executorName,
long tickMs,
int wheelSize,
long startMs
) {
this.taskExecutor = Executors.newFixedThreadPool(1,
runnable -> KafkaThread.nonDaemon(SYSTEM_TIMER_THREAD_PREFIX + executorName, runnable));
this.delayQueue = new DelayQueue<>();
this.taskCounter = new AtomicInteger(0);
this.timingWheel = new TimingWheel(
tickMs, // 1ms
wheelSize, // 20
startMs, // 时间轮创建那一刻的时间戳
taskCounter,
delayQueue
);
}
}
TimingWheel 则是真正的时间轮模型,其核心结构如下图所示:
public class TimingWheel {
// 时间轮的时钟间隔,第一层时间轮的 tickMs 为 1ms
private final long tickMs;
// 时间轮的刻度,默认为 20
private final int wheelSize;
// 延时任务个数在多层时间轮中的全局计数
private final AtomicInteger taskCounter;
// 多层时间轮全局只有一个 DelayQueue 实例,组织管理所有 TimerTaskList
private final DelayQueue<TimerTaskList> queue;
// 时间轮的时钟周期,同时也是其上一层时间轮的时钟间隔 tickMs
// 第一层时间轮的 interval 为 20ms
private final long interval;
// 时间轮的表盘,环形数组
private final TimerTaskList[] buckets;
// 时间轮的指针,初始为创建时间轮时候的时间戳,它是一个绝对值,只增不减
private long currentTimeMs;
// 用于指向其上一层时间轮,按需创建
private volatile TimingWheel overflowWheel = null;
}
Kafka 时间轮中的指针是 currentTimeMs,它是一个绝对时间戳,初始为时间轮创建时候的时间戳 —— Time.SYSTEM.hiResClockMs()
, 单位为毫秒。由于时间轮是根据 tickMs 来一下一下的转动,所以 currentTimeMs 必须是 tickMs 的整数倍。
TimingWheel(
long tickMs,
int wheelSize,
long startMs,
AtomicInteger taskCounter,
DelayQueue<TimerTaskList> queue
) {
this.tickMs = tickMs;
this.wheelSize = wheelSize;
this.taskCounter = taskCounter;
this.queue = queue;
this.buckets = new TimerTaskList[wheelSize];
this.interval = tickMs * wheelSize;
// currentTimeMs 必须是 tickMs 的整数倍
this.currentTimeMs = startMs - (startMs % tickMs);
for (int i = 0; i < buckets.length; i++) {
buckets[i] = new TimerTaskList(taskCounter);
}
}
TimerTaskList 结构来描述时间轮中的刻度,它是一个带有头结点的双向循环链表。
class TimerTaskList implements Delayed {
// 时间工具类
private final Time time;
// 全局延时任务个数统计
private final AtomicInteger taskCounter;
// TimerTaskList 的过期时间
private final AtomicLong expiration;
// 双向循环链表的头结点
private final TimerTaskEntry root;
TimerTaskList(
AtomicInteger taskCounter,
Time time
) {
this.time = time;
this.taskCounter = taskCounter;
// 初始状态下,一个空的 TimerTaskList,它的 expiration 为 -1
// 表示未使用,不会加入到 DelayQueue
this.expiration = new AtomicLong(-1L);
this.root = new TimerTaskEntry(null, -1L);
this.root.next = root;
this.root.prev = root;
}
}
6.2 延时任务的添加
由于 Kafka 多层时间轮的设计,所以在延时任务添加这一块会比 Netty 的单层时间轮更加复杂一点,因为会涉及到延时任务的升级。另外当 DelayQueue 中的 TimerTaskList 到期的时候,如果 TimerTaskList 中的延时任务还没到期,也会涉及到延时任务的降级,那么在降级的过程中延时任务会被添加到低层时间轮中。
public class SystemTimer implements Timer {
public void add(TimerTask timerTask) {
readLock.lock();
try {
// 将延时任务 TimerTask 封装成 TimerTaskEntry 添加到时间轮中
addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + Time.SYSTEM.hiResClockMs()));
} finally {
readLock.unlock();
}
}
private void addTimerTaskEntry(TimerTaskEntry timerTaskEntry) {
// 尝试向时间轮中添加延时任务
// 返回 false 表示延时任务已经被取消或者到期
// 返回 true 表示延时任务添加成功
if (!timingWheel.add(timerTaskEntry)) {
// Already expired or cancelled
if (!timerTaskEntry.cancelled()) {
// 如果延时任务到期,则立马执行延时任务
taskExecutor.submit(timerTaskEntry.timerTask);
}
}
}
}
无论是在升级还是降级的过程中,如果发现延时任务已经到期,那么 Kafka 就会立即执行延时任务,延时任务的执行由单线程的 FixedThreadPool 负责。
无论是延时任务的添加还是延时任务从高层时间轮降级到低层时间轮,Kafka 首先都是从第一层时间轮开始尝试添加延时任务。
第一层时间轮的时钟间隔 tickMs 是 1ms , 一共 20 个 TimerTaskList,时钟周期为 20ms。每个 TimerTaskList 都有一个延时任务的到期时间范围 [ startTime , endTime ) 。
我们以 currentTimeMs = 0ms
这个时刻为例,TimerTaskList0 表示的延时任务到期时间范围是 [ 0ms , 1ms)
,过期时间在这个范围内的延时任务都会被添加到 TimerTaskList0 中。同理,TimerTaskList1 表示的时间范围为 [ 1ms , 2ms)
, TimerTaskList19 表示的时间范围为 [ 19ms , 20ms)
。
每个 TimerTaskList 也有一个自己的过期时间戳 expiration。TimerTaskList 在初始状态下,也就是没有任何延时任务的时候,它的 expiration 是 -1 。
class TimerTaskList implements Delayed {
// 过期时间戳
private final AtomicLong expiration;
}
当向 TimerTaskList 第一次添加延时任务的时候,expiration 将会被设置为 startTime 。随后会被加入到 DelayQueue 中。
当 currentTimeMs 到达某个 TimerTaskList 的 startTime 的时候,那么该 TimerTaskList 就过期了,Kafka 就可以处理该 TimerTaskList 中的延时任务。
比如,当第一层时间轮的 currentTimeMs 到达 0ms 这个时刻时,TimerTaskList0 过期,TimerTaskList0 中存放的是过期时间范围在 [ 0ms , 1ms)
之内的延时任务。例如,延时 0.5ms ,0.6ms , .... , 0.9ms 的任务将会在 currentTimeMs = 0ms
这个时刻执行。从这里我们可以看出 Kafka 时间轮的时钟精度就是 1ms 。
所以判断一个延时任务是否过期的条件就是看该任务的过期时间 expiration 是否小于当前时间轮的 currentTimeMs + tickMs
,如果小于,则表示该延时任务已经过期,需要被立即执行。
当延时任务的过期时间 expiration 小于当前时间轮的时钟周期 —— currentTimeMs + interval
的时候,就表示当前时间轮可以满足该延时任务的时间跨度,所以该延时任务就会被添加到当前时间轮中,这里的逻辑就和 Netty 的单层时间轮一样了。
// Kafka
long virtualId = expiration / tickMs;
int bucketId = (int) (virtualId % (long) wheelSize);
TimerTaskList bucket = buckets[bucketId];
bucket.add(timerTaskEntry);
// Netty
long calculated = timeout.deadline / tickDuration;
final long ticks = Math.max(calculated, tick);
int stopIndex = (int) (ticks & mask);
HashedWheelBucket bucket = wheel[stopIndex];
bucket.addTimeout(timeout);
比如第一层时间轮中,在 1.5ms 这个时刻到期的延时任务就会被添加到 TimerTaskList1 中,TimerTaskList1 的时间范围为 [ 1ms , 2ms),TimerTaskList1 的过期时间为 virtualId * tickMs = 1ms
。
在 19.6ms 这个时刻到期的延时任务就会被添加到 TimerTaskList19 中,TimerTaskList19 的时间范围为 [ 19ms , 20ms),TimerTaskList19 的过期时间为 virtualId * tickMs = 19ms
。
当延时任务被添加到对应的 TimerTaskList 之后,我们就需要设置 TimerTaskList 的过期时间 expiration。
class TimerTaskList implements Delayed {
public boolean setExpiration(long expirationMs) {
return expiration.getAndSet(expirationMs) != expirationMs;
}
}
TimerTaskList 的过期时间 expiration 初始为 -1 , 当第一次向 TimerTaskList 添加延时任务的时候 setExpiration
方法返回 true ,后续再向该 TimerTaskList 添加延时任务时,TimerTaskList 的 expiration 就保持不变了, setExpiration
方法返回 false 。
当 setExpiration
方法返回 true 的时候,表示我们第一次向 TimerTaskList 添加延时任务,那么这个 TimerTaskList 就会被加入到 DelayQueue 中。
如果延时任务的过期时间 expiration 大于等于当前时间轮的时钟周期 —— currentTimeMs + interval
的时候,就表示当前时间轮无法满足该延时任务的时间跨度。这时候 Kafka 就会按需创建上一层时间轮,随后尝试向上一层时间轮添加延时任务。
private synchronized void addOverflowWheel() {
if (overflowWheel == null) {
overflowWheel = new TimingWheel(
interval, // 高层时间轮的 tickMs 恰好是其低一层时间轮的 interval
wheelSize,
currentTimeMs,
taskCounter,
queue
);
}
}
这里我们可以看到,上一层时间轮的时钟间隔 tickMs 恰好是其下一层时间轮的时钟周期 interval。
下面是向 Kafka 多层时间轮添加延时任务的完整逻辑:
public class TimingWheel {
public boolean add(TimerTaskEntry timerTaskEntry) {
// 获取延时任务到期时间戳
long expiration = timerTaskEntry.expirationMs;
if (timerTaskEntry.cancelled()) {
// 延时任务被取消则返回 false
return false;
} else if (expiration < currentTimeMs + tickMs) {
// 表示延时任务已经到期
return false;
} else if (expiration < currentTimeMs + interval) {
// 当前时间轮的时钟周期可以满足延时任务的时间跨度
// 那么就将延时任务添加到当前时间轮中
// 计算延时任务应该被添加到哪个 TimerTaskList 中
long virtualId = expiration / tickMs;
int bucketId = (int) (virtualId % (long) wheelSize);
TimerTaskList bucket = buckets[bucketId];
bucket.add(timerTaskEntry);
// 设置 TimerTaskList 的过期时间
if (bucket.setExpiration(virtualId * tickMs)) {
// 将 TimerTaskList 添加到 DelayQueue 中
queue.offer(bucket);
}
// 延时任务添加成功返回 true
return true;
} else {
// 当前时间轮的时钟周期无法满足延时任务的时间跨度
// 那么就将延时任务升级到上一层时间轮中
if (overflowWheel == null) addOverflowWheel(); // 按需添加多层时间轮
return overflowWheel.add(timerTaskEntry);
}
}
}
6.3 多层时间轮的运转
Netty 是通过一个 workerThread 每隔 tickDuration(100ms)将时钟 tick 向前推进一格。Kafka 中也有一个工作线程 —— Reaper 来推动多层时间轮的运转。
private static final long WORK_TIMEOUT_MS = 200L;
class Reaper extends ShutdownableThread {
@Override
public void doWork() {
try {
timer.advanceClock(WORK_TIMEOUT_MS);
} catch (InterruptedException ex) {
// Ignore.
}
}
}
而 Kafka 为了解决时间轮空推进的问题,只有 TimerTaskList 到期的时候 Reaper 线程才会向前推进多层时间轮,如果没有 TimerTaskList 到期,Kafka 是不会向前推进的,仿佛时间静止了一样。
延时任务会按照过期时间的不同被组织在不同的 TimerTaskList 中, 每个 TimerTaskList 都有一个过期时间范围 —— [ startTime , endTime) , 只要过期时间在这个范围内的延时任务,那么它就会被添加到该 TimerTaskList 中。TimerTaskList 自身的过期时间被设置为 startTime。关于这一点,笔者已经在前面的内容中反复强调过了。
当没有任何 TimerTaskList 到期的情况下,Reaper 线程就会一直在 delayQueue 上阻塞等待,直到有到期的 TimerTaskList 出现,Reaper 线程从 delayQueue 上被唤醒,开始处理 TimerTaskList 中的延时任务,并向前推进多层时间轮。
TimerTaskList bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
当某一个 TimerTaskList 到期之后,说明现在时间已经来到了该 TimerTaskList 对应的 startTime , 那么 Kafka 就会先从第一层时间轮开始,尝试将多层时间轮的 currentTimeMs 推进到 startTime。
但在推进之前,首先需要判断 startTime 与当前 currentTimeMs 的时间间隔是否满足当前时间轮的 tickMs(时钟间隔),如果不满足,时间轮就不能向前推进,因为还不够一个时钟间隔。
比如上图中展示的延时任务。当时间达到 20ms 这个时刻之后,第二层时间轮 bucket[1] 到期,时间轮从 DelayQueue 上唤醒。时间轮的推进首先会从第一层时间轮开始,因为 20ms 已经达到了第一层时间轮的时钟间隔(1ms), 所以 Kafka 会将第一层时间轮的 currentTimeMs 向前推进到 20ms 这个时刻。
接着开始尝试推进第二层时间轮,由于 20ms 仍然满足第二层时间轮的时钟间隔(20ms),所以也将第二层时间轮的 currentTimeMs 向前推进到 20ms 这个时刻。
后面紧跟着就会尝试推进第三层时间轮。但此时 20ms 已经无法满足第三层时间轮的时钟间隔(400ms)了,所以第三层时间轮的 currentTimeMs 仍然停留在 0ms 这个时刻,不会向前推进。
public class TimingWheel {
// 从第一层时间轮开始,尝试将各层时间轮的 currentTimeMs 推进到过期 TimerTaskList 的 startTime(timeMs)
public void advanceClock(long timeMs) {
// 时间跨度是否满足当前时间轮的时钟间隔
if (timeMs >= currentTimeMs + tickMs) {
// 将时间轮的时间推进到过期 TimerTaskList 的 startTime 位置
// 必须确保 currentTimeMs 是时钟间隔 tickMs 的整数倍
currentTimeMs = timeMs - (timeMs % tickMs);
// 尝试推进更高层时间轮
if (overflowWheel != null) overflowWheel.advanceClock(currentTimeMs);
}
}
}
当多层时间轮的推进工作结束之后,Reaper 线程就会着手处理到期 TimerTaskList 中的延时任务,那么 Kafka 这里该如何处理这些延时任务呢 ?
由于是多层时间轮的设计,所以 TimerTaskList 到期,并不代表其中的延时任务到期。比如,上图第二层时间轮中的 TimerTaskList1,它在 20ms 这个时刻到期,但是其中的延时任务却是在 28ms 这个时刻到期。
所以就要将 TimerTaskList 中所有的延时任务挨个进行降级,降级的过程上一小节笔者已经详细介绍过了,Kafka 首先会尝试将延时任务降级到第一层时间轮,如果第一层时间轮的时钟周期无法满足延时跨度,那么在尝试向第二层时间轮降级,这样循环往复,直到可以找到某一层时间轮的时钟周期能够满足该延时跨度。
比如,第一层时间轮的 currentTimeMs 现在已经推进到了 20ms 这个时刻,指向 TimerTaskList0 , 而延时 28ms 的任务降级到第一层时间轮之后,currentTimeMs 只需要等待 8ms 之后就可以转动到 28ms 的位置,而第一层时间轮的时钟周期是 20ms ,所以可以满足这个 8ms 的延时跨度,于是该延时任务就会被降级到第一层时间轮的 TimerTaskList8 中。
同理,当时间经过 400ms 之后,第三层时间轮的 TimerTaskList1 到期,但是其中的延时任务却是在 450ms 这个时刻才能到期,于是首先尝试向第一层时间轮降级。
第一层时间轮的 currentTimeMs 现在已经推进到了 400ms 这个时刻,仍然指向 TimerTaskList0,如果延时 450ms 的任务添加到第一层时间轮之后,currentTimeMs 需要等待 50ms 之后才可以转动到 450ms 的位置,但是第一层时间轮的时钟周期是 20ms ,无法满足这个 50ms 的延时跨度。
于是该延时任务就会近一步尝试向第二层时间轮降级,第二层时间轮的 currentTimeMs 也已经推进到了 400ms 这个时刻,等待 50ms 之后就可以转动到 450ms 的位置,而第二层时间轮的时钟周期是 400ms ,可以满足这个 50ms 的延时跨度,于是该延时任务最终会被降级到第二层时间轮的 TimerTaskList2中。
下面就是 Kafka 针对以上过程的源码实现,过程梳理清晰了,源码就变得简单明了了:
public class SystemTimer implements Timer {
public boolean advanceClock(long timeoutMs) throws InterruptedException {
// 等待到期的 TimerTaskList
TimerTaskList bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
if (bucket != null) {
writeLock.lock();
try {
while (bucket != null) {
// 尝试将多层时间轮的 currentTimeMs 向前推进到 TimerTaskList 的过期时间
timingWheel.advanceClock(bucket.getExpiration());
// 将过期 TimerTaskList 中的延时任务挨个降级到低层时间轮中
bucket.flush(this::addTimerTaskEntry);
bucket = delayQueue.poll();
}
} finally {
writeLock.unlock();
}
return true;
} else {
// 没有到期的任务
return false;
}
}
}
总结
本文我们主要讨论了定时任务调度相关的主题,笔者一开始介绍了 JDK 的三种调度组件:Timer,DelayQueue,ScheduledThreadPoolExecutor。但他们的共同特点都是采用了小根堆这种数据结构来组织管理定时任务,然而在面对海量定时任务的添加与取消时,这种设计的时间复杂度会比较高 —— O(logn)
。
随后笔者介绍了 Netty 的单层时间轮 HashedWheelTimer,它将海量定时任务的添加与取消操作的时间复杂度降低到了 O(1)
, 但无法避免时间轮的空推进现象以及无法应对海量延时跨度比较大的定时任务场景。
最后笔者详细讨论了 Kafka 时间轮的设计与实现,Kafka 通过引入 DelayQueue 以及多层时间轮,巧妙地解决了时间轮的空推进现象和海量延时任务时间跨度大的管理问题。好了,今天的内容到这里就全部结束了,我们下篇文章见~~~