调度系统中不同周期任务依赖的方法(1)

背景

在调度系统中存在很多定时执行的任务,这些任务有不同的执行周期,比如有每分钟、每小时、每天执行一次的,也有可能是半天执行一次的,且这些任务之间需要建立依赖关系,组成一个数据处理流。

开发一个调度系统,首先需要解决这些不同周期任务相互依赖的问题,保证任务能够根据配置的定时和依赖关系正确触发执行。

任务周期

如下我们可以举出这些周期类型

public enum JobCycle {
    MINUTE(1), HOUR(2), DAY(3), WEEK(4), MONTH(5), YEAR(6), NONE(7);
    private int code;

    JobCycle(int code) {
        this.code = code;
    }

    public static JobCycle of(int code) {
        for (JobCycle jobCycle : JobCycle.values()) {
            if (jobCycle.code == code) {
                return jobCycle;
            }
        }
        throw new IllegalArgumentException("unsupported job cycle " + code);
    }
}

一个任务是哪种周期类型,可以由用户设定的 cron 表达式判断计算出来,而不是让用户指定周期类型,这样容易造成周期类型与表达式不一致。

CronUtils

假如使用 Quartz 的语法 "1 0 3 * * ?" 表示每天 03:00:01 执行,如果要判断周期类型,需要将这个语义转换成数字以便于计算,所以首先想到的是找一个工具来帮助我们计算出表达式所要表示的频率。

使用 quartz 提供的 TriggerUtils.computeFireTimes 方法可以间接计算出 interval

public class CronUtils {
    public static long intervalOf(String cron) {
        return intervalOf(getCronTrigger(cron));
    }

    public static long intervalOf(CronTriggerImpl cronTrigger) {
        List<Date> dates = computeFireTimes(cronTrigger, null, 2);
        Date next = dates.get(0);
        Date nextNext = dates.get(1);

        return nextNext.getTime() - next.getTime();
    }

    private static CronTriggerImpl getCronTrigger(String cron) {
        final CronTriggerImpl cronTrigger = new CronTriggerImpl();
        try {
            cronTrigger.setCronExpression(cron);
        } catch (ParseException e) {
            throw new RuntimeException("Cron expression is invalid");
        }
        return cronTrigger;
    }
}

由此,我们可以进一步判断出一个任务的 JobCycle

public enum JobCycle {
    ....
    private static final long minute = 60_000;
    private static final long hour = 60 * minute;
    private static final long day = 24 * hour;
    private static final long week = 7 * day;
    private static final long month = 28 * day;
    private static final long year = 365 * day;

    public static JobCycle from(Long interval) {
        if (interval >= minute && interval < hour) return MINUTE;
        if (interval >= hour && interval < day) return HOUR;
        if (interval >= day && interval < week) return DAY;
        if (interval >= week && interval < month) return WEEK;
        if (interval >= month) return MONTH;

        return NONE;
    }
    ...
}

同周期依赖

在大数据 ETL 任务中,绝大多数属于天级的任务,即今天对昨天一整天的数据集成然后计算,这里隐含了两种相关联的时间

  • 数据时间,如 hive 中的天分区 dt ='2019-11-12'
  • 计算时间,即调度系统的调度时间 schedule_time,或者 quartzfiretime

数据时间 = 调度时间 - 调度频率

如果 P 任务每天02:01:04 执行,C 任务每天 03:00:01 执行

String parentCron = "4 1 2 * * ?"; //P
String childCron  = "1 0 3 * * ?"; //C

C 依赖于 P, P <- C,当 P 有如下执行历史时

TreeSet<TaskSuccessRecord> history = new TreeSet<>();

history.add(of(parentCron, parse("2019-11-09 02:01:04")));
history.add(of(parentCron, parse("2019-11-10 02:01:04")));

当 C 在 2019-11-10 03:00:01 触发时,如何根据 P 的成功的历史记录判断依赖是否满足?C 可以运行的前提是 P 的数据已经准备好,对于天级的离线表来说表示 dt = 2019-11-09 分区生成,C 可以基于这个分区的数据生成 C 的 dt = 2019-11-09 分区,2019-11-09 即数据时间,但是判断数据时间,比如检查文件目录有没有生成,或者检查数据量比较复杂。

所以通常的做法是检查调度时间,即在 2019-11-10 03:00:01 时,如果 P 的历史中存在 2019-11-10 这一天运行成功的记录,那么就认为 2019-11-09 的数据已经就绪,C 的依赖条件满足。如上 CronUtils 有方法可以计算出两个任务的周期都是天,所以我们知道是在同一天父任务运行成功就行了,但是如何确切判断知道 2019-11-10 02:01:04 这个记录呢?这个牵涉到如何根据一个任意时间计算一个任务的当前、下一个以及前一个调度时刻。

我们需要在 CronUtils 类中实现如下方法

/**
 * Compute schedule time by a given point.
 *
 * pre                of                  next
 * -2                 -1    sometime        0
 * |__________________|________|____________|
 * |____interval______|
 *
 */
public class CronUtils {

    public static LocalDateTime previousScheduleTimeOf(String cron, LocalDateTime sometime) {
        return scheduleTime(cron, sometime, -2);
    }

    public static LocalDateTime scheduleTimeOf(String cron, LocalDateTime sometime) {
        return scheduleTime(cron, sometime, -1);
    }

    public static LocalDateTime nextScheduleTimeOf(String cron, LocalDateTime sometime) {
        return scheduleTime(cron, sometime, 0);
    }

    private static LocalDateTime scheduleTime(String cron, LocalDateTime sometime, int offset) {
        CronTriggerImpl cronTrigger = getCronTrigger(cron);

        long interval = intervalOf(cronTrigger);

        Date from = from(sometime.atZone(systemDefault()).toInstant());
        Date to = new Date(from.getTime() + interval);

        List<Date> dates = computeFireTimesBetween(cronTrigger, null, from, to);
        Date next = dates.get(0);

        return ofEpochMilli(next.getTime() + interval * offset).atZone(systemDefault()).toLocalDateTime();
    }
...
}

有了这些方法之后,我们可以计算出 2019-11-10 00:00:00 之后的第一个调度时间在 history 中存在就可以了

LocalDateTime parentScheduleTime = nextScheduleTimeOf(parentCron, parse(`2019-11-10 00:00:00`))

assertThat(history.has(of(parentCron, parentScheduleTime))).isTrue();

这里需要说一下 history 如果是一个 TreeSet 是没有 has 方法,可以使用 ceiling 来查找如下

@Test
public void tree_set_correct_search_method() {
    TreeSet<Integer> set = new TreeSet<>();
    set.add(1);
    set.add(2);
    set.add(4);

    assertThat(set.ceiling(2)).isEqualTo(2); // ceiling includes equals
    assertThat(set.higher(2)).isEqualTo(4);
}

有些任务可能刚好设置的是 0 点开始调度的,所以使用 ceiling 而不是 higher

之所以使用 Set 的原因是,当一个任务运行多次,比如除系统调度运行外,用户可能会手动执行,Set 可以去重只保留一条记录,从而能简化依赖判断,使用 TreeSet 而不是其它 Set 的原因是数据可以模拟实际的调度的情景,按照调度时间有序,且方便查找,因此成功记录需要实现 Comparator 接口。

@Data
public class TaskSuccessRecord implements Comparable<TaskSuccessRecord> {

    private final LocalDateTime scheduleTime;
    private final String cronExpression;

    public static TaskSuccessRecord of(String cronExpression, LocalDateTime scheduleTime) {
        requireNonNull(cronExpression, "Cron is null");
        requireNonNull(scheduleTime, "Schedule time is null");

        return new TaskSuccessRecord(cronExpression, scheduleTime);
    }

    public TaskSuccessRecord(String cronExpression, LocalDateTime scheduleTime) {
        this.cronExpression = cronExpression;
        this.scheduleTime = scheduleTime;
    }

    public long interval() {
        return intervalOf(cronExpression);
    }

    @Override
    public int compareTo(TaskSuccessRecord lastRecord) {
        return scheduleTime.truncatedTo(SECONDS)
                           .compareTo(lastRecord.getScheduleTime()
                                                .truncatedTo(SECONDS));
    }

    public boolean cronEquals(String cronExpression) {
        return this.cronExpression.equals(cronExpression);
    }
}

这样我们应该可以从 history 中找到成功记录,不过知道 2019-11-10 00:00:00 这个起始时间其实是需要计算出来的,即根据 C 的调度时间 2019-11-10 03:00:01 计算出来,先计算出 C 的周期,然后取 C 周期的起始时间,我们得增加如下方法

public enum JobCycle {
...
  public static ChronoUnit truncateUnit(Long interval) {
        switch (from(interval)) {
            case MINUTE:
                return MINUTES;
            case HOUR:
                return HOURS;
            case DAY:
                return DAYS;
            case WEEK:
                return WEEKS;
            case MONTH:
                return MONTHS;
            case YEAR:
                return YEARS;
        }

        return null;
    }
}
...

然后

long interval = intervalOf(childCron);
ChronoUnit truncateUnit = truncateUnit(interval); //DAYS
parse(`2019-11-10 03:00:01` ).truncateTo(truncateUnit); //2019-11-10 00:00:00

大周期依赖小周期

依据二八法则,80% 任务可能都是天级的任务,但是 20% 的任务可能都各种各样,属于不周的周期,比如小时,周等,而且要互相依赖。

我们先继续考查大周期依赖小周期,比如 C 是天级,P 是小时级

String parentCron = "4 1 */1 * * ?"; //P 每小时 01:04 执行
String childCron  = "3 1 3 * * ?"; //C 每天 03:01:03 执行

TaskSuccessRecord p1 = of(parentCron, parse("2019-11-09 22:01:04"));
TaskSuccessRecord p2 = of(parentCron, parse("2019-11-09 23:01:04"));
TaskSuccessRecord p3 = of(parentCron, parse("2019-11-10 00:01:04"));
TaskSuccessRecord p4 = of(parentCron, parse("2019-11-10 01:01:04"));
TaskSuccessRecord p5 = of(parentCron, parse("2019-11-10 02:01:04"));
TaskSuccessRecord p6 = of(parentCron, parse("2019-11-10 03:01:04"));

history.add(p1);
history.add(p2);
history.add(p3);
history.add(p4);
history.add(p5);
history.add(p6);

C 是一个天级的表,一个分区代表一整天的数据,而 P 需要 24 个小时分区代表一整天的数据,C 只需要 P 前一天的 23 小时的数据就绪即可,P 2019-11-10 03:01:03 计算的是 23 的数据,有了前一小节同周期依赖的经验,我们可以很容易知道只需要判断 p3 是否生成就好了。

因此,我们得到一个生成检查点的规率,需要用大周期生成

private LocalDateTime checkPointBase(String scheduleTimeStr, String theGreaterCycleCron) {
    long interval = intervalOf(theGreaterCycleCron);
    return parse(scheduleTimeStr).truncatedTo(truncateUnit(interval));
}

最后,检查的方法是这样的

TaskSuccessRecord checkPoint = of(childCron,
                nextScheduleTimeOf(parentCron, checkPointBase("2019-11-10 03:01:03", childCron)));

assertThat(history.ceiling(checkPoint)).isEqualTo(p3);

小周期依赖大周期

对于以上的方法我们可以继续对其它情况进行检查,比如小时依赖天任务

@Test
public void child_hour_parent_day() {
    String parentCron = "3 1 3 * * ?"; //P 每天 03:01:03 执行
    String childCron = "2 1 */1 * * ?"; //C 每小时 01:02 执行

    TaskSuccessRecord p1 = of(parentCron, parse("2019-11-09 03:01:03"));
    history.add(p1);

    TaskSuccessRecord check_point_1 = of(childCron,
            nextScheduleTimeOf(parentCron, checkPointBase("2019-11-10 03:01:02", parentCron)));

    assertThat(history.ceiling(check_point_1)).isNull();

    TaskSuccessRecord p2 = of(parentCron, parse("2019-11-10 03:01:03"));
    history.add(p2);

    TaskSuccessRecord check_point_2 = of(childCron,
            nextScheduleTimeOf(parentCron, checkPointBase("2019-11-10 04:01:02", parentCron)));

    assertThat(history.ceiling(check_point_2)).isEqualTo(p2);
}

以上也是可以成功检查到的,但是实际情况更能会更复杂,比如父任务是 1,13 小时各运行一次,即半天运行一次,但是子任务是每小运行一次,这个时候需要根据半天这个大周期来偏移,LocalDateTime 没有直接的方法来 truncate 半天,因此我们需要修改一下 checkPointBase 方法

private LocalDateTime checkPointBase(String scheduleTimeStr, String theGreaterCycleCron) {
    long interval = intervalOf(theGreaterCycleCron);

    ChronoUnit truncateUnit = truncateUnit(interval);
    Integer cycles = numberOfCycles(interval);

    return parse(scheduleTimeStr).truncatedTo(truncateUnit).minus(cycles - 1, truncateUnit);
}

以及添加计算周期数的方法

public static Integer numberOfCycles(Long interval) {
     return round(interval / from(interval).cycleInterval());
}

然后我们模拟上面的例子

@Test
public void child_hour_parent_hour_1_and_13() {
    String parentCron = "4 1 1,13 * * ?";
    String childCron  = "3 1 */1 * ?";

    TaskSuccessRecord p1 = of(parentCron, parse("2019-11-09 13:01:04"));
    TaskSuccessRecord p2 = of(parentCron, parse("2019-11-10 01:01:04"));

    history.add(p1);

    TaskSuccessRecord check_point_1 = of(childCron,
            nextScheduleTimeOf(parentCron, checkPointBase("2019-11-09 13:01:03", parentCron)));

    assertThat(history.ceiling(check_point_1)).isEqualTo(p1);

    TaskSuccessRecord check_point_2 = of(childCron,
            nextScheduleTimeOf(parentCron, checkPointBase("2019-11-09 14:01:03", parentCron)));

    assertThat(history.ceiling(check_point_2)).isEqualTo(p1);
}

更多情况测试

@Test
public void child_day_parent_hour_1_and_13() {
    String parentCron = "4 1 1,13 * * ?";
    String childCron  = "3 1 3 * * ?";

    TaskSuccessRecord p1 = of(parentCron, parse("2019-11-09 13:01:04"));
    TaskSuccessRecord p2 = of(parentCron, parse("2019-11-10 01:01:04"));

    history.add(p1);

    TaskSuccessRecord checkPoint = of(childCron,
            nextScheduleTimeOf(parentCron, checkPointBase("2019-11-10 03:01:03", childCron)));

    assertThat(history.ceiling(checkPoint)).isNull();

    history.add(p2);

    assertThat(history.ceiling(checkPoint)).isEqualTo(p2);
}

@Test
public void child_hour_parent_minute() {
    String parentCron  = "3 */5 * * * ?";
    String childCron = "4 5 */1 * * ?";

    TaskSuccessRecord p1 = of(parentCron, parse("2019-11-10 00:50:03"));
    TaskSuccessRecord p2 = of(parentCron, parse("2019-11-10 00:55:03"));
    TaskSuccessRecord p3 = of(parentCron, parse("2019-11-10 01:05:03"));

    history.add(p1);
    history.add(p2);

    TaskSuccessRecord checkPoint = of(childCron,
            nextScheduleTimeOf(parentCron, checkPointBase("2019-11-10 01:05:04", childCron)));

    assertThat(history.ceiling(checkPoint)).isNull();

    history.add(p3);

    assertThat(history.ceiling(checkPoint)).isEqualTo(p3);
}

@Test
public void child_hour_parent_day() {
    String parentCron = "3 1 3 * * ?"; //P 每天 03:01:03 执行
    String childCron = "2 1 */1 * * ?"; //C 每小时 01:02 执行

    TaskSuccessRecord p1 = of(parentCron, parse("2019-11-09 03:01:03"));
    history.add(p1);

    TaskSuccessRecord check_point_1 = of(childCron,
            nextScheduleTimeOf(parentCron, checkPointBase("2019-11-10 03:01:02", parentCron)));

    assertThat(history.ceiling(check_point_1)).isNull();

    TaskSuccessRecord p2 = of(parentCron, parse("2019-11-10 03:01:03"));
    history.add(p2);

    TaskSuccessRecord check_point_2 = of(childCron,
            nextScheduleTimeOf(parentCron, checkPointBase("2019-11-10 04:01:02", parentCron)));

    assertThat(history.ceiling(check_point_2)).isEqualTo(p2);
}

其它问题

以上我们找到了一种能够覆盖多种场景的通用的检查依赖的方法,可以简化代码的复杂度,但仍然还有其它情况需要考虑,比如 cron 表达式中途变更之后,周期发生改变的情况,比如天依赖小时,24 小时中有失败的情况等。

其中 cron 表达式变化,如果变化之后的周期比之前小,历史记录是有效的,反之则需要重新开始依赖,可以试着推理看看。对于小时任务失败的情况,一种解决办法是自依赖,任务自己依赖自己的上一个周期,这种情况如果 23 时成功,表示全部成功,自依赖也是调度系统需要支持的特性,它的依赖方式跟同周期依赖相似,但稍有差别。但是自依赖任务有失败重跑比较耗时从而容易导致数据过度延迟的风险,因此还是需要依赖判断支持更细粒度的检查。

不过总得来说,支持更多情况只需要在前面的方法的基础上扩展,是很容易实现的,不会对既有的结构产生大的变化,或者可以期待博主有进一步的更新,前面的例子中的代码,请访问我的 github 项目 https://github.com/artiship/cyclic,有问题欢迎留言交流。

写在最后

找寻如上的方法得益于我对单元测试的使用,人的记忆据说只有 5 个槽, 比如我们短时记忆很容易记住 5 个数字,超出 5 个就略显困难了,在做复杂的推理时,过多的条件在脑中很难模拟,借助测试可以理清思路,其实测试很像是在做研究的过程,先提出一个假设,再寻找解决办法,再举出很多情况来验证这个办法是否通行,如此反复,这关乎科学。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,941评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,397评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,345评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,851评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,868评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,688评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,414评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,319评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,775评论 1 315
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,945评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,096评论 1 350
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,789评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,437评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,993评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,107评论 1 271
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,308评论 3 372
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,037评论 2 355