关于Elasticsearch的选举机制:
ES选举master机制不像Hbase的HMaster选举, HMaster选举是借助ZK,通过各个节点向ZK注册临时节点(ZK保证只有一个节点能够注册成功, 此节点就是master节点),其余节点加入备节点,而且会监测ZNODE是否消失,消失的时候,备节点会争相向ZK注册临时节点进而选出新的master。
Elasticsearch选举master的时候, 当加入一个节点, 如果之前的Elasticsearch集群已经正常的在运行, 那么此时这个节点的加入会选择接受之前的master, 然后自己连接master并加入这个master构成的集群。如果是整个master集群刚开始初始启动的时候,这时候情况就会不同,就会出现选举master的过程。 这时候的选举可能选到了自己作为master, 也有可能是接受其他节点的master。
代码流程图如下所以:
其代码主要是ZenDiscovery这个类,在它的doStart方法中,
protected void doStart() {
nodesFD.setLocalNode(clusterService.localNode());
joinThreadControl.start();
pingService.start();
this.nodeJoinController = new NodeJoinController(clusterService, routingService, discoverySettings, settings);
// start the join thread from a cluster state update. See {@link JoinThreadControl} for details.
clusterService.submitStateUpdateTask("initial_join", new ClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
// do the join on a different thread, the DiscoveryService waits for 30s anyhow till it is discovered
joinThreadControl.startNewThreadIfNotRunning();
return currentState;
}
@Override
public void onFailure(String source, @org.elasticsearch.common.Nullable Throwable t) {
logger.warn("failed to start initial join process", t);
}
});
}
public void startNewThreadIfNotRunning() {
assertClusterStateThread();
if (joinThreadActive()) {
return;
}
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
Thread currentThread = Thread.currentThread();
if (!currentJoinThread.compareAndSet(null, currentThread)) {
return;
}
while (running.get() && joinThreadActive(currentThread)) {
try {
innerJoinCluster();
return;
} catch (Exception e) {
logger.error("unexpected error while joining cluster, trying again", e);
// Because we catch any exception here, we want to know in
// tests if an uncaught exception got to this point and the test infra uncaught exception
// leak detection can catch this. In practise no uncaught exception should leak
assert ExceptionsHelper.reThrowIfNotNull(e);
}
}
// cleaning the current thread from currentJoinThread is done by explicit calls.
}
});
}
最终会调用 innerJoinCluster();函数
innerJoinCluster函数中,最主要的一部分代码就是
// 一直阻塞直到找到master节点,在集群刚刚启动,或者集群master丢失的情况,这种阻塞能够保证集群一致性
while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
masterNode = findMaster(); // 找到Master, 可能是自己也可能不是自己
}
findMaster的代码逻辑是:
private DiscoveryNode findMaster() {
logger.trace("starting to ping");
// 通过ping 其他节点来判定本节点能够连接上的节点的个数
ZenPing.PingResponse[] fullPingResponses = pingService.pingAndWait(pingTimeout);
if (fullPingResponses == null) {
logger.trace("No full ping responses");
return null;
}
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder("full ping responses:");
if (fullPingResponses.length == 0) {
sb.append(" {none}");
} else {
for (ZenPing.PingResponse pingResponse : fullPingResponses) {
sb.append("\n\t--> ").append(pingResponse);
}
}
logger.trace(sb.toString());
}
// filter responses
// 过滤PingResponse, 排除掉client节点,单纯的data节点
List<ZenPing.PingResponse> pingResponses = new ArrayList<>();
for (ZenPing.PingResponse pingResponse : fullPingResponses) {
DiscoveryNode node = pingResponse.node();
if (masterElectionFilterClientNodes && (node.clientNode() || (!node.masterNode() && !node.dataNode()))) {
// filter out the client node, which is a client node, or also one that is not data and not master (effectively, client)
} else if (masterElectionFilterDataNodes && (!node.masterNode() && node.dataNode())) {
// filter out data node that is not also master
} else {
pingResponses.add(pingResponse);
}
}
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder("filtered ping responses: (filter_client[").append(masterElectionFilterClientNodes).append("], filter_data[").append(masterElectionFilterDataNodes).append("])");
if (pingResponses.isEmpty()) {
sb.append(" {none}");
} else {
for (ZenPing.PingResponse pingResponse : pingResponses) {
sb.append("\n\t--> ").append(pingResponse);
}
}
logger.debug(sb.toString());
}
final DiscoveryNode localNode = clusterService.localNode();
List<DiscoveryNode> pingMasters = new ArrayList<>();
//获取所有ping响应中的master节点,如果master节点是节点本身则过滤掉。
// 要么是同一个节点(出现不同节点则集群出现了问题不过没关系,后面会进行选举)
// 正常情况下, pingMasters只有一个值
for (ZenPing.PingResponse pingResponse : pingResponses) {
if (pingResponse.master() != null) {
// We can't include the local node in pingMasters list, otherwise we may up electing ourselves without
// any check / verifications from other nodes in ZenDiscover#innerJoinCluster()
if (!localNode.equals(pingResponse.master())) {
pingMasters.add(pingResponse.master());
}
}
}
// nodes discovered during pinging
Set<DiscoveryNode> activeNodes = Sets.newHashSet();
// nodes discovered who has previously been part of the cluster and do not ping for the very first time
Set<DiscoveryNode> joinedOnceActiveNodes = Sets.newHashSet();
//本节点暂时是master也要加入候选节点进行选举
if (localNode.masterNode()) { // 本节点被人选举为master
activeNodes.add(localNode);
long joinsCounter = clusterJoinsCounter.get();
if (joinsCounter > 0) {
logger.trace("adding local node to the list of active nodes who has previously joined the cluster (joins counter is [{}})", joinsCounter);
joinedOnceActiveNodes.add(localNode);
}
}
for (ZenPing.PingResponse pingResponse : pingResponses) {
activeNodes.add(pingResponse.node());
if (pingResponse.hasJoinedOnce()) {
joinedOnceActiveNodes.add(pingResponse.node());
}
}
//pingMasters为空,则本节点是master节点,
if (pingMasters.isEmpty()) { // pingMasters时空有两种情况,一种本地节点就是master节点
// 保证选举数量,说明有足够多的节点选举本节点为master,但是这还不够,本节点还需要再选举一次,如果本次选举节点仍旧是自己,那么本节点才能成为master。
if (electMaster.hasEnoughMasterNodes(activeNodes)) { // 判断是否包含足够的节点数,是否大于n/2 + 1
// we give preference to nodes who have previously already joined the cluster. Those will
// have a cluster state in memory, including an up to date routing table (which is not persistent to disk
// by the gateway)
DiscoveryNode master = electMaster.electMaster(joinedOnceActiveNodes); // 既然本节点已经被ping同的节点选为了master, 也要自己选择自己一把,才能成为master, 不然的话对activeNodes进行重新选举
if (master != null) {
return master;
}
return electMaster.electMaster(activeNodes); // 重新选举
} else {
// if we don't have enough master nodes, we bail, because there are not enough master to elect from
logger.trace("not enough master nodes [{}]", activeNodes);
return null;
}
} else {
//pingMasters不为空(pingMasters列表中应该都是同一个节点),本节点没有被选举为master,那就接受之前的选举。
assert !pingMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master";
// lets tie break between discovered nodes
return electMaster.electMaster(pingMasters);
}
}
这里我来使用场景来说明一下初始启动集群的场景, 正常集群加入新节点的过程这里就不再进行描述。
假设有三个节点node1, node2, node3, 假设我们配置每个节点都有机会(node.master: true)可以成为master,刚开始启动的时候, node1启动了,此时node1去执行findMaster(),由于此时只有一个节点, node1只能发现自己这个节点, 不满足节点数大于n/2+1的条件(配置文件指定的),所以此时找不到master, node1会不断的执行while循环直到找到master位置。
然后此时node2上线启动,node1和node2构成了两个节点,node2选择自己作为master节点, 此时node2 通过ping可以发现node1, 此时
if (localNode.masterNode()) { // 本节点被人选举为master
activeNodes.add(localNode);
long joinsCounter = clusterJoinsCounter.get();
if (joinsCounter > 0) {
logger.trace("adding local node to the list of active nodes who has previously joined the cluster (joins counter is [{}})", joinsCounter);
joinedOnceActiveNodes.add(localNode);
}
}
for (ZenPing.PingResponse pingResponse : pingResponses) {
activeNodes.add(pingResponse.node());
if (pingResponse.hasJoinedOnce()) {
joinedOnceActiveNodes.add(pingResponse.node());
}
}
可以知道activeNodes里面将会存放node1,node2,
joinedOnceActiveNodes存放的是node2
然后进行到此处:
if (pingMasters.isEmpty()) { // pingMasters时空有两种情况,一种本地节点就是master节点,另一种一开始初始启动,还没选出master,而本节点也没被选为master就可能出现空
// 保证选举数量,说明有足够多的节点选举本节点为master,但是这还不够,本节点还需要再选举一次,如果本次选举节点仍旧是自己,那么本节点才能成为master。
if (electMaster.hasEnoughMasterNodes(activeNodes)) { // 判断是否包含足够的节点数,是否大于n/2 + 1
// we give preference to nodes who have previously already joined the cluster. Those will
// have a cluster state in memory, including an up to date routing table (which is not persistent to disk
// by the gateway)
DiscoveryNode master = electMaster.electMaster(joinedOnceActiveNodes); // 既然本节点已经被ping同的节点选为了master, 也要自己选择自己一把,才能成为master, 不然的话对activeNodes进行重新选举
if (master != null) {
return master;
}
return electMaster.electMaster(activeNodes); // 重新选举
} else {
// if we don't have enough master nodes, we bail, because there are not enough master to elect from
logger.trace("not enough master nodes [{}]", activeNodes);
return null;
}
}
此时node2还会选举自己一把DiscoveryNode master = electMaster.electMaster(joinedOnceActiveNodes);这样node2才能真正成为master, 不然的话会执行return electMaster.electMaster(activeNodes);也就是在node1和node2上重新选举。
此时node1 的循环又开始了,他就会发现master不是自己而是node2, 这样就会接受node2是master
然后第三个节点node3上线了, 此时他也会执行while循环中的findMaster方法, 发现集群中已经有一个正常的master, 这时候也是接受那个master, 并与之联系,加入集群。