Elastic-Job

1. Introduction

1.1 Overview

1.2 Test code

public static void main(String[] args) {
        // ZooKeeper registry center
        CoordinatorRegistryCenter registerCenter = createRegistryCenter();
        // Lite job configuration
        LiteJobConfiguration liteJobCfg = createJobConfiguration();
        new JobScheduler(registerCenter, liteJobCfg).init();
    }

    private static CoordinatorRegistryCenter createRegistryCenter() {
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(
                // ZooKeeper address and job namespace
                new ZookeeperConfiguration("localhost:2181", "elastic-job-demo"));
        // initialize the ZooKeeper registry center
        regCenter.init();
        return regCenter;
    }

    private static LiteJobConfiguration createJobConfiguration() {
        // core job configuration
        JobCoreConfiguration simpleCoreConfig = 
                JobCoreConfiguration.newBuilder("demoSimpleJob", "*/5 * * * * ?", 3).build();
        // simple job configuration (job type + job class)
        SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig,
                DemoJob.class.getCanonicalName());
        // root Lite job configuration
        LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();
        return simpleJobRootConfig;
    }
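The DemoJob referenced above is a plain SimpleJob implementation. A minimal sketch, assuming the elastic-job-lite 2.x SimpleJob/ShardingContext API (the class body and output are illustrative):

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;

public class DemoJob implements SimpleJob {

    @Override
    public void execute(final ShardingContext shardingContext) {
        // each of the 3 sharding items (0, 1, 2) is delivered here on whichever instance it was assigned to
        System.out.println(String.format("job: %s, item: %d, parameter: %s",
                shardingContext.getJobName(), shardingContext.getShardingItem(), shardingContext.getShardingParameter()));
    }
}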

1.3 Process overview

1.3.1 Initialization and startup
Figure: 启动quartz.png (startup via quartz)
1.3.2 Execution
Figure: 执行.png (execution flow)
1.3.3 ZooKeeper nodes
Figure: 节点大体分配.png (overall node layout)
  • Blue marks the leader node

2. Initialization

2.1 Initializing ZooKeeper

public final class ZookeeperConfiguration {
    
    /**
     * List of ZooKeeper servers to connect to.
     * Each entry is an IP address (or host name) plus port.
     * Multiple addresses are separated by commas,
     * e.g. host1:2181,host2:2181
     */
    private final String serverLists;
    
    /**
     * Namespace.
     */
    private final String namespace;
    
    /**
     * Initial wait interval between retries.
     * In milliseconds.
     */
    private int baseSleepTimeMilliseconds = 1000;
    
    /**
     * Maximum wait interval between retries.
     * In milliseconds.
     */
    private int maxSleepTimeMilliseconds = 3000;
    
    /**
     * Maximum number of retries.
     */
    private int maxRetries = 3;
    
    /**
     * Session timeout.
     * In milliseconds.
     */
    private int sessionTimeoutMilliseconds;
    
    /**
     * Connection timeout.
     * In milliseconds.
     */
    private int connectionTimeoutMilliseconds;
    
    /**
     * Digest token used to authenticate with ZooKeeper.
     * By default no authentication is required.
     */
    private String digest;
}
public void init() {
        log.debug("Elastic job: zookeeper registry center init, server lists is: {}.", zkConfig.getServerLists());
        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                .connectString(zkConfig.getServerLists())
                .retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds()))
                .namespace(zkConfig.getNamespace());
        if (0 != zkConfig.getSessionTimeoutMilliseconds()) {
            builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds());
        }
        if (0 != zkConfig.getConnectionTimeoutMilliseconds()) {
            builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds());
        }
        if (!Strings.isNullOrEmpty(zkConfig.getDigest())) {
            builder.authorization("digest", zkConfig.getDigest().getBytes(Charsets.UTF_8))
                    .aclProvider(new ACLProvider() {
                    
                        @Override
                        public List<ACL> getDefaultAcl() {
                            return ZooDefs.Ids.CREATOR_ALL_ACL;
                        }
                    
                        @Override
                        public List<ACL> getAclForPath(final String path) {
                            return ZooDefs.Ids.CREATOR_ALL_ACL;
                        }
                    });
        }
        client = builder.build();
        client.start();
        try {
            if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) {
                client.close();
                throw new KeeperException.OperationTimeoutException();
            }
            //CHECKSTYLE:OFF
        } catch (final Exception ex) {
            //CHECKSTYLE:ON
            RegExceptionHandler.handleException(ex);
        }
    }

2.2 Initializing the configuration

2.2.1 JobCoreConfiguration
    private final String jobName;
    
    private final String cron;
    
    private final int shardingTotalCount;
    
    /**
     * Mapping of sharding item to its individualized parameter.
     *
     * <p>
     * Sharding item and parameter are separated by '=', and key-value pairs are separated by commas (map-like).
     * Sharding items start at 0 and must be less than the total sharding count.
     * e.g.:
     * 0=a,1=b,2=c
     * </p>
     *
     * @param shardingItemParameters mapping of sharding item to individualized parameter
     *
     * @return job configuration builder
     */
    private final String shardingItemParameters;
    
    private final String jobParameter;
    
    // whether failover is enabled
    private final boolean failover;
    // whether missed (misfired) executions are re-run
    private final boolean misfire;
    
    private final String description;
    
    private final JobProperties jobProperties;
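For reference, a minimal sketch of how these fields are populated through the builder; the values are illustrative and the builder method names are assumed to mirror the fields above (elastic-job-lite 2.x):

JobCoreConfiguration coreConfig = JobCoreConfiguration
        .newBuilder("demoSimpleJob", "*/5 * * * * ?", 3)  // jobName, cron, shardingTotalCount
        .shardingItemParameters("0=a,1=b,2=c")            // per-item parameters
        .jobParameter("demoParameter")                    // parameter shared by all items
        .failover(true)                                   // enable failover
        .misfire(true)                                    // re-run missed executions
        .description("demo job")
        .build();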
2.2.2 JobConfiguration
Figure: JobConfiguration.png (configuration class hierarchy)

SimpleJobConfiguration

    // common job configuration
    private final JobCoreConfiguration coreConfig;
    
    // job type
    private final JobType jobType = JobType.SIMPLE;
    
    // job class
    private final String jobClass;

2.2.3 LiteJobConfiguration

    // job type configuration
    private final JobTypeConfiguration typeConfig;
    
    // whether to monitor job execution status
    private final boolean monitorExecution;
    
    // maximum allowed time difference (seconds) between the local machine and the registry center
    private final int maxTimeDiffSeconds;
    
    // monitor (dump) port
    private final int monitorPort;
    
    // job sharding strategy class
    private final String jobShardingStrategyClass;
    
    // interval in minutes of the service that reconciles inconsistent job server state
    private final int reconcileIntervalMinutes;
    
    // whether the job is disabled on startup
    private final boolean disabled;
    
    // whether the local configuration overwrites the registry center configuration
    private final boolean overwrite;

3. Startup

3.1 Common services

// job configuration service (i.e. LiteJobConfiguration)
new ConfigurationService(regCenter, jobName);
// leader node service
new LeaderService(regCenter, jobName);
// available job server service
new ServerService(regCenter, jobName);
// running job instance service
new InstanceService(regCenter, jobName);
// sharding service
new ShardingService(regCenter, jobName);
// job execution service
new ExecutionService(regCenter, jobName);
// job monitor service
new MonitorService(regCenter, jobName);
// service that reconciles inconsistent distributed job state
new ReconcileService(regCenter, jobName);
// job failover service
new FailoverService(regCenter, jobName);

3.2 JobScheduler

   /**
     * Job scheduler.
     * 
     * @param regCenter registry center
     * @param liteJobConfig Lite job configuration
     * @param jobEventBus job event bus
     * @param elasticJobListeners elastic job listeners
     */
    private JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) {
        // global runtime registry:
        // job name -> job instance key (IP + "@-@" + JVM PID)
        JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(), new JobInstance());
        this.liteJobConfig = liteJobConfig;
        this.regCenter = regCenter;
        List<ElasticJobListener> elasticJobListenerList = Arrays.asList(elasticJobListeners);
        // inject the registry center (as a GuaranteeService) into listeners of type AbstractDistributeOnceElasticJobListener
        setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList);
        // scheduler facade
        schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);
        // job facade
        jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);
    }
3.2.1 SchedulerFacade
public SchedulerFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final List<ElasticJobListener> elasticJobListeners) {
        this.jobName = jobName;
        // job configuration service (i.e. LiteJobConfiguration)
        configService = new ConfigurationService(regCenter, jobName);
        // leader node service
        leaderService = new LeaderService(regCenter, jobName);
        // available job server service
        serverService = new ServerService(regCenter, jobName);
        // running job instance service
        instanceService = new InstanceService(regCenter, jobName);
        // sharding service
        shardingService = new ShardingService(regCenter, jobName);
        // job execution service
        executionService = new ExecutionService(regCenter, jobName);
        // job monitor service
        monitorService = new MonitorService(regCenter, jobName);
        // service that reconciles inconsistent distributed job state
        reconcileService = new ReconcileService(regCenter, jobName);
        // node listeners
        // watch the ${namespace}/${job-name} node
        listenerManager = new ListenerManager(regCenter, jobName, elasticJobListeners);
    }
  • ListenerManager watches the ${namespace}/${job-name} node (important!)
3.2.2 LiteJobFacade
  public LiteJobFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final List<ElasticJobListener> elasticJobListeners, final JobEventBus jobEventBus) {
        configService = new ConfigurationService(regCenter, jobName);
        shardingService = new ShardingService(regCenter, jobName);
        executionContextService = new ExecutionContextService(regCenter, jobName);
        executionService = new ExecutionService(regCenter, jobName);
        failoverService = new FailoverService(regCenter, jobName);
        this.elasticJobListeners = elasticJobListeners;
        this.jobEventBus = jobEventBus;
    }

3.3 init

   public void init() {
        // update and fetch the node configuration data (LiteJobConfiguration) under ${job-name}/config
        LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
        // global runtime registry:
        // job name -> total sharding count
        JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
        // create JobScheduleController, which wraps the quartz scheduler
        JobScheduleController jobScheduleController = new JobScheduleController(
                 // create the org.quartz.Scheduler
                 createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
        // global runtime registry:
        // job name -> JobScheduleController, registry center
        JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
        // pre-start operations
        schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
        // start the local quartz scheduler
        jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
    }
  • schedulerFacade.updateJobConfiguration(liteJobConfig) adds or updates the data of the ${namespace}/${job-name}/config node
3.3.1 createScheduler
 private Scheduler createScheduler() {
        Scheduler result;
        try {
            StdSchedulerFactory factory = new StdSchedulerFactory();
            factory.initialize(getBaseQuartzProperties());
            result = factory.getScheduler();
            result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
        } catch (final SchedulerException ex) {
            throw new JobSystemException(ex);
        }
        return result;
    }
    
    private Properties getBaseQuartzProperties() {
        Properties result = new Properties();
        result.put("org.quartz.threadPool.class", org.quartz.simpl.SimpleThreadPool.class.getName());
        // one thread per job: each JobScheduler owns its own quartz scheduler
        result.put("org.quartz.threadPool.threadCount", "1");
        result.put("org.quartz.scheduler.instanceName", liteJobConfig.getJobName());
        // 1 ms threshold: elastic-job handles misfires itself via its misfire flag instead of quartz
        result.put("org.quartz.jobStore.misfireThreshold", "1");
        // shutdown hook plugin to clean up the scheduler on JVM exit
        result.put("org.quartz.plugin.shutdownhook.class", JobShutdownHookPlugin.class.getName());
        result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString());
        return result;
    }
3.3.2 createJobDetail
  • All job types are executed through the single LiteJob quartz job
  private JobDetail createJobDetail(final String jobClass) {
        // the quartz JobDetail always points at LiteJob
        JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
        result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
        Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
        if (elasticJobInstance.isPresent()) {
            result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
        } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
            try {
                result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
            } catch (final ReflectiveOperationException ex) {
                throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
            }
        }
        return result;
    }
3.3.3 registerStartUpInfo !!!
  public void registerStartUpInfo(final boolean enabled) {
        // start the ZooKeeper node listeners
        listenerManager.startAllListeners();
        // elect the leader: ${namespace}/${job-name}/leader/election
        // listeners then react to node changes such as ${namespace}/${job-name}/leader/sharding/necessary
        leaderService.electLeader();
        // add the job server node: ${namespace}/${job-name}/server
        serverService.persistOnline(enabled);
        // add the running instance node: ${namespace}/${job-name}/instances
        instanceService.persistOnline();
        // set the resharding flag: ${namespace}/${job-name}/leader/sharding/necessary
        shardingService.setReshardingFlag();
        // job monitor service
        monitorService.listen();
        // start the reconcile service (periodically checks whether resharding is needed) if it is not already running
        if (!reconcileService.isRunning()) {
            reconcileService.startAsync();
        }
    }
3.3.3.1 leaderService.electLeader()

Once leader election succeeds, LeaderElectionExecutionCallback is called back.

    /**
     * Elect the leader node.
     */
    public void electLeader() {
        log.debug("Elect a new leader now.");
        jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback());
        log.debug("Leader election completed.");
    }

   /**
     * Execute an operation as the leader.
     * 
     * @param latchNode job node name used for the distributed lock
     * @param callback callback to execute
     */
    public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
        try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
            latch.start();
            latch.await();
            callback.execute();
        //CHECKSTYLE:OFF
        } catch (final Exception ex) {
        //CHECKSTYLE:ON
            handleException(ex);
        }
    }

    @RequiredArgsConstructor
    class LeaderElectionExecutionCallback implements LeaderExecutionCallback {
        
        @Override
        public void execute() {
            // check whether the ${namespace}/${job-name}/leader/election/instance node already exists
            if (!hasLeader()) {
                jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
            }
        }
    }

    /**
     * Whether a leader already exists.
     * 
     * @return whether a leader already exists
     */
    public boolean hasLeader() {
        return jobNodeStorage.isJobNodeExisted(LeaderNode.INSTANCE);
    }
  • Leader election is effectively a distributed lock that guarantees exactly one node becomes the leader (see the sketch below)
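To see that distributed-lock behavior in isolation, here is a minimal Curator LeaderLatch sketch outside of elastic-job; the connect string and latch path are placeholders:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderLatchDemo {

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();
        // every process creates a latch on the same path; only one holds leadership at a time
        try (LeaderLatch latch = new LeaderLatch(client, "/elastic-job-demo/demoSimpleJob/leader/election/latch")) {
            latch.start();
            latch.await();  // blocks until this process becomes the leader
            System.out.println("I am the leader; elastic-job would now write leader/election/instance");
        }  // close() releases leadership so another waiter can acquire it
        client.close();
    }
}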
3.3.3.2 reconcileService.startAsync()

A service that periodically checks whether resharding is needed.

public final class ReconcileService extends AbstractScheduledService {
    
    private long lastReconcileTime;
    
    private final ConfigurationService configService;
    
    private final ShardingService shardingService;
    
    private final LeaderService leaderService;
    
    public ReconcileService(final CoordinatorRegistryCenter regCenter, final String jobName) {
        lastReconcileTime = System.currentTimeMillis();
        configService = new ConfigurationService(regCenter, jobName);
        shardingService = new ShardingService(regCenter, jobName);
        leaderService = new LeaderService(regCenter, jobName);
    }
    
    @Override
    protected void runOneIteration() throws Exception {
        LiteJobConfiguration config = configService.load(true);
        int reconcileIntervalMinutes = null == config ? -1 : config.getReconcileIntervalMinutes();
        // only act after the configured reconcile interval has elapsed
        if (reconcileIntervalMinutes > 0 && (System.currentTimeMillis() - lastReconcileTime >= reconcileIntervalMinutes * 60 * 1000)) {
            lastReconcileTime = System.currentTimeMillis();
            if (leaderService.isLeaderUntilBlock() // only the leader performs this check
                    && !shardingService.isNeedSharding() // no resharding is currently pending
                    && shardingService.hasShardingInfoInOfflineServers()) { // sharding info still points at offline job instances
                log.warn("Elastic Job: job status node has inconsistent value,start reconciling...");
                shardingService.setReshardingFlag();
            }
        }
    }
    
   /**
    *  Runs once per minute.
    */
    @Override
    protected Scheduler scheduler() {
        return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MINUTES);
    }
}

3.4 Executing LiteJob

Triggered periodically by quartz.

Figure: Executor.png (executor hierarchy)
public final class LiteJob implements Job {
    
    @Setter
    private ElasticJob elasticJob;
    
    @Setter
    private JobFacade jobFacade;
    
    @Override
    public void execute(final JobExecutionContext context) throws JobExecutionException {
        JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
    }
}
public final void execute() {
        try {
            // check that the time difference (in seconds) between this machine and the registry center is within the allowed range
            // execution still continues even when the check fails
            jobFacade.checkJobExecutionEnvironment();
        } catch (final JobExecutionEnvironmentException cause) {
            jobExceptionHandler.handleException(jobName, cause);
        }
        // obtain the sharding contexts
        ShardingContexts shardingContexts = jobFacade.getShardingContexts();
        // JobEvent
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobName));
        }
        // if any of the current sharding items are still running,
        // set the misfire flag (${namespace}/${job-name}/sharding/${item}/misfire)
        // and stop this execution
        if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
            if (shardingContexts.isAllowSendJobEvent()) {
                jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
                        "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName, 
                        shardingContexts.getShardingItemParameters().keySet()));
            }
            return;
        }
        // before ElasticJobListener
        try {
            jobFacade.beforeJobExecuted(shardingContexts);
            //CHECKSTYLE:OFF
        } catch (final Throwable cause) {
            //CHECKSTYLE:ON
            jobExceptionHandler.handleException(jobName, cause);
        }
        execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);
        // isExecuteMisfired: resharding is not needed,
        // misfire flags exist for these items,
        // and the configuration allows re-running misfired executions;
        // the while loop guards against misfires set again while re-executing
        while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
            // clear the misfire flag
            jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
            // re-execute
            execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
        }
        // if failover is enabled and the current execution has finished:
        // elect a leader under ${namespace}/${job-name}/leader/failover/latch
        // the leader (holding the lock) takes one failover sharding item from ${namespace}/${job-name}/leader/failover/items
        // FailoverLeaderExecutionCallback sets ${namespace}/${job-name}/sharding/${item}/failover = current job instance,
        // deletes ${namespace}/${job-name}/leader/failover/items/${item},
        // and re-triggers the job
        jobFacade.failoverIfNecessary();
        try {
            // after ElasticJobListener
            jobFacade.afterJobExecuted(shardingContexts);
            //CHECKSTYLE:OFF
        } catch (final Throwable cause) {
            //CHECKSTYLE:ON
            jobExceptionHandler.handleException(jobName, cause);
        }
    }
3.4.1 Sharding
   @Override
    public ShardingContexts getShardingContexts() {
        boolean isFailover = configService.load(true).isFailover();
        // if failover is enabled, get the failover sharding items first
        if (isFailover) {
            // sharding items whose ${namespace}/${job-name}/sharding/${item}/failover node equals the current running instance
            List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();
            if (!failoverShardingItems.isEmpty()) {
                return executionContextService.getJobShardingContext(failoverShardingItems);
            }
        }
        // perform sharding if necessary
        shardingService.shardingIfNecessary();
        // sharding items whose ${namespace}/${job-name}/sharding/${item}/instance node equals the current running instance
        List<Integer> shardingItems = shardingService.getLocalShardingItems();
        if (isFailover) {
            // remove items taken over by failover, leaving them to the job instance that claimed them
            shardingItems.removeAll(failoverService.getLocalTakeOffItems());
        }
        // remove disabled sharding items
        shardingItems.removeAll(executionService.getDisabledItems(shardingItems));
        return executionContextService.getJobShardingContext(shardingItems);
    }
   public void shardingIfNecessary() {
        // get the available running job instances: children of ${namespace}/${job-name}/instances
        List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();
        // no resharding needed, or no running instances
        if (!isNeedSharding() || availableJobInstances.isEmpty()) {
            return;
        }
        // if this is not the leader node,
        // block until the leader finishes sharding,
        // i.e. until neither ${namespace}/${job-name}/leader/sharding/necessary nor .../processing exists
        if (!leaderService.isLeaderUntilBlock()) {
            blockUntilShardingCompleted();
            return;
        }
        /**----------------------------**/
        /**   sharding on the leader    **/
        /**----------------------------**/

        // wait for currently running sharding items to finish
        // (checks the running nodes)
        waitingOtherShardingItemCompleted();
        LiteJobConfiguration liteJobConfig = configService.load(false);
        int shardingTotalCount = liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount();
        log.debug("Job '{}' sharding begin.", jobName);
        // mark that sharding is in progress
        jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
        // create or refresh the sharding nodes ${namespace}/${job-name}/sharding/${item}
        resetShardingInfo(shardingTotalCount);
        // get the sharding strategy
        JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(liteJobConfig.getJobShardingStrategyClass());
        // run the JobShardingStrategy, then, inside PersistShardingInfoTransactionExecutionCallback:
        // create or refresh ${namespace}/${job-name}/sharding/${item}/instance = the assigned running instance,
        // remove ShardingNode.NECESSARY and ShardingNode.PROCESSING,
        // which releases the non-leader nodes blocked waiting for sharding to finish
        jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
        log.debug("Job '{}' sharding complete.", jobName);
    }
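The sharding strategy itself is pluggable via jobShardingStrategyClass (the default is AverageAllocationJobShardingStrategy). A minimal round-robin sketch of a custom strategy, assuming the elastic-job-lite 2.x JobShardingStrategy signature and package names:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import com.dangdang.ddframe.job.lite.api.strategy.JobInstance;
import com.dangdang.ddframe.job.lite.api.strategy.JobShardingStrategy;

public class RoundRobinShardingStrategy implements JobShardingStrategy {

    @Override
    public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
        Map<JobInstance, List<Integer>> result = new LinkedHashMap<>(jobInstances.size());
        for (JobInstance each : jobInstances) {
            result.put(each, new ArrayList<Integer>());
        }
        // deal the sharding items out one at a time: item 0 -> instance 0, item 1 -> instance 1, ...
        for (int item = 0; item < shardingTotalCount; item++) {
            result.get(jobInstances.get(item % jobInstances.size())).add(item);
        }
        return result;
    }
}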
3.4.2 ZooKeeper node handling before and after execution
  • before execution: set ${namespace}/${job-name}/sharding/${item}/running
  • after execution: clean up
    delete ${namespace}/${job-name}/sharding/${item}/running
    delete ${namespace}/${job-name}/sharding/${item}/failover
private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
        // nothing to do if this instance has no sharding items
        if (shardingContexts.getShardingItemParameters().isEmpty()) {
            if (shardingContexts.isAllowSendJobEvent()) {
                jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobName));
            }
            return;
        }
        // mark the job as running:
        // ${namespace}/${job-name}/sharding/${item}/running
        jobFacade.registerJobBegin(shardingContexts);
        String taskId = shardingContexts.getTaskId();
        // trace event
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
        }
        try {
            process(shardingContexts, executionSource);
        } finally {
            // TODO consider adding a job-failure state and how to handle the overall failure path
            // job finished:
            // delete ${namespace}/${job-name}/sharding/${item}/running
            // delete ${namespace}/${job-name}/sharding/${item}/failover
            jobFacade.registerJobCompleted(shardingContexts);
            if (itemErrorMessages.isEmpty()) {
                // trace event: finished
                if (shardingContexts.isAllowSendJobEvent()) {
                    jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, "");
                }
            } else {
                if (shardingContexts.isAllowSendJobEvent()) {
                    // trace event: error messages
                    jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString());
                }
            }
        }
    }
3.4.3 Executing the job
private void process(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
        Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
        // a single sharding item is executed directly on the current thread
        if (1 == items.size()) {
            int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
            JobExecutionEvent jobExecutionEvent =  new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, item);
            process(shardingContexts, item, jobExecutionEvent);
            return;
        }
        // latch: wait until all sharding item tasks have finished before returning
        final CountDownLatch latch = new CountDownLatch(items.size());
        for (final int each : items) {
            final JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, each);
            if (executorService.isShutdown()) {
                return;
            }
            executorService.submit(new Runnable() {
                
                @Override
                public void run() {
                    try {
                        process(shardingContexts, each, jobExecutionEvent);
                    } finally {
                        latch.countDown();
                    }
                }
            });
        }
        try {
            latch.await();
        } catch (final InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

private void process(final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobExecutionEvent(startEvent);
        }
        log.trace("Job '{}' executing, item is: '{}'.", jobName, item);
        JobExecutionEvent completeEvent;
        try {
            // run the actual job logic
            process(new ShardingContext(shardingContexts, item));
            completeEvent = startEvent.executionSuccess();
            log.trace("Job '{}' executed, item is: '{}'.", jobName, item);
            if (shardingContexts.isAllowSendJobEvent()) {
                jobFacade.postJobExecutionEvent(completeEvent);
            }
            // CHECKSTYLE:OFF
        } catch (final Throwable cause) {
            // exception handling
            // CHECKSTYLE:ON
            completeEvent = startEvent.executionFailure(cause);
            jobFacade.postJobExecutionEvent(completeEvent);
            itemErrorMessages.put(item, ExceptionUtil.transform(cause));
            jobExceptionHandler.handleException(jobName, cause);
        }
    }

4. Miscellaneous

4.1 Listeners

  • Managed by ListenerManager
  • Base class
public abstract class AbstractJobListener implements TreeCacheListener {
    
    @Override
    public final void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
        ChildData childData = event.getData();
        if (null == childData) {
            return;
        }
        String path = childData.getPath();
        if (path.isEmpty()) {
            return;
        }
        dataChanged(path, event.getType(), null == childData.getData() ? "" : new String(childData.getData(), Charsets.UTF_8));
    }
    
    protected abstract void dataChanged(final String path, final Type eventType, final String data);
}
  • Leader-related: ElectionListenerManager.start()
     /**
     * Re-elect the leader.
     */
    class LeaderElectionJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            // the job is not shut down, and either:
            // active election: no leader exists && the local server is enabled, or
            // passive election: ${namespace}/${job-name}/leader/election/instance was removed && the local server is available
            if (!JobRegistry.getInstance().isShutdown(jobName) && (isActiveElection(path, data) || isPassiveElection(path, eventType))) {
                // 重新选举主
                leaderService.electLeader();
            }
        }
        
        private boolean isActiveElection(final String path, final String data) {
            return !leaderService.hasLeader() && isLocalServerEnabled(path, data);
        }
        
        private boolean isPassiveElection(final String path, final Type eventType) {
            return isLeaderCrashed(path, eventType) && serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp());
        }
        
        private boolean isLeaderCrashed(final String path, final Type eventType) {
            return leaderNode.isLeaderInstancePath(path) && Type.NODE_REMOVED == eventType;
        }
        
        private boolean isLocalServerEnabled(final String path, final String data) {
            return serverNode.isLocalServerPath(path) && !ServerStatus.DISABLED.name().equals(data);
        }
    }

    //-------------------------------------------------------------

    /**
     * If the local server is disabled and this node is currently the leader,
     * delete the ${namespace}/${job-name}/leader/election/instance node (abdicate leadership).
     */
    class LeaderAbdicationJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            if (leaderService.isLeader() && isLocalServerDisabled(path, data)) {
                leaderService.removeLeader();
            }
        }
        
        private boolean isLocalServerDisabled(final String path, final String data) {
            return serverNode.isLocalServerPath(path) && ServerStatus.DISABLED.name().equals(data);
        }
    }
  • Sharding-related: ShardingListenerManager.start()
 /**
     * Sets the resharding flag.
     */
    class ShardingTotalCountChangedJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            // the config node changed
            // and the total sharding count changed
            if (configNode.isConfigPath(path) && 0 != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
                int newShardingTotalCount = LiteJobConfigurationGsonFactory.fromJson(data).getTypeConfig().getCoreConfig().getShardingTotalCount();
                if (newShardingTotalCount != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
                    // trigger resharding
                    shardingService.setReshardingFlag();
                    JobRegistry.getInstance().setCurrentShardingTotalCount(jobName, newShardingTotalCount);
                }
            }
        }
    }

    //-------------------------------------------------------------

    class ListenServersChangedJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            // the job is not shut down,
            // and either a server node changed or an instances node changed
            if (!JobRegistry.getInstance().isShutdown(jobName) && (isInstanceChange(eventType, path) || isServerChange(path))) {
                // resharding is needed
                shardingService.setReshardingFlag();
            }
        }
        
        private boolean isInstanceChange(final Type eventType, final String path) {
            return instanceNode.isInstancePath(path) && Type.NODE_UPDATED != eventType;
        }
        
        private boolean isServerChange(final String path) {
            return serverNode.isServerPath(path);
        }
    }
  • Failover: FailoverListenerManager.start()
class JobCrashedJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            // failover is enabled,
            // the event type is NODE_REMOVED,
            // and the path is under ${namespace}/${job-name}/instances (a job instance went offline)
            if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) {
                // id of the crashed job instance
                String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);
                // ignore if it is the current instance itself
                if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
                    return;
                }
                // failover sharding items the crashed instance had claimed but not yet executed
                List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);
                if (!failoverItems.isEmpty()) {
                    for (int each : failoverItems) {
                        // record the crashed item under leader/failover/items
                        failoverService.setCrashedFailoverFlag(each);
                        // let the current instance claim and execute the failover sharding item
                        failoverService.failoverIfNecessary();
                    }
                } else {
                    for (int each : shardingService.getShardingItems(jobInstanceId)) {
                        failoverService.setCrashedFailoverFlag(each);
                        failoverService.failoverIfNecessary();
                    }
                }
            }
        }
    }
    
    class FailoverSettingsChangedJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            // failover has been switched off in the configuration,
            // so remove the ${namespace}/${job-name}/sharding/${item}/failover nodes
            if (configNode.isConfigPath(path) && Type.NODE_UPDATED == eventType && !LiteJobConfigurationGsonFactory.fromJson(data).isFailover()) {
                failoverService.removeFailoverInfo();
            }
        }
    }
  • Other listeners are not covered here.

4.2 Failover

When a running job node crashes, its sharding items are taken over and executed by the surviving job nodes. Taking over the crashed node's items does not trigger resharding; resharding only happens the next time the job is triggered.
  • Example: job-node1, job-node2 and job-node3, with 9 sharding items
  1. job-node1 goes down; the JobCrashedJobListener on job-node2 and job-node3 detects it
  2. job-node2 or job-node3 preferentially records job-node1's already claimed failover items under ${namespace}/${job-name}/leader/failover/${item}; if there are none, it records the items assigned to job-node1 under ${namespace}/${job-name}/sharding instead
  • This is because failover items under ${namespace}/${job-name}/leader/failover/${item} are only claimed after a node has finished executing its own items under ${namespace}/${job-name}/sharding
   public void failoverIfNecessary() {
        if (needFailover()) {
            jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
        }
    }
    
    private boolean needFailover() {
        // failover items were recorded by failoverService.setCrashedFailoverFlag(each) when the crash was detected
        return jobNodeStorage.isJobNodeExisted(FailoverNode.ITEMS_ROOT) && !jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).isEmpty()
                // and the current instance has finished executing its own items under ${namespace}/${job-name}/sharding
                && !JobRegistry.getInstance().isJobRunning(jobName);
    }
  3. Under the leader latch (effectively a distributed lock), a node claims one failover sharding item to execute (a sketch of the callback follows below)
  • FailoverLeaderExecutionCallback.execute
int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));

  4. job-node2 and job-node3 each claim one failover sharding item, so one item is still left over
  5. The leftover item waits for the next trigger of the job; step 3.4 (getShardingContexts) then picks up and executes the remaining items
jobFacade.failoverIfNecessary();
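Putting the node operations together, a rough sketch of what FailoverLeaderExecutionCallback does once the failover latch is held. This is a paraphrase with raw Curator calls and illustrative paths; the real elastic-job code goes through JobNodeStorage and then re-triggers the job:

import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.curator.framework.CuratorFramework;

public class FailoverSketch {

    /** Claim one crashed sharding item while holding the leader/failover/latch leadership. */
    static void takeOneCrashedItem(final CuratorFramework client, final String jobName, final String jobInstanceId) throws Exception {
        String itemsRoot = "/" + jobName + "/leader/failover/items";
        if (client.checkExists().forPath(itemsRoot) == null) {
            return;
        }
        List<String> crashedItems = client.getChildren().forPath(itemsRoot);
        if (crashedItems.isEmpty()) {
            return;
        }
        int crashedItem = Integer.parseInt(crashedItems.get(0));
        // claim the item: sharding/${item}/failover = this job instance id
        client.create().creatingParentsIfNeeded().forPath(
                "/" + jobName + "/sharding/" + crashedItem + "/failover",
                jobInstanceId.getBytes(StandardCharsets.UTF_8));
        // remove the item from the leader failover queue
        client.delete().forPath(itemsRoot + "/" + crashedItem);
        // elastic-job then triggers the job again so getShardingContexts() picks up the claimed item
    }
}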