思维导图
系列总目录
前言
- 延时消息指的在分布式异步消息场景下,生产端发送一条消息,希望在指定延时或者指定时间点被消费端消费到,而不是立刻被消费
使用场景
- 订单30分钟内未支付取消
- 任务截止日志前几天未完成,提醒
时间轮原理
- 高效延时消息,包含两个重要的数据结构:
- 环形队列,例如可以创建一个包含3600个slot的环形队列(可以用数组实现)
-
任务集合,环上每一个slot是一个Set<Task>, Task结构中有两个很重要的属性:
2.1 Cycle-Num:当Current Index第几圈扫描到这个Slot时,执行任务
2.2 Task-Function:需要执行的任务函数
- 同时,启动一个timer:
- 此timer每隔一定时间(比如1秒),在环形队列中移动一格
- 用一个Current Index来标识正在检测的slot
- 如上图,假设当前Current Index指向第一格,当有延时消息到达之后,例如希望3610秒之后,触发一个延时消息任务,只需:
- 计算这个Task应该放在哪一个slot,现在指向1,3610秒之后,应该是第11格,所以这个Task应该放在第11个slot的Set<Task>中;
- 计算这个Task的Cycle-Num,由于环形队列是3600格(每秒移动一格,正好1小时),这个任务是3610秒后执行,所以应该绕3610/3600=1圈之后再执行,于是Cycle-Num=1;
- Current Index不停的移动,每秒移动一格,当移动到一个新slot,遍历这个slot中对应的Set<Task>,每个Task看Cycle-Num是不是0:
- 如果不是0,说明还需要多移动几圈,将Cycle-Num减1;
- 如果是0,说明马上要执行这个Task了,取出Task-Funciton执行,丢给工作线程执行,并把这个Task从Set<Task>中删除;
实现
定时扫描数据库
- 定时扫描mysql数据库,扫描快到期的任务,然后进行处理。
- 优劣势: 实现简单,但不适合数据量大场景
RocketMq
- 天然支持延迟消息,只支持 18 个 Level 的延时,并不支持任意时间。对于普通业务也是够用
-
原理:延时 Level 的消息会被暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据 level 存入特定的queue,queueId = delayTimeLevel – 1(即一个queue只存相同延时的消息,保证具有相同发送延时的消息能够顺序消费), broker会调度地费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。原理图:
Qmq
- Qmq是携程开源的消息队列,开源版本跟内部版本有一定的差。Qmq提供任意时间的延时/定时消息,你可以指定消息在未来两年内(可配置)任意时间内投递。目前为止最适合的延迟任务设计方案
- 设计原理: 多级时间轮 + 延时加载 + 延时消息单独磁盘存储
- 两层hash轮
- 第一层位于磁盘上,每个小时为一个刻度(可以配置),每个刻度会生成一个日志文件,因为QMQ支持两年内的延时消息(默认支持两年内,可以进行配置修改),则最多会生成 2 * 366 * 24 = 17568 个文件。
- 第二层在内存中,当消息的投递时间即将到来的时候,会将这个刻度的消息索引(索引包括消息在schedule log中的offset和size)从磁盘文件加载到内存中的hash wheel上,内存中的hash wheel则是以500ms为一个刻度。
- 优点
- 通过多级时间轮设计,支持了超大时间跨度的延时消息;
- 通过延时加载,内存中只会有最近要消费的消息,更久的延时消息会被存储在磁盘中,对内存友好;
- 延时消息单独存储,不会影响到正常消息的空间回收;