Elasticsearch的选举机制

关于Elasticsearch的选举机制:
ES选举master机制不像Hbase的HMaster选举, HMaster选举是借助ZK,通过各个节点向ZK注册临时节点(ZK保证只有一个节点能够注册成功, 此节点就是master节点),其余节点加入备节点,而且会监测ZNODE是否消失,消失的时候,备节点会争相向ZK注册临时节点进而选出新的master。
Elasticsearch选举master的时候, 当加入一个节点, 如果之前的Elasticsearch集群已经正常的在运行, 那么此时这个节点的加入会选择接受之前的master, 然后自己连接master并加入这个master构成的集群。如果是整个master集群刚开始初始启动的时候,这时候情况就会不同,就会出现选举master的过程。 这时候的选举可能选到了自己作为master, 也有可能是接受其他节点的master。

代码流程图如下所以:


Elasticsearch Master选举机制

其代码主要是ZenDiscovery这个类,在它的doStart方法中,

protected void doStart() {
        nodesFD.setLocalNode(clusterService.localNode());
        joinThreadControl.start();
        pingService.start();
        this.nodeJoinController = new NodeJoinController(clusterService, routingService, discoverySettings, settings);

        // start the join thread from a cluster state update. See {@link JoinThreadControl} for details.
        clusterService.submitStateUpdateTask("initial_join", new ClusterStateNonMasterUpdateTask() {
            @Override
            public ClusterState execute(ClusterState currentState) throws Exception {
                // do the join on a different thread, the DiscoveryService waits for 30s anyhow till it is discovered
                joinThreadControl.startNewThreadIfNotRunning();
                return currentState;
            }

            @Override
            public void onFailure(String source, @org.elasticsearch.common.Nullable Throwable t) {
                logger.warn("failed to start initial join process", t);
            }
        });
    }

public void startNewThreadIfNotRunning() {
            assertClusterStateThread();
            if (joinThreadActive()) {
                return;
            }
            threadPool.generic().execute(new Runnable() {
                @Override
                public void run() {
                    Thread currentThread = Thread.currentThread();
                    if (!currentJoinThread.compareAndSet(null, currentThread)) {
                        return;
                    }
                    while (running.get() && joinThreadActive(currentThread)) {
                        try {
                            innerJoinCluster();
                            return;
                        } catch (Exception e) {
                            logger.error("unexpected error while joining cluster, trying again", e);
                            // Because we catch any exception here, we want to know in
                            // tests if an uncaught exception got to this point and the test infra uncaught exception
                            // leak detection can catch this. In practise no uncaught exception should leak
                            assert ExceptionsHelper.reThrowIfNotNull(e);
                        }
                    }
                    // cleaning the current thread from currentJoinThread is done by explicit calls.
                }
            });
        }

最终会调用 innerJoinCluster();函数
innerJoinCluster函数中,最主要的一部分代码就是

// 一直阻塞直到找到master节点,在集群刚刚启动,或者集群master丢失的情况,这种阻塞能够保证集群一致性
        while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
            masterNode = findMaster(); // 找到Master, 可能是自己也可能不是自己
        }

findMaster的代码逻辑是:

private DiscoveryNode findMaster() {
        logger.trace("starting to ping");
        // 通过ping 其他节点来判定本节点能够连接上的节点的个数
        ZenPing.PingResponse[] fullPingResponses = pingService.pingAndWait(pingTimeout);
        if (fullPingResponses == null) {
            logger.trace("No full ping responses");
            return null;
        }
        if (logger.isTraceEnabled()) {
            StringBuilder sb = new StringBuilder("full ping responses:");
            if (fullPingResponses.length == 0) {
                sb.append(" {none}");
            } else {
                for (ZenPing.PingResponse pingResponse : fullPingResponses) {
                    sb.append("\n\t--> ").append(pingResponse);
                }
            }
            logger.trace(sb.toString());
        }

        // filter responses
        // 过滤PingResponse, 排除掉client节点,单纯的data节点
        List<ZenPing.PingResponse> pingResponses = new ArrayList<>();
        for (ZenPing.PingResponse pingResponse : fullPingResponses) {
            DiscoveryNode node = pingResponse.node();
            if (masterElectionFilterClientNodes && (node.clientNode() || (!node.masterNode() && !node.dataNode()))) {
                // filter out the client node, which is a client node, or also one that is not data and not master (effectively, client)
            } else if (masterElectionFilterDataNodes && (!node.masterNode() && node.dataNode())) {
                // filter out data node that is not also master
            } else {
                pingResponses.add(pingResponse);
            }
        }

        if (logger.isDebugEnabled()) {
            StringBuilder sb = new StringBuilder("filtered ping responses: (filter_client[").append(masterElectionFilterClientNodes).append("], filter_data[").append(masterElectionFilterDataNodes).append("])");
            if (pingResponses.isEmpty()) {
                sb.append(" {none}");
            } else {
                for (ZenPing.PingResponse pingResponse : pingResponses) {
                    sb.append("\n\t--> ").append(pingResponse);
                }
            }
            logger.debug(sb.toString());
        }

        final DiscoveryNode localNode = clusterService.localNode();
        List<DiscoveryNode> pingMasters = new ArrayList<>();
        //获取所有ping响应中的master节点,如果master节点是节点本身则过滤掉。
        // 要么是同一个节点(出现不同节点则集群出现了问题不过没关系,后面会进行选举)
        // 正常情况下, pingMasters只有一个值
        for (ZenPing.PingResponse pingResponse : pingResponses) {
            if (pingResponse.master() != null) {
                // We can't include the local node in pingMasters list, otherwise we may up electing ourselves without
                // any check / verifications from other nodes in ZenDiscover#innerJoinCluster()
                if (!localNode.equals(pingResponse.master())) {
                    pingMasters.add(pingResponse.master());
                }
            }
        }

        // nodes discovered during pinging
        Set<DiscoveryNode> activeNodes = Sets.newHashSet();
        // nodes discovered who has previously been part of the cluster and do not ping for the very first time
        Set<DiscoveryNode> joinedOnceActiveNodes = Sets.newHashSet();
       //本节点暂时是master也要加入候选节点进行选举
        if (localNode.masterNode()) { // 本节点被人选举为master
            activeNodes.add(localNode);
            long joinsCounter = clusterJoinsCounter.get();
            if (joinsCounter > 0) {
                logger.trace("adding local node to the list of active nodes who has previously joined the cluster (joins counter is [{}})", joinsCounter);
                joinedOnceActiveNodes.add(localNode);
            }
        }

        for (ZenPing.PingResponse pingResponse : pingResponses) {
            activeNodes.add(pingResponse.node());
            if (pingResponse.hasJoinedOnce()) {
                joinedOnceActiveNodes.add(pingResponse.node());
            }
        }
        //pingMasters为空,则本节点是master节点,
        if (pingMasters.isEmpty()) { // pingMasters时空有两种情况,一种本地节点就是master节点
            // 保证选举数量,说明有足够多的节点选举本节点为master,但是这还不够,本节点还需要再选举一次,如果本次选举节点仍旧是自己,那么本节点才能成为master。
            if (electMaster.hasEnoughMasterNodes(activeNodes)) {   // 判断是否包含足够的节点数,是否大于n/2 + 1
                // we give preference to nodes who have previously already joined the cluster. Those will
                // have a cluster state in memory, including an up to date routing table (which is not persistent to disk
                // by the gateway)
                DiscoveryNode master = electMaster.electMaster(joinedOnceActiveNodes); // 既然本节点已经被ping同的节点选为了master, 也要自己选择自己一把,才能成为master, 不然的话对activeNodes进行重新选举
                if (master != null) {
                    return master;
                }
                return electMaster.electMaster(activeNodes); // 重新选举
            } else {
                // if we don't have enough master nodes, we bail, because there are not enough master to elect from
                logger.trace("not enough master nodes [{}]", activeNodes);
                return null;
            }
        } else {
            //pingMasters不为空(pingMasters列表中应该都是同一个节点),本节点没有被选举为master,那就接受之前的选举。
            assert !pingMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master";
            // lets tie break between discovered nodes
            return electMaster.electMaster(pingMasters);
        }
    }

这里我来使用场景来说明一下初始启动集群的场景, 正常集群加入新节点的过程这里就不再进行描述。
假设有三个节点node1, node2, node3, 假设我们配置每个节点都有机会(node.master: true)可以成为master,刚开始启动的时候, node1启动了,此时node1去执行findMaster(),由于此时只有一个节点, node1只能发现自己这个节点, 不满足节点数大于n/2+1的条件(配置文件指定的),所以此时找不到master, node1会不断的执行while循环直到找到master位置。
然后此时node2上线启动,node1和node2构成了两个节点,node2选择自己作为master节点, 此时node2 通过ping可以发现node1, 此时

if (localNode.masterNode()) { // 本节点被人选举为master
            activeNodes.add(localNode);
            long joinsCounter = clusterJoinsCounter.get();
            if (joinsCounter > 0) {
                logger.trace("adding local node to the list of active nodes who has previously joined the cluster (joins counter is [{}})", joinsCounter);
                joinedOnceActiveNodes.add(localNode);
            }
        }

        for (ZenPing.PingResponse pingResponse : pingResponses) {
            activeNodes.add(pingResponse.node());
            if (pingResponse.hasJoinedOnce()) {
                joinedOnceActiveNodes.add(pingResponse.node());
            }
        }

可以知道activeNodes里面将会存放node1,node2,
joinedOnceActiveNodes存放的是node2
然后进行到此处:

if (pingMasters.isEmpty()) { // pingMasters时空有两种情况,一种本地节点就是master节点,另一种一开始初始启动,还没选出master,而本节点也没被选为master就可能出现空
            // 保证选举数量,说明有足够多的节点选举本节点为master,但是这还不够,本节点还需要再选举一次,如果本次选举节点仍旧是自己,那么本节点才能成为master。
            if (electMaster.hasEnoughMasterNodes(activeNodes)) {   // 判断是否包含足够的节点数,是否大于n/2 + 1
                // we give preference to nodes who have previously already joined the cluster. Those will
                // have a cluster state in memory, including an up to date routing table (which is not persistent to disk
                // by the gateway)
                DiscoveryNode master = electMaster.electMaster(joinedOnceActiveNodes); // 既然本节点已经被ping同的节点选为了master, 也要自己选择自己一把,才能成为master, 不然的话对activeNodes进行重新选举
                if (master != null) {
                    return master;
                }
                return electMaster.electMaster(activeNodes); // 重新选举
            } else {
                // if we don't have enough master nodes, we bail, because there are not enough master to elect from
                logger.trace("not enough master nodes [{}]", activeNodes);
                return null;
            }
        } 

此时node2还会选举自己一把DiscoveryNode master = electMaster.electMaster(joinedOnceActiveNodes);这样node2才能真正成为master, 不然的话会执行return electMaster.electMaster(activeNodes);也就是在node1和node2上重新选举。
此时node1 的循环又开始了,他就会发现master不是自己而是node2, 这样就会接受node2是master

然后第三个节点node3上线了, 此时他也会执行while循环中的findMaster方法, 发现集群中已经有一个正常的master, 这时候也是接受那个master, 并与之联系,加入集群。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,029评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,395评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,570评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,535评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,650评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,850评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,006评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,747评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,207评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,536评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,683评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,342评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,964评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,772评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,004评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,401评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,566评论 2 349

推荐阅读更多精彩内容