任务调度入门二——分布式任务调度平台Elastic-Job-Lite

1.Elastic-Job简介

如果要在分布式集群环境下去安全的执行一个调度任务,常见的做法就是保证在集群环境下,只有集群中的一台机器能够获取执行任务的权限。而Elastic-Job在实现分布式Job时是将集群中所有的机器都利用起来,通过多进程多线程执行作业任务。也就是说如果本机的数据分片分到了多个分片(即一个JVM进程分到了多个分片),Elastic-Job会为每一个分片去启动一个线程来执行分片任务(在AbstractElasticJobExecutor的process(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource))。

Elastic-Job相当于quartz+zk的加强版,它允许对定时任务分片,可以集群部署(每个job的"分片"会分散到各个节点上),如果某个节点挂了,该节点上的分片,会调度到其它节点上。

Elastic-Job并不直接提供数据处理的功能,框架只会将分片项分配至各个运行中的作业服务器,开发者需要自行处理分片项与真实数据的对应关系。

2.Elastic-Job-Lite分布调度特点

Elastic-Job-Lite并无作业调度中心节点,而是基于部署作业框架的程序在到达相应时间点时各自触发调度。注册中心仅用于作业注册和监控信息存储。而主作业节点仅用于处理分片和清理等功能。

3.Elastic-Job-Lite整体架构

整体架构

elastic-job-lite是以zookeeper作为注册中心的,console作为控制台和服务端解耦,通过restful api操作zk改变job的配置信息。
服务端启动时会进行:连接zk→注册job→初始化Scheduler→进行leader选举→由选举出的主作业节点进行分片,然后所有节点按照job配置信息调度作业。

4.分片场景

(1)分片总数为1,并使用多于1台的服务器执行作业

作业将会以1主n从的方式执行。一旦执行作业的服务器崩溃,等待执行的服务器将会在下次作业启动时替补执行。开启失效转移功能效果更好,可以保证在本次作业执行时崩溃,备机立即启动替补执行。

在同一个zookepper和jobname情况下,多台机器部署了Elastic job时,只有拿到shardingContext.getShardingItem()为0的机器得以执行,其他的机器不执行。

(2)分片项总数大于服务器的数量

作业将会合理的利用分布式资源,动态的分配分片项。

例如:3台服务器,分成10片,则分片项分配结果为服务器A=0,1,2;服务器B=3,4,5;服务器C=6,7,8,9。 如果服务器C崩溃,则分片项分配结果为服务器A=0,1,2,3,4;服务器B=5,6,7,8,9。在不丢失分片项的情况下,最大限度的利用现有资源提高吞吐量。

5.Elastic-Job-Lite注册中心数据结构

注册中心数据结构.png

注册中心在定义的命名空间下,创建作业名称节点,用于区分不同作业,所以作业一旦创建则不能修改作业名称,如果修改名称将视为新的作业。作业名称节点下又包含5个数据子节点,分别是config, instances, sharding, servers和leader。

(1)config节点

config节点保存作业配置信息,以JSON格式存储


config节点.png
(2)instances节点

作业运行实例信息,子节点是当前作业运行实例的主键。作业运行实例主键由作业运行服务器的IP地址和PID构成。作业运行实例主键均为临时节点,当作业实例上线时注册,下线时自动清理。注册中心监控这些节点的变化来协调分布式作业的分片以及高可用。 可在作业运行实例节点写入TRIGGER表示该实例立即执行一次。


instances节点.png
(3)sharding节点

作业分片信息,子节点是分片项序号,从零开始,至分片总数减一


sharding节点.png
(4)servers节点

作业服务器信息,子节点是作业服务器的IP地址


servers节点.png
(5)leader节点

作业服务器主节点信息。
分为election,sharding和failover三个子节点,分别用于主节点选举,分片和失效转移处理。
失效转移:在某台服务器执行完毕后主动抓取未分配的分片,并且在某台服务器下线后主动寻找可用的服务器执行任务。


leader节点.png

6.作业启动流程

job_start.jpg

Elastic-Job-Lite初始化的入口是JobSchedule,应用服务器启动时,会调用JobSchedule的init方法,开启作业启动流程。首先添加或更新作业配置信息,并将配置信息持久化到zk上;接着创建quartz调度器,作业的调度执行依赖quartz技术;然后启动所有的监听器,包括leader选举监听、失效转移监听、分片监听等,并发起主节点选举,将leader节点信息set到leader/election/instance节点下;然后将服务器信息、实例信息注册到zk上,并且在leader/sharding下创建necessary节点,作为重新分片的标记;最后由quartz调度器根据cron表达式调度执行。

7.作业执行流程

job_exec.jpg

Elastic-Job-Lite执行器的入口是实现了Job接口的LiteJob类,当任务调度执行时,进入LiteJob类的execute方法。在这里完成一系列的操作,包括获取失效转移分片项,如果没有分配的失效转移项,则判断是否需要重新分片,然后获取分配给自己的分片项,然后判断当前分片项是否正在running,如果否,则执行任务项;如果是,则在sharding/[item]下添加misfire节点,标示该分片项错过执行,等待分片项执行结束后,再触发misfire的分片项执行。

8.示例

elastic-job提供了三种类型的作业:Simple类型作业、Dataflow类型作业、Script类型作业。
可以使用JAVA、Spring启动,下面例子使用JAVA启动

(1)Simple类型作业
①添加Maven依赖
        <!-- 引入elastic-job-lite核心模块 -->
        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-core</artifactId>
            <version>2.1.5</version>
        </dependency>
②创建Job
public class MySimpleJob implements SimpleJob {

    @Override
    public void execute(ShardingContext shardingContext) {
        int key = shardingContext.getShardingItem();
        switch (key) {
            case 0:
                System.out.println("---------------->0");
                break;
            case 1:
                System.out.println("---------------->1");
                break;
            default:
                System.out.println("---------------->default");
                break;
        }
    }
}
③配置启动调度器
public class StartJob {

   public static void main(String[] args) {
       new JobScheduler(createRegistryCenter(), createSimpleJobConfiguration()).init();//创建作业调度器并初始化作业
   }

   private static CoordinatorRegistryCenter createRegistryCenter() {
       CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("127.0.0.1:2181", "elastic-job-demo"));//创建指定命名空间为elastic-job-demo
       regCenter.init();//初始化注册中心.
       return regCenter;
   }

   /**
    * 创建Job配置
    * @return
    */
   private static LiteJobConfiguration createSimpleJobConfiguration() {
       JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob", "0/15 * * * * ?", 10).build();
       // 定义SIMPLE类型配置
       SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MySimpleJob.class.getCanonicalName());
       // 定义Lite作业根配置
       LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();
       return simpleJobRootConfig;
   }
}
(2)Dataflow类型作业
①创建Job
public class MyDataFlowJob implements DataflowJob<Foo> {

    /**
     * 获取待处理数据.
     *
     * @param context
     * @return
     */
    @Override
    public List<Foo> fetchData(ShardingContext context) {
        switch (context.getShardingItem()) {
            case 0:
                List<Foo> data1 = new ArrayList<>();// get data from database by sharding item 0
                data1.add(new Foo("sharding item 0"));
                return data1;
            case 1:
                List<Foo> data2 = new ArrayList<>();// get data from database by sharding item 1
                data2.add(new Foo("sharding item 1"));
                return data2;
            case 2:
                List<Foo> data3 = new ArrayList<>();// get data from database by sharding item 2
                data3.add(new Foo("sharding item 2"));
                return data3;
            // case n: ...
        }
        return null;
    }

    /**
     * 处理数据
     *
     * @param shardingContext
     * @param data
     */
    @Override
    public void processData(ShardingContext shardingContext, List<Foo> data) {
        // process data
        for (Foo foo : data) {
            System.out.println(foo);
        }
    }
}
②创建Foo
public class Foo {
    private String name;

    public Foo(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    @Override
    public String toString() {
        return "Foo{" +
                "name='" + name + '\'' +
                '}';
    }
}
③将配置修改成如下
    /**
     * 创建DataflowJob配置
     * @return
     */
    private static LiteJobConfiguration createDataflowJobConfiguration() {
        // 定义作业核心配置
        JobCoreConfiguration dataflowCoreConfig = JobCoreConfiguration.newBuilder("DataflowJob", "0/30 * * * * ?", 10).build();
        // 定义DATAFLOW类型配置
        DataflowJobConfiguration dataflowJobConfig = new DataflowJobConfiguration(dataflowCoreConfig, MyDataFlowJob.class.getCanonicalName(), true);
        // 定义Lite作业根配置
        JobRootConfiguration dataflowJobRootConfig = LiteJobConfiguration.newBuilder(dataflowJobConfig).build();
        return (LiteJobConfiguration) dataflowJobRootConfig;
    }

ps:记得先启动zookeeper

参考
[1]Elastic-Job原理分析(version:2.1.4) - 快鸟 - 博客园
[2]elastic-job-lite入门以及架构原理分析 - 云+社区 - 腾讯云
[3]Elastic-Job/Elastic-Job-Lite简介
[4]分布式定时任务调度平台Elastic-Job技术详解_慕课手记
[5]Elastic-Job-Lite实现原理

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,723评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,003评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,512评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,825评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,874评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,841评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,812评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,582评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,033评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,309评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,450评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,158评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,789评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,409评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,609评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,440评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,357评论 2 352