背景
在调度系统中存在很多定时执行的任务,这些任务有不同的执行周期,比如有每分钟、每小时、每天执行一次的,也有可能是半天执行一次的,且这些任务之间需要建立依赖关系,组成一个数据处理流。
开发一个调度系统,首先需要解决这些不同周期任务相互依赖的问题,保证任务能够根据配置的定时和依赖关系正确触发执行。
任务周期
如下我们可以举出这些周期类型
public enum JobCycle {
MINUTE(1), HOUR(2), DAY(3), WEEK(4), MONTH(5), YEAR(6), NONE(7);
private int code;
JobCycle(int code) {
this.code = code;
}
public static JobCycle of(int code) {
for (JobCycle jobCycle : JobCycle.values()) {
if (jobCycle.code == code) {
return jobCycle;
}
}
throw new IllegalArgumentException("unsupported job cycle " + code);
}
}
一个任务是哪种周期类型,可以由用户设定的 cron
表达式判断计算出来,而不是让用户指定周期类型,这样容易造成周期类型与表达式不一致。
CronUtils
假如使用 Quartz 的语法 "1 0 3 * * ?"
表示每天 03:00:01
执行,如果要判断周期类型,需要将这个语义转换成数字以便于计算,所以首先想到的是找一个工具来帮助我们计算出表达式所要表示的频率。
使用 quartz
提供的 TriggerUtils.computeFireTimes
方法可以间接计算出 interval
public class CronUtils {
public static long intervalOf(String cron) {
return intervalOf(getCronTrigger(cron));
}
public static long intervalOf(CronTriggerImpl cronTrigger) {
List<Date> dates = computeFireTimes(cronTrigger, null, 2);
Date next = dates.get(0);
Date nextNext = dates.get(1);
return nextNext.getTime() - next.getTime();
}
private static CronTriggerImpl getCronTrigger(String cron) {
final CronTriggerImpl cronTrigger = new CronTriggerImpl();
try {
cronTrigger.setCronExpression(cron);
} catch (ParseException e) {
throw new RuntimeException("Cron expression is invalid");
}
return cronTrigger;
}
}
由此,我们可以进一步判断出一个任务的 JobCycle
public enum JobCycle {
....
private static final long minute = 60_000;
private static final long hour = 60 * minute;
private static final long day = 24 * hour;
private static final long week = 7 * day;
private static final long month = 28 * day;
private static final long year = 365 * day;
public static JobCycle from(Long interval) {
if (interval >= minute && interval < hour) return MINUTE;
if (interval >= hour && interval < day) return HOUR;
if (interval >= day && interval < week) return DAY;
if (interval >= week && interval < month) return WEEK;
if (interval >= month) return MONTH;
return NONE;
}
...
}
同周期依赖
在大数据 ETL 任务中,绝大多数属于天级的任务,即今天对昨天一整天的数据集成然后计算,这里隐含了两种相关联的时间
- 数据时间,如 hive 中的天分区
dt ='2019-11-12'
- 计算时间,即调度系统的调度时间
schedule_time
,或者quartz
的firetime
数据时间 = 调度时间 - 调度频率
如果 P 任务每天02:01:04 执行,C 任务每天 03:00:01 执行
String parentCron = "4 1 2 * * ?"; //P
String childCron = "1 0 3 * * ?"; //C
C 依赖于 P, P <- C,当 P 有如下执行历史时
TreeSet<TaskSuccessRecord> history = new TreeSet<>();
history.add(of(parentCron, parse("2019-11-09 02:01:04")));
history.add(of(parentCron, parse("2019-11-10 02:01:04")));
当 C 在 2019-11-10 03:00:01
触发时,如何根据 P 的成功的历史记录判断依赖是否满足?C 可以运行的前提是 P 的数据已经准备好,对于天级的离线表来说表示 dt = 2019-11-09
分区生成,C 可以基于这个分区的数据生成 C 的 dt = 2019-11-09
分区,2019-11-09
即数据时间,但是判断数据时间,比如检查文件目录有没有生成,或者检查数据量比较复杂。
所以通常的做法是检查调度时间,即在 2019-11-10 03:00:01
时,如果 P 的历史中存在 2019-11-10
这一天运行成功的记录,那么就认为 2019-11-09
的数据已经就绪,C 的依赖条件满足。如上 CronUtils
有方法可以计算出两个任务的周期都是天,所以我们知道是在同一天父任务运行成功就行了,但是如何确切判断知道 2019-11-10 02:01:04
这个记录呢?这个牵涉到如何根据一个任意时间计算一个任务的当前、下一个以及前一个调度时刻。
我们需要在 CronUtils 类中实现如下方法
/**
* Compute schedule time by a given point.
*
* pre of next
* -2 -1 sometime 0
* |__________________|________|____________|
* |____interval______|
*
*/
public class CronUtils {
public static LocalDateTime previousScheduleTimeOf(String cron, LocalDateTime sometime) {
return scheduleTime(cron, sometime, -2);
}
public static LocalDateTime scheduleTimeOf(String cron, LocalDateTime sometime) {
return scheduleTime(cron, sometime, -1);
}
public static LocalDateTime nextScheduleTimeOf(String cron, LocalDateTime sometime) {
return scheduleTime(cron, sometime, 0);
}
private static LocalDateTime scheduleTime(String cron, LocalDateTime sometime, int offset) {
CronTriggerImpl cronTrigger = getCronTrigger(cron);
long interval = intervalOf(cronTrigger);
Date from = from(sometime.atZone(systemDefault()).toInstant());
Date to = new Date(from.getTime() + interval);
List<Date> dates = computeFireTimesBetween(cronTrigger, null, from, to);
Date next = dates.get(0);
return ofEpochMilli(next.getTime() + interval * offset).atZone(systemDefault()).toLocalDateTime();
}
...
}
有了这些方法之后,我们可以计算出 2019-11-10 00:00:00
之后的第一个调度时间在 history
中存在就可以了
LocalDateTime parentScheduleTime = nextScheduleTimeOf(parentCron, parse(`2019-11-10 00:00:00`))
assertThat(history.has(of(parentCron, parentScheduleTime))).isTrue();
这里需要说一下 history 如果是一个 TreeSet 是没有 has 方法,可以使用 ceiling
来查找如下
@Test
public void tree_set_correct_search_method() {
TreeSet<Integer> set = new TreeSet<>();
set.add(1);
set.add(2);
set.add(4);
assertThat(set.ceiling(2)).isEqualTo(2); // ceiling includes equals
assertThat(set.higher(2)).isEqualTo(4);
}
有些任务可能刚好设置的是 0 点开始调度的,所以使用 ceiling
而不是 higher
。
之所以使用 Set
的原因是,当一个任务运行多次,比如除系统调度运行外,用户可能会手动执行,Set
可以去重只保留一条记录,从而能简化依赖判断,使用 TreeSet
而不是其它 Set
的原因是数据可以模拟实际的调度的情景,按照调度时间有序,且方便查找,因此成功记录需要实现 Comparator
接口。
@Data
public class TaskSuccessRecord implements Comparable<TaskSuccessRecord> {
private final LocalDateTime scheduleTime;
private final String cronExpression;
public static TaskSuccessRecord of(String cronExpression, LocalDateTime scheduleTime) {
requireNonNull(cronExpression, "Cron is null");
requireNonNull(scheduleTime, "Schedule time is null");
return new TaskSuccessRecord(cronExpression, scheduleTime);
}
public TaskSuccessRecord(String cronExpression, LocalDateTime scheduleTime) {
this.cronExpression = cronExpression;
this.scheduleTime = scheduleTime;
}
public long interval() {
return intervalOf(cronExpression);
}
@Override
public int compareTo(TaskSuccessRecord lastRecord) {
return scheduleTime.truncatedTo(SECONDS)
.compareTo(lastRecord.getScheduleTime()
.truncatedTo(SECONDS));
}
public boolean cronEquals(String cronExpression) {
return this.cronExpression.equals(cronExpression);
}
}
这样我们应该可以从 history
中找到成功记录,不过知道 2019-11-10 00:00:00
这个起始时间其实是需要计算出来的,即根据 C 的调度时间 2019-11-10 03:00:01
计算出来,先计算出 C 的周期,然后取 C 周期的起始时间,我们得增加如下方法
public enum JobCycle {
...
public static ChronoUnit truncateUnit(Long interval) {
switch (from(interval)) {
case MINUTE:
return MINUTES;
case HOUR:
return HOURS;
case DAY:
return DAYS;
case WEEK:
return WEEKS;
case MONTH:
return MONTHS;
case YEAR:
return YEARS;
}
return null;
}
}
...
然后
long interval = intervalOf(childCron);
ChronoUnit truncateUnit = truncateUnit(interval); //DAYS
parse(`2019-11-10 03:00:01` ).truncateTo(truncateUnit); //2019-11-10 00:00:00
大周期依赖小周期
依据二八法则,80% 任务可能都是天级的任务,但是 20% 的任务可能都各种各样,属于不周的周期,比如小时,周等,而且要互相依赖。
我们先继续考查大周期依赖小周期,比如 C 是天级,P 是小时级
String parentCron = "4 1 */1 * * ?"; //P 每小时 01:04 执行
String childCron = "3 1 3 * * ?"; //C 每天 03:01:03 执行
TaskSuccessRecord p1 = of(parentCron, parse("2019-11-09 22:01:04"));
TaskSuccessRecord p2 = of(parentCron, parse("2019-11-09 23:01:04"));
TaskSuccessRecord p3 = of(parentCron, parse("2019-11-10 00:01:04"));
TaskSuccessRecord p4 = of(parentCron, parse("2019-11-10 01:01:04"));
TaskSuccessRecord p5 = of(parentCron, parse("2019-11-10 02:01:04"));
TaskSuccessRecord p6 = of(parentCron, parse("2019-11-10 03:01:04"));
history.add(p1);
history.add(p2);
history.add(p3);
history.add(p4);
history.add(p5);
history.add(p6);
C 是一个天级的表,一个分区代表一整天的数据,而 P 需要 24 个小时分区代表一整天的数据,C 只需要 P 前一天的 23 小时的数据就绪即可,P 2019-11-10 03:01:03
计算的是 23 的数据,有了前一小节同周期依赖的经验,我们可以很容易知道只需要判断 p3 是否生成就好了。
因此,我们得到一个生成检查点的规率,需要用大周期生成
private LocalDateTime checkPointBase(String scheduleTimeStr, String theGreaterCycleCron) {
long interval = intervalOf(theGreaterCycleCron);
return parse(scheduleTimeStr).truncatedTo(truncateUnit(interval));
}
最后,检查的方法是这样的
TaskSuccessRecord checkPoint = of(childCron,
nextScheduleTimeOf(parentCron, checkPointBase("2019-11-10 03:01:03", childCron)));
assertThat(history.ceiling(checkPoint)).isEqualTo(p3);
小周期依赖大周期
对于以上的方法我们可以继续对其它情况进行检查,比如小时依赖天任务
@Test
public void child_hour_parent_day() {
String parentCron = "3 1 3 * * ?"; //P 每天 03:01:03 执行
String childCron = "2 1 */1 * * ?"; //C 每小时 01:02 执行
TaskSuccessRecord p1 = of(parentCron, parse("2019-11-09 03:01:03"));
history.add(p1);
TaskSuccessRecord check_point_1 = of(childCron,
nextScheduleTimeOf(parentCron, checkPointBase("2019-11-10 03:01:02", parentCron)));
assertThat(history.ceiling(check_point_1)).isNull();
TaskSuccessRecord p2 = of(parentCron, parse("2019-11-10 03:01:03"));
history.add(p2);
TaskSuccessRecord check_point_2 = of(childCron,
nextScheduleTimeOf(parentCron, checkPointBase("2019-11-10 04:01:02", parentCron)));
assertThat(history.ceiling(check_point_2)).isEqualTo(p2);
}
以上也是可以成功检查到的,但是实际情况更能会更复杂,比如父任务是 1,13 小时各运行一次,即半天运行一次,但是子任务是每小运行一次,这个时候需要根据半天这个大周期来偏移,LocalDateTime 没有直接的方法来 truncate 半天,因此我们需要修改一下 checkPointBase
方法
private LocalDateTime checkPointBase(String scheduleTimeStr, String theGreaterCycleCron) {
long interval = intervalOf(theGreaterCycleCron);
ChronoUnit truncateUnit = truncateUnit(interval);
Integer cycles = numberOfCycles(interval);
return parse(scheduleTimeStr).truncatedTo(truncateUnit).minus(cycles - 1, truncateUnit);
}
以及添加计算周期数的方法
public static Integer numberOfCycles(Long interval) {
return round(interval / from(interval).cycleInterval());
}
然后我们模拟上面的例子
@Test
public void child_hour_parent_hour_1_and_13() {
String parentCron = "4 1 1,13 * * ?";
String childCron = "3 1 */1 * ?";
TaskSuccessRecord p1 = of(parentCron, parse("2019-11-09 13:01:04"));
TaskSuccessRecord p2 = of(parentCron, parse("2019-11-10 01:01:04"));
history.add(p1);
TaskSuccessRecord check_point_1 = of(childCron,
nextScheduleTimeOf(parentCron, checkPointBase("2019-11-09 13:01:03", parentCron)));
assertThat(history.ceiling(check_point_1)).isEqualTo(p1);
TaskSuccessRecord check_point_2 = of(childCron,
nextScheduleTimeOf(parentCron, checkPointBase("2019-11-09 14:01:03", parentCron)));
assertThat(history.ceiling(check_point_2)).isEqualTo(p1);
}
更多情况测试
@Test
public void child_day_parent_hour_1_and_13() {
String parentCron = "4 1 1,13 * * ?";
String childCron = "3 1 3 * * ?";
TaskSuccessRecord p1 = of(parentCron, parse("2019-11-09 13:01:04"));
TaskSuccessRecord p2 = of(parentCron, parse("2019-11-10 01:01:04"));
history.add(p1);
TaskSuccessRecord checkPoint = of(childCron,
nextScheduleTimeOf(parentCron, checkPointBase("2019-11-10 03:01:03", childCron)));
assertThat(history.ceiling(checkPoint)).isNull();
history.add(p2);
assertThat(history.ceiling(checkPoint)).isEqualTo(p2);
}
@Test
public void child_hour_parent_minute() {
String parentCron = "3 */5 * * * ?";
String childCron = "4 5 */1 * * ?";
TaskSuccessRecord p1 = of(parentCron, parse("2019-11-10 00:50:03"));
TaskSuccessRecord p2 = of(parentCron, parse("2019-11-10 00:55:03"));
TaskSuccessRecord p3 = of(parentCron, parse("2019-11-10 01:05:03"));
history.add(p1);
history.add(p2);
TaskSuccessRecord checkPoint = of(childCron,
nextScheduleTimeOf(parentCron, checkPointBase("2019-11-10 01:05:04", childCron)));
assertThat(history.ceiling(checkPoint)).isNull();
history.add(p3);
assertThat(history.ceiling(checkPoint)).isEqualTo(p3);
}
@Test
public void child_hour_parent_day() {
String parentCron = "3 1 3 * * ?"; //P 每天 03:01:03 执行
String childCron = "2 1 */1 * * ?"; //C 每小时 01:02 执行
TaskSuccessRecord p1 = of(parentCron, parse("2019-11-09 03:01:03"));
history.add(p1);
TaskSuccessRecord check_point_1 = of(childCron,
nextScheduleTimeOf(parentCron, checkPointBase("2019-11-10 03:01:02", parentCron)));
assertThat(history.ceiling(check_point_1)).isNull();
TaskSuccessRecord p2 = of(parentCron, parse("2019-11-10 03:01:03"));
history.add(p2);
TaskSuccessRecord check_point_2 = of(childCron,
nextScheduleTimeOf(parentCron, checkPointBase("2019-11-10 04:01:02", parentCron)));
assertThat(history.ceiling(check_point_2)).isEqualTo(p2);
}
其它问题
以上我们找到了一种能够覆盖多种场景的通用的检查依赖的方法,可以简化代码的复杂度,但仍然还有其它情况需要考虑,比如 cron 表达式中途变更之后,周期发生改变的情况,比如天依赖小时,24 小时中有失败的情况等。
其中 cron 表达式变化,如果变化之后的周期比之前小,历史记录是有效的,反之则需要重新开始依赖,可以试着推理看看。对于小时任务失败的情况,一种解决办法是自依赖,任务自己依赖自己的上一个周期,这种情况如果 23 时成功,表示全部成功,自依赖也是调度系统需要支持的特性,它的依赖方式跟同周期依赖相似,但稍有差别。但是自依赖任务有失败重跑比较耗时从而容易导致数据过度延迟的风险,因此还是需要依赖判断支持更细粒度的检查。
不过总得来说,支持更多情况只需要在前面的方法的基础上扩展,是很容易实现的,不会对既有的结构产生大的变化,或者可以期待博主有进一步的更新,前面的例子中的代码,请访问我的 github 项目 https://github.com/artiship/cyclic,有问题欢迎留言交流。
写在最后
找寻如上的方法得益于我对单元测试的使用,人的记忆据说只有 5 个槽, 比如我们短时记忆很容易记住 5 个数字,超出 5 个就略显困难了,在做复杂的推理时,过多的条件在脑中很难模拟,借助测试可以理清思路,其实测试很像是在做研究的过程,先提出一个假设,再寻找解决办法,再举出很多情况来验证这个办法是否通行,如此反复,这关乎科学。