elastic-job-lite源码分析之选举及分片

为什么进行选举

因为我们的任务支持分片,分片的逻辑不需要重复执行多次,由于是去中心化设计,所以我们需要通过选举的形式选出一台服务器作为主节点,执行分片逻辑。

zookeeper节点

下面的zookeeper会在选举以及分片中用到,因此提前列出来

节点 节点类型 节点作用
/${namespace}/${jobName}/leader/election/latch 持久 用于选举的锁
/${namespace}/${jobName}/leader/election/instance 临时 保存主节点地址
/${namespace}/${jobName}/leader/sharding/necessary 持久 判断是否需要重新分片
/${namespace}/${jobName}/leader/sharding/processing 临时 标记主节点正在sharding 的标志
/${namespace}/${jobName}/sharing/${shardingItem}/instance 永久 保存拿到shardingItem分片的作业实例

${...}表示可变的配置

临时节点会在创建节点服务器断开连接时删除

选举

选举的逻辑在LeaderService的electLeader方法。主节点的选举采用先到先得的策略,其他节点会在当前节点选举的时候阻塞。

 /**
 * 选举主节点.
 */
public void electLeader() {
    log.debug("Elect a new leader now.");
    jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback());
    log.debug("Leader election completed.");
}

LeaderNode.LATCH用于做节点选举
LeaderElectionExecutionCallback回调会在拿到leader锁的节点触发
主节点的选举使用了Curator框架的LeaderLatch类

/**
 * 在主节点执行操作.
 * 
 * @param latchNode 分布式锁使用的作业节点名称
 * @param callback 执行操作的回调
 */
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
    //注意到这边try语句用法,结束后会自动释放锁
    try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
        latch.start();
        //这边会等待锁的释放,也就是callback会执行多次
        latch.await();
        callback.execute();
    //CHECKSTYLE:OFF
    } catch (final Exception ex) {
    //CHECKSTYLE:ON
        handleException(ex);
    }
}

但是这里LeaderLatch的用法更多是为了保证有序性,因为拿到leader锁的服务器在执行完之后会释放锁,其他await的节点也会相继拿到leader锁,执行callback逻辑。
callback的具体实现中又使用了乐观锁,判断leaderNode.INSTANCE节点是否被设置,显然,第一个拿到leader锁的服务器已经设置,成为主节点。其他服务器不执行逻辑。

在我的elasticjob-springboot-demo项目中对LeaderLatch进行了测试

@RequiredArgsConstructor
class LeaderElectionExecutionCallback implements LeaderExecutionCallback {
    
    @Override
    public void execute() {
        //判断eaderNode.INSTANCE是否已经设置,相当于乐观锁
        if (!hasLeader()) {
            //设置leaderNode.INSTANCE节点的内容为当前服务器
            jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
        }
    }
}

注意Instance节点为临时节点,也就是主节点下线后,这个节点会消失。这个对于触发主节点重新选举有关。

Curator的LeaderLatch的leader和Elastic Job主节点是两个完全不同的概念。LeaderLatch更多了是为了保证只有一个节点能持有某个目录的锁。Elastic Job使用LeaderLatch实现了抢占式选主。

分片

分片的逻辑封装在ShardingService

设置分片状态

在我们Job初始化的时候,会调用setReshardingFlag方法来设置重新分片标记,当job第一次执行的时候,发现重新分片状态存在,那么会先等待主节点分片完成,在获取自己对应的分片执行job逻辑

public void setReshardingFlag() {
    jobNodeStorage.createJobNodeIfNeeded(ShardingNode.NECESSARY);
}

分片逻辑

shardingIfNecessary方法封装了分片逻辑

/**
 * 如果需要分片且当前节点为主节点, 则作业分片.
 * 
 * <p>
 * 如果当前无可用节点则不分片.
 * </p>
 */
public void shardingIfNecessary() {
    //获取可分片的作业运行实例
    List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();
    //判断是否需要进行分片
    if (!isNeedSharding() || availableJobInstances.isEmpty()) {
        return;
    }
    //判断是否是主节点,不是主节点进入if逻辑,同时内部会等待主节点选举完成
    if (!leaderService.isLeaderUntilBlock()) {
        //等待主节点分片完成
        blockUntilShardingCompleted();
        return;
    }
    //等待当前任务的其他分片运行结束
    waitingOtherShardingItemCompleted();
    LiteJobConfiguration liteJobConfig = configService.load(false);
    int shardingTotalCount = liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount();
    log.debug("Job '{}' sharding begin.", jobName);
    //设置分片运行标志
    jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
    //清空之前的sharding节点
    resetShardingInfo(shardingTotalCount);
    //获取配置的分片策略类,不存在使用默认的AverageAllocationJobShardingStrategy
    JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(liteJobConfig.getJobShardingStrategyClass());
    //执行分片逻辑
    jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
    log.debug("Job '{}' sharding complete.", jobName);
}

上述逻辑流程图如下


这个方法会在LiteJobFacade的getShardingContexts中被调用到,用来生成调用上下文,而getShardingContexts会被AbstractElasticJobExecutor的execute调用,excute最终调用到我们配置job的execute方法

因此分片逻辑会在执行job的时候触发,第一次是必定触发,之后触发看重新分片标志是否被重新设置

我们配置的Job是如何执行的,之后会单独讲解

分片策略

下面来看下分片的具体逻辑,从上面可以看到具体分片逻辑由JobShardingStrategy子类实现,先看下它的接口定义

public interface JobShardingStrategy {
    
    /**
     * 作业分片.
     * 
     * @param jobInstances 所有参与分片的单元列表
     * @param jobName 作业名称
     * @param shardingTotalCount 分片总数
     * @return 分片结果
     */
    Map<JobInstance, List<Integer>> sharding(List<JobInstance> jobInstances, String jobName, int shardingTotalCount);
}

主要是根据shardingTotalCount和jobInstances进行分片分配
看下提供的默认实现类AverageAllocationJobShardingStrategy

public final class AverageAllocationJobShardingStrategy implements JobShardingStrategy {
    
    @Override
    public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
        if (jobInstances.isEmpty()) {
            return Collections.emptyMap();
        }
        Map<JobInstance, List<Integer>> result = shardingAliquot(jobInstances, shardingTotalCount);
        addAliquant(jobInstances, shardingTotalCount, result);
        return result;
    }
    
    private Map<JobInstance, List<Integer>> shardingAliquot(final List<JobInstance> shardingUnits, final int shardingTotalCount) {
        Map<JobInstance, List<Integer>> result = new LinkedHashMap<>(shardingTotalCount, 1);
        //分片数/作业实例数,得到每个作业实例最少得到的分片数
        int itemCountPerSharding = shardingTotalCount / shardingUnits.size();
        int count = 0;
        for (JobInstance each : shardingUnits) {
            List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1);
            //给每个作业实例分配最少得到的分片数
            for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {
                shardingItems.add(i);
            }
            result.put(each, shardingItems);
            count++;
        }
        return result;
    }
    
    private void addAliquant(final List<JobInstance> shardingUnits, final int shardingTotalCount, final Map<JobInstance, List<Integer>> shardingResults) {
        //分片数%作业实例数,得到多余的分片数
        int aliquant = shardingTotalCount % shardingUnits.size();
        int count = 0;
        for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {
            //把多余的分片数,按顺序分给作业服务器
            if (count < aliquant) {
                entry.getValue().add(shardingTotalCount / shardingUnits.size() * shardingUnits.size() + count);
            }
            count++;
        }
    }
}

这个算法的实例如下

* 基于平均分配算法的分片策略.
 * 
 * <p>
 * 如果分片不能整除, 则不能整除的多余分片将依次追加到序号小的服务器.
 * 如: 
 * 1. 如果有3台服务器, 分成9片, 则每台服务器分到的分片是: 1=[0,1,2], 2=[3,4,5], 3=[6,7,8].
 * 2. 如果有3台服务器, 分成8片, 则每台服务器分到的分片是: 1=[0,1,6], 2=[2,3,7], 3=[4,5].
 * 3. 如果有3台服务器, 分成10片, 则每台服务器分到的分片是: 1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8].
 * </p>

至于其他实现类,OdevitySortByNameJobShardingStrategy和RotateServerByNameJobShardingStrategy都是基于AverageAllocationJobShardingStrategy实现的,只不过修改了List<JobInstance> shardingUnits里面instance的顺序

设置分片结果

通过JobShardingStrategy得到分片结果之后,会通过PersistShardingInfoTransactionExecutionCallback将结果持久化到zookeeper

@RequiredArgsConstructor
    class PersistShardingInfoTransactionExecutionCallback implements TransactionExecutionCallback {
        
    private final Map<JobInstance, List<Integer>> shardingResults;
    
    @Override
    public void execute(final CuratorTransactionFinal curatorTransactionFinal) throws Exception {
        for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {
            for (int shardingItem : entry.getValue()) {
                //设置节点/namespace/jobName/sharding/shardingItem/instance = jobinstanceid(相当于ip)
                curatorTransactionFinal.create().forPath(jobNodePath.getFullPath(ShardingNode.getInstanceNode(shardingItem)), entry.getKey().getJobInstanceId().getBytes()).and();
            }
        }
        //清除分片标记
        curatorTransactionFinal.delete().forPath(jobNodePath.getFullPath(ShardingNode.NECESSARY)).and();
        //清除分片进行中标记
        curatorTransactionFinal.delete().forPath(jobNodePath.getFullPath(ShardingNode.PROCESSING)).and();
    }
}

在/namespace/jobName/sharding下面的子节点会把分片结果保存起来。建立与分片数字对应节点,节点instance的data内容为该分片持有的服务器。

例如


重新选举/分片

在主节点下线的情况下,我们需要重新进行选举或者我们在控制台修改了分片配置需要重新触发分片,这个是如何实现的

基于zookeeper的watch特性,我们可以对zookeeper的节点进行监听,如果节点或者子节点发生了改变,就会触发回调。重新选举/分片就是基于这个实现的

重新选举

主节点重新选举由ElectionListenerManager中的监听器LeaderElectionJobListener实现,关键逻辑如下

class LeaderElectionJobListener extends AbstractJobListener {
        
    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        if (!JobRegistry.getInstance().isShutdown(jobName) && (isActiveElection(path, data) || isPassiveElection(path, eventType))) {
            //触发选举
            leaderService.electLeader();
        }
    }
    
    //判断是否不存在主节点并且本机有效
    private boolean isActiveElection(final String path, final String data) {
        return !leaderService.hasLeader() && isLocalServerEnabled(path, data);
    }
    
    //判断主节点是否down机并且当前主机可用
    private boolean isPassiveElection(final String path, final Type eventType) {
        return isLeaderCrashed(path, eventType) && serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp());
    }
    
    //判断当前回调触发的原因是主节点down机了
    private boolean isLeaderCrashed(final String path, final Type eventType) {
        return leaderNode.isLeaderInstancePath(path) && Type.NODE_REMOVED == eventType;
    }
    
    //判断当前服务器对应节点是否可用
    private boolean isLocalServerEnabled(final String path, final String data) {
        return serverNode.isLocalServerPath(path) && !ServerStatus.DISABLED.name().equals(data);
    }
}

这个监听器触发重新选举的条件,首先是当前节点对应job不能停止,其次判断是否当前没有主节点或者当前回调触发的原因就是因为主节点下线。
如果条件满足,就之前讲的一样,通过 leaderService.electLeader()竞争主节点

重新分片标记

设置重新分片标记由ShardingListenerManager的监听器ShardingTotalCountChangedJobListener和ListenServersChangedJobListener执行,主要逻辑是设置重新分片标记。实际的分片操作会在下一次job任务执行前触发,并且由主节点进行分片。

ShardingTotalCountChangedJobListener在分片数配置发生改变时触发

class ShardingTotalCountChangedJobListener extends AbstractJobListener {

    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        //判断是否是由config节点改变触发,并且当前本地缓存的配置分片大小不为0
        if (configNode.isConfigPath(path) && 0 != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
            int newShardingTotalCount = LiteJobConfigurationGsonFactory.fromJson(data).getTypeConfig().getCoreConfig().getShardingTotalCount();
            //如果分片大小发生改变
            if (newShardingTotalCount != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
                //设置分片标记
                shardingService.setReshardingFlag();
                //更新本地缓存
                JobRegistry.getInstance().setCurrentShardingTotalCount(jobName, newShardingTotalCount);
            }
        }
    }
}

ListenServersChangedJobListener在作业实例发生改变时触发

class ListenServersChangedJobListener extends AbstractJobListener {
        
    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        //当前作业节点没有停止的前提下,作业服务器实例或者作业实例发生变化时,设置分片标记
        if (!JobRegistry.getInstance().isShutdown(jobName) && (isInstanceChange(eventType, path) || isServerChange(path))) {
            shardingService.setReshardingFlag();
        }
    }
    
    //instance节点发生改变
    private boolean isInstanceChange(final Type eventType, final String path) {
        return instanceNode.isInstancePath(path) && Type.NODE_UPDATED != eventType;
    }
    
    //server节点发生改变
    private boolean isServerChange(final String path) {
        return serverNode.isServerPath(path);
    }
}

我们可以看到,这里不会直接进行分片,而是设置了分片标记,分片操作延迟到任务执行前触发,这其实算一种优化吧,在任务执行前,任务的配置可能会发生多次,每次都执行分片逻辑,并且都需要控制在主节点进行,很浪费性能。这个设计略巧妙。

思考

在想一个问题,如果分片策略是AverageAllocationJobShardingStrategy,我们的应用一般每个job都会配置,那么如果在分片很少的情况下,如果每个job的instance节点顺序一致(zookeeper应该存在默认的排序规则),那么后面的服务器根本轮不到执行任务。所以配置分片策略的时候,不要使用默认的,使用后面2个和hash有关的,或者自己自定义策略。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,482评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,377评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,762评论 0 342
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,273评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,289评论 5 373
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,046评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,351评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,988评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,476评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,948评论 2 324
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,064评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,712评论 4 323
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,261评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,264评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,486评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,511评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,802评论 2 345

推荐阅读更多精彩内容