DLedger源码深度剖析(一):探秘选举流程

概述

DLedger是一个分布式日志复制系统,基于Raft共识算法,实现了分布式环境中的一致性和领导者选举。选举的核心在于如何高效地选出领导者,由其负责日志的写入和同步操作,确保集群的一致性和高可用性。本文将深入剖析DLedger的选举流程,详细解析投票机制如何选定领导者,以及领导者如何通过心跳机制维持其角色和确保集群的稳定运行。

Raft选举机制

在讲解 DLedger 的选举过程之前,先来简要介绍一下 Raft 算法的选举机制,以便更好地理解 DLedger 在分布式一致性中的实现。

Raft算法的节点可以有三种角色:

  • Leader(领导者):负责处理客户端请求并将日志条目复制到其他节点。
  • Follower(跟随者):被动接受Leader的日志复制或选举请求。
  • Candidate(候选者):在选举过程中发起投票以争取成为Leader。

Raft的选举机制是确保集群中的某一个节点成为Leader。它的选举过程包括以下步骤:

  1. 心跳超时:每个Follower节点都会等待一个随机的超时时间,如果在此期间未收到来自Leader的心跳信号,Follower会转变为Candidate。
  2. 发起投票:Candidate会增加其任期号并向其他节点发送投票请求。每个节点只能投一票,且票只能投给任期号不小于自身任期号的节点。
  3. 胜出:如果Candidate获得超过半数的投票支持,它将成为Leader,并开始向其他节点发送心跳信号,维持领导地位。
  4. 平局处理:如果没有节点在超时时间内赢得足够票数,所有节点会再次超时并重新发起选举,直至选举出一个新的Leader。
Raft选举机制

DLedger选举

DLedger正是基于Raft协议进行扩展和优化的,在了解了Raft的选举原理后,我们接下来深入探讨DLedger的选举过程,看看 DLedger 的选举实现。

DLedger选举流程

DLedgerLeaderElector 负责 DLedger 集群中节点之间的选举和角色管理。DLedger 服务启动时,会调用DLedgerLeaderElectorstart() 方法,start()方法内部会调用stateMaintainer.start()方法启动状态机:

#DLedgerLeaderElector
public void startup() {
    // 启动状态机
    stateMaintainer.start();
    // ...
}

StateMaintainer 作为 DLedgerLeaderElector 的内部类,继承自 ShutdownAbleThread,使得它能够利用 ShutdownAbleThread 提供的关闭功能。而ShutdownAbleThread 直接继承自 Java 的 Thread 类,在其 run() 方法内部,ShutdownAbleThread 定义了一个骨架方法:doWork(),这个方法可以被子类(例如 StateMaintainer)重写,以实现特定的线程任务:

#ShutdownAbleThread
public void run() {
    // 循环调用doWork()方法
    while (running.get()) {
        try {
            doWork();
        } catch (Throwable t) {
            if (logger != null) {
                logger.error("Unexpected Error in running {} ", getName(), t);
            }
        }
    }
    latch.countDown();
}

线程内部会循环调用 DLedgerLeaderElector 类的doWork() 方法:

#DLedgerLeaderElector
public void doWork() {
    try {
        if (DLedgerLeaderElector.this.dLedgerConfig.isEnableLeaderElector()) {
            DLedgerLeaderElector.this.refreshIntervals(dLedgerConfig);
            // 选举的核心逻辑,负责根据节点的角色调用相应的逻辑
            DLedgerLeaderElector.this.maintainState();
        }
        // 休眠10ms
        sleep(10);
    } catch (Throwable t) {
        DLedgerLeaderElector.LOGGER.error("Error in heartbeat", t);
    }
}

maintainState() 方法是选举的核心方法,负责根据节点的角色(领导者、跟随者或候选者)调用相应的状态维护逻辑:

private void maintainState() throws Exception {
    if (memberState.isLeader()) {
        // 定时向跟随者发送心跳,维护领导者状态
        maintainAsLeader();
    } else if (memberState.isFollower()) {
        // 检测心跳是否超时,超时后将角色从跟随者变为候选者,准备触发新的选举
        maintainAsFollower();
    } else {
        // 发起选举请求、收集投票
        maintainAsCandidate();
    }
}

1.1 发起投票请求

MemberState 是 DLedger 服务中用于管理节点状态的关键组件。它主要负责跟踪一个节点的角色、任期(Term)、领导者信息、日志相关信息等。在 DLedger 服务初始化时会实例化MemberStateMemberState 初始角色为CANDIDATE,处于该角色的节点会发起投票,核心逻辑在maintainAsCandidate()方法中。

候选者投票流程

maintainAsCandidate() 方法主要负责在节点处于候选者(Candidate)状态时执行选举相关的逻辑操作。为了使逻辑更清晰,可以将其分为以下几个关键步骤:前置验证请求参数的准备发起投票以及投票结果的处理

前置验证

在进入选举流程之前,方法首先会进行前置验证,确保当前节点的状态和任期是符合发起选举的要求。这些验证通常包括检查当前节点的状态是否为候选者状态,确认是否符合选举的时间要求,避免无效选举的频繁触发。常见的验证逻辑可能包括:

  • 是否符合发起投票的时间要求;
  • 是否需要立即增加任期号;
  • 当前节点的状态是否是候选者。
// 如果当前时间小于下次发起投票时间并且不需要立即提升任期号,则不发起投票
if (System.currentTimeMillis() < nextTimeToRequestVote && !needIncreaseTermImmediately) {
    return;
}

// 如果当前节点的状态不是候选者,则不发起投票
if (!memberState.isCandidate()) {
    return;
}

// 如果需要立即提升任期号,则只需要提升当前任期号,等待下次发起投票
if (needIncreaseTermImmediately) {
    nextTimeToRequestVote = getNextTimeToRequestVote();
    needIncreaseTermImmediately = false;
    return;
}

请求参数准备

在通过了前置验证之后,方法会为发起选举准备所需的参数。主要包括:

  • 当前任期(Term):记录节点当前的选举轮次;
  • 日志索引(LedgerEndIndex):当前节点日志中的最后一条日志的索引;
  • 日志任期(LedgerEndTerm):最后一条日志的任期,用于选举时的日志比较;
  • 候选者节点信息:候选者的 ID、当前的节点信息等,用于向其他节点发出投票请求。
// lastParseResult:上次投票结果
// 如果上次投票结果是等待下次投票 或者需要立即提升任期号,则需要提升当前任期号,并更新上次投票结果为等待投票
// 否则当前任期号不需要变化
if (lastParseResult == VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT || needIncreaseTermImmediately) {
    long prevTerm = memberState.currTerm();
    term = memberState.nextTerm();
    LOGGER.info("{}_[INCREASE_TERM] from {} to {}", memberState.getSelfId(), prevTerm, term);
    lastParseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
} else {
    term = memberState.currTerm();
}
ledgerEndIndex = memberState.getLedgerEndIndex();
ledgerEndTerm = memberState.getLedgerEndTerm();

发起投票

准备好请求参数后,节点会向集群中的其他节点发起投票请求。在这个步骤中,候选者节点将通过网络发送选举请求,告知其他节点其竞选为 Leader 的意图。每个节点在接收到投票请求后,会根据候选者的日志情况和当前任期决定是否投票给它。关键逻辑包括:

  • 向所有其他节点广播投票请求;
  • 等待节点投票的响应;
  • 收集投票结果,判断是否达到了选举成功的条件(通常需要超过半数的投票支持)。
// 发起投票
List<CompletableFuture<VoteResponse>> quorumVoteResponses = voteForQuorumResponses(term, ledgerEndTerm, ledgerEndIndex);
private List<CompletableFuture<VoteResponse>> voteForQuorumResponses(long term, long ledgerEndTerm,
                                                                     long ledgerEndIndex) throws Exception {
    List<CompletableFuture<VoteResponse>> responses = new ArrayList<>();
    // 向集群中所有节点发起投票请求,包括为自己投票
    for (String id : memberState.getPeerMap().keySet()) {
        VoteRequest voteRequest = new VoteRequest();
        // 省略其他请求参数...
        CompletableFuture<VoteResponse> voteResponse;
        // 如果是为自己投票,则投票请求无需通过网络发送,因为它是本地操作,直接处理即可。
        // 如果是向其他节点发起投票请求,则需要通过RPC发送投票请求
        if (memberState.getSelfId().equals(id)) {
            voteResponse = handleVote(voteRequest, true);
        } else {
            //async
            voteResponse = dLedgerRpcService.vote(voteRequest);
        }
        responses.add(voteResponse);

    }
    return responses;
}

1.2 投票请求处理

集群中的每个节点接收到投票请求时,会根据请求中的参数和自身的状态来处理投票请求,所有的投票请求都会调用同一个核心方法 handleVote 来处理。

#DLedgerLeaderElector
public CompletableFuture<VoteResponse> handleVote(VoteRequest request, boolean self) {
    synchronized (memberState) {
        // 检查发起投票请求的 leaderId 是否是集群中的已知节点
        if (!memberState.isPeerMember(request.getLeaderId())) {
            return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_UNKNOWN_LEADER));
        }
        // 如果发起投票请求的节点是当前节点本身,但是发起者不是自己则拒绝投票,防止投票逻辑混乱
        if (!self && memberState.getSelfId().equals(request.getLeaderId())) {
            return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_UNEXPECTED_LEADER));
        }
                // 比较日志的结束索引和任期,确保候选者拥有最新的日志。如果候选者的日志滞后,将拒绝投票。
        if (request.getLedgerEndTerm() < memberState.getLedgerEndTerm()) {
            return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_EXPIRED_LEDGER_TERM));
        } else if (request.getLedgerEndTerm() == memberState.getLedgerEndTerm() && request.getLedgerEndIndex() < memberState.getLedgerEndIndex()) {
            return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_SMALL_LEDGER_END_INDEX));
        }
        // 根据请求的任期决定是接受、拒绝,还是更改自身的角色。若任期小于当前节点任期,则拒绝投票;
        // 若任期相等,则检查当前节点是否已经投票;若任期更大,则更新自身为候选者,准备进入下一轮选举。
        if (request.getTerm() < memberState.currTerm()) {
            return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_EXPIRED_VOTE_TERM));
        } else if (request.getTerm() == memberState.currTerm()) {
            if (memberState.currVoteFor() == null) {
                //let it go
            } else if (memberState.currVoteFor().equals(request.getLeaderId())) {
                //repeat just let it go
            } else {
                if (memberState.getLeaderId() != null) {
                    return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_ALREADY_HAS_LEADER));
                } else {
                    return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_ALREADY_VOTED));
                }
            }
        } else {
            //stepped down by larger term
            changeRoleToCandidate(request.getTerm());
            needIncreaseTermImmediately = true;
            //only can handleVote when the term is consistent
            return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_TERM_NOT_READY));
        }
        if (request.getTerm() < memberState.getLedgerEndTerm()) {
            return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.getLedgerEndTerm()).voteResult(VoteResponse.RESULT.REJECT_TERM_SMALL_THAN_LEDGER));
        }
                // 如果当前节点正在进行领导权转移,则拒绝发起的投票
        if (!self && isTakingLeadership() && request.getLedgerEndTerm() == memberState.getLedgerEndTerm() && memberState.getLedgerEndIndex() >= request.getLedgerEndIndex()) {
            return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_TAKING_LEADERSHIP));
        }
        // 如果所有检查通过,当前节点将会为请求中的候选者投票,并更新当前投票的候选者
        memberState.setCurrVoteFor(request.getLeaderId());
        return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.ACCEPT));
    }
}

方法的核心逻辑包括以下几步:

  1. 成员合法性检查handleVote 会确认发起投票请求的 leaderId 是否为集群中的已知节点。如果该节点并非集群成员,则拒绝投票请求,确保只有集群内部成员才有权发起选举。
  2. 检查自身节点状态: 如果请求中的 leaderId 与当前节点相同,handleVote 会进行自检,以避免投票逻辑的混乱。
  3. 日志的比较: 请求中包含的日志终止索引 (ledgerEndIndex) 和任期 (ledgerEndTerm) 需要与当前节点的日志进行对比。如果候选者的日志比当前节点的日志更旧,则投票请求将被拒绝。
  4. 投票任期的比较: 当前节点会检查请求的任期是否过期或低于当前任期,如果当前节点的任期更大,投票请求将被拒绝。如果任期相同或更大,则根据进一步的状态检查决定是否投票。
  5. 处理重复投票: 当节点的任期与请求中的任期相同时,handleVote 会检查该节点是否已经为某个候选者投过票。如果已经投过票,或者集群中已经有合法的领导者,则该节点会拒绝再次投票,防止重复投票。
  6. 任期更新: 如果投票请求的任期大于当前节点的任期,handleVote 会将当前节点的状态更新为候选者,并且立即进入新一轮的选举过程。

1.3 投票结果处理

收到节点的投票结果后,候选者节点会对这些投票进行处理。如果候选者获得了超过半数节点的投票支持,它将转换为 Leader 角色,接管集群的控制权;如果没有获得足够的支持,节点将继续保持候选者状态,或者再次发起选举。具体处理步骤如下:

  • 统计投票结果,判断是否超过半数;
  • 如果选举成功,更新节点的状态为 Leader,开始执行 Leader 相关的职责;
  • 如果选举失败,节点可能需要等待一段时间后再次发起选举,或者降级为跟随者。
CountDownLatch voteLatch = new CountDownLatch(1);
for (CompletableFuture<VoteResponse> future : quorumVoteResponses) {
    future.whenComplete((VoteResponse x, Throwable ex) -> {
        try {
            if (x.getVoteResult() != VoteResponse.RESULT.UNKNOWN) {
                validNum.incrementAndGet();
            }
            synchronized (knownMaxTermInGroup) {
                switch (x.getVoteResult()) {
                    // 节点接受投票请求,并投票支持当前候选者。
                    case ACCEPT:
                            // 记录接受投票的节点数量
                        acceptedNum.incrementAndGet();
                        break;
                    // 拒绝投票,节点已经有了领导者
                    case REJECT_ALREADY_HAS_LEADER:
                            // 记录是否有节点已经报告自己有领导者
                        alreadyHasLeader.compareAndSet(false, true);
                        break;
                    // 拒绝投票,请求的 term 小于本地日志的 term
                    case REJECT_TERM_SMALL_THAN_LEDGER:
                    // 拒绝投票,请求的投票 term 已过期
                    case REJECT_EXPIRED_VOTE_TERM:
                        if (x.getTerm() > knownMaxTermInGroup.get()) {
                            // 已知的最大 term
                            knownMaxTermInGroup.set(x.getTerm());
                        }
                        break;
                    // 拒绝投票,请求的日志 term 过期
                    case REJECT_EXPIRED_LEDGER_TERM:
                    // 拒绝投票,请求的日志 ledgerEndIndex 太小
                    case REJECT_SMALL_LEDGER_END_INDEX:
                            // 记录拒绝投票的节点中,ledgerEndIndex 比请求者大的节点数量
                        biggerLedgerNum.incrementAndGet();
                        break;
                    // 节点的 term 尚未准备好,不进行投票
                    case REJECT_TERM_NOT_READY:
                            // 尚未准备好的节点数量
                        notReadyTermNum.incrementAndGet();
                        break;
                    // 拒绝投票,已经为其他节点投过票
                    case REJECT_ALREADY_VOTED:
                    // 拒绝投票,节点正在接管领导权
                    case REJECT_TAKING_LEADERSHIP:
                    default:
                        break;
                }
            }
            // 检查是否有节点已经报告集群中存在有效的领导者,如果有则终止等待
            // 如果获得过半票数,则终止等待
            // 获得的票数+暂时无法投票的节点数量过半,则终止等待
            if (alreadyHasLeader.get()
                    || memberState.isQuorum(acceptedNum.get())
                    || memberState.isQuorum(acceptedNum.get() + notReadyTermNum.get())) {
                voteLatch.countDown();
            }
        } catch (Throwable t) {
            LOGGER.error("vote response failed", t);
        } finally {
            // 所有节点的投票结果都已经收到,则终止等待
            allNum.incrementAndGet();
            if (allNum.get() == memberState.peerSize()) {
                voteLatch.countDown();
            }
        }
    });
}
try {
    // 在指定的时间内等待(2000 + random.nextInt(maxVoteIntervalMs)),直到投票结果达到某个条件,或者超时时间到期
    voteLatch.await(2000 + random.nextInt(maxVoteIntervalMs), TimeUnit.MILLISECONDS);
} catch (Throwable ignore) {

}

因为投票是异步进行的,通过 2000 + random.nextInt(maxVoteIntervalMs) 这种随机等待时间,每个候选者会在不同的时间点等待投票结果,从而减少频繁的选举冲突。

投票结果大致分为两种:

投票成功情况:当 alreadyHasLeader.get() 返回 true 或者达到了 isQuorum(acceptedNum) 时,立即终止投票等待,因为选举已经有了明确的结果。

无法决出领导者的情况:即使投票过程中没有立即决出领导者,一旦所有节点的投票结果都已收到,allNum.get() == memberState.peerSize() 条件满足后,也会终止等待。此时,后续逻辑可能会根据投票结果做出其他处理,比如发起新一轮选举。

VoteResponse.ParseResult parseResult;
// 当集群中的最大任期比当前候选者的任期大时,说明其他节点可能已经经历了新一轮的选举。此时,当前节点将更新为候选者,并等待下一轮投票。
if (knownMaxTermInGroup.get() > term) {
    parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;
    nextTimeToRequestVote = getNextTimeToRequestVote();
    changeRoleToCandidate(knownMaxTermInGroup.get());
// 如果当前集群中已经有了领导者,那么当前候选者将重新等待投票,这避免了多个候选者同时竞争的混乱。
} else if (alreadyHasLeader.get()) {
    parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
    nextTimeToRequestVote = getNextTimeToRequestVote() + (long) heartBeatTimeIntervalMs * maxHeartBeatLeak;
// 未获得过半票数,等待下次重新投票
} else if (!memberState.isQuorum(validNum.get())) {
    parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
    nextTimeToRequestVote = getNextTimeToRequestVote();
// 如果投票时,部分节点的日志索引较小,未能同步最新的日志状态,导致投票无效,等待下次重新投票
} else if (!memberState.isQuorum(validNum.get() - biggerLedgerNum.get())) {
    parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
    nextTimeToRequestVote = getNextTimeToRequestVote() + maxVoteIntervalMs;
// 获得过半票数
} else if (memberState.isQuorum(acceptedNum.get())) {
    parseResult = VoteResponse.ParseResult.PASSED;
// 如果有部分节点还未准备好投票,但剩下的支持票数已接近法定人数,系统会立即发起新一轮投票以确认领导者
} else if (memberState.isQuorum(acceptedNum.get() + notReadyTermNum.get())) {
    parseResult = VoteResponse.ParseResult.REVOTE_IMMEDIATELY;
// 等待下次投票
} else {
    parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;
    nextTimeToRequestVote = getNextTimeToRequestVote();
}
lastParseResult = parseResult;
// 当前节点成功当选为领导者,改变当前节点的角色为领导者。
if (parseResult == VoteResponse.ParseResult.PASSED) {
    changeRoleToLeader(term);
}

选举过程中针对不同的投票结果,做出相应处理:

  1. 任期较小时等待下轮投票
    如果集群中的最大任期 knownMaxTermInGroup 大于当前候选者的任期,意味着其他节点可能已经进行过新一轮的选举。此时,当前节点会将自己转为候选者,并等待下一轮投票。
  2. 已有领导者时重新投票
    如果集群中已经有合法的领导者(alreadyHasLeader),则当前候选者不会继续参与竞争,而是重新等待投票,避免多节点同时争夺领导者的情况。
  3. 票数不足时重新投票
    当候选者未获得法定票数(validNum未达到法定人数),系统会选择等待重新发起投票。
  4. 日志索引不一致导致重新投票
    如果部分节点的日志索引较小(biggerLedgerNum),未同步到最新的日志状态,导致有效投票数不够,系统会再次等待投票。
  5. 获得过半票数时通过选举
    如果候选者已经获得过半数的选票(acceptedNum达到法定人数),则选举成功,候选者当选为领导者。
  6. 立即发起新一轮投票
    在部分节点未准备好投票(notReadyTermNum),但剩下的有效票数已经接近法定票数时,系统会立即发起新一轮投票,以快速确认领导者。
  7. 等待下次投票
    如果不满足其他条件,则继续等待下轮投票。
  8. 成功当选为领导者
    当选举通过(PASSED),系统会将当前节点角色变更为领导者,结束选举。

1.4 心跳机制

心跳流程

发起心跳请求

领导者会定期向跟随者发送心跳消息,以确保领导者的身份,主要逻辑:

  1. 检查自上次发送心跳以来的时间是否超过预定的心跳间隔 heartBeatTimeIntervalMs
  2. 如果当前节点不是领导者,则停止发送心跳。
  3. 在同步块中获取当前的任期 term 和领导者 ID leaderId,并更新上次发送心跳的时间。
  4. 调用 sendHeartbeats 方法向跟随者发送心跳消息。
#DLedgerLeaderElector
private void maintainAsLeader() throws Exception {
    // 上次心跳时间是否超过心跳间隔(2s)
    if (DLedgerUtils.elapsed(lastSendHeartBeatTime) > heartBeatTimeIntervalMs) {
        long term;
        String leaderId;
        synchronized (memberState) {
            if (!memberState.isLeader()) {
                //stop sending
                return;
            }
            term = memberState.currTerm();
            leaderId = memberState.getLeaderId();
            lastSendHeartBeatTime = System.currentTimeMillis();
        }
        // 发起心跳
        sendHeartbeats(term, leaderId);
    }
}

sendHeartbeats 方法实现了领导者向跟随者发送心跳请求的逻辑,并根据响应结果做出相应的处理,核心逻辑如下:

  1. 初始化多个计数器和状态变量,准备发送心跳请求。
  2. 遍历所有成员,跳过自己,构建心跳请求并通过异步方式发送。
  3. 使用 CompletableFuture 处理响应,更新成功数量、最大任期、领导者一致性等状态。
  4. 使用 CountDownLatch 等待所有响应,处理异常情况。
  5. 根据响应结果和状态决定是否改变角色或更新上次成功心跳的时间。
#DLedgerLeaderElector
private void sendHeartbeats(long term, String leaderId) throws Exception {
    // 跟随者的总数
    final AtomicInteger allNum = new AtomicInteger(1);
    // 成功响应的跟随者数量
    final AtomicInteger succNum = new AtomicInteger(1);
    // 状态为“未准备好”的跟随者数量
    final AtomicInteger notReadyNum = new AtomicInteger(0);
    // 记录接收到的最大任期
    final AtomicLong maxTerm = new AtomicLong(-1);
    // 是否存在不一致的领导者
    final AtomicBoolean inconsistLeader = new AtomicBoolean(false);
    final CountDownLatch beatLatch = new CountDownLatch(1);
    long startHeartbeatTimeMs = System.currentTimeMillis();
    for (String id : memberState.getPeerMap().keySet()) {
        // 跳过自己
        if (memberState.getSelfId().equals(id)) {
            continue;
        }
        HeartBeatRequest heartBeatRequest = new HeartBeatRequest();
        heartBeatRequest.setGroup(memberState.getGroup());
        heartBeatRequest.setLocalId(memberState.getSelfId());
        heartBeatRequest.setRemoteId(id);
        heartBeatRequest.setLeaderId(leaderId);
        heartBeatRequest.setTerm(term);
        // 发起心跳
        CompletableFuture<HeartBeatResponse> future = dLedgerRpcService.heartBeat(heartBeatRequest);
}

心跳请求处理

handleHeartBeat 方法主要处理来自领导者发送的心跳请求,并根据请求的内容进行相应的状态更新和角色调整。整个过程涉及以下几个关键点:

1.节点合法性检查
首先判断发起心跳请求的 leaderId 是否是集群中已知的合法节点。如果不是,返回 UNKNOWN_MEMBER 错误码,表示该节点并不属于当前集群。

2.自检防止逻辑错误
如果心跳请求的 leaderId 与当前节点的 selfId 相同,日志中会记录一个异常警告,并返回 UNEXPECTED_MEMBER 错误码,防止自身节点错误地处理自己的心跳请求。

3.检查心跳任期是否过期
如果请求中的任期 term 小于当前节点的任期,表示请求已经过期,返回 EXPIRED_TERM 错误码。
如果任期相同且 leaderId 匹配当前已知的领导者,则更新心跳时间 lastLeaderHeartBeatTime 并返回成功响应。

4.异常情况下的处理(同步状态)
在锁定 memberState 的情况下,检查请求的任期和领导者 ID 以防止并发问题。

  • 如果心跳请求中的 term 小于当前节点,返回 EXPIRED_TERM
  • 如果请求任期相同且当前没有已知的领导者,节点会切换角色为追随者(Follower)并更新 leaderId,表示接受新的领导者。
  • 如果已存在领导者且 leaderId 一致,更新心跳时间,返回成功响应。
  • 若发现领导者不一致,记录异常日志并返回 INCONSISTENT_LEADER 错误码,防止不一致情况继续。

6.处理更大任期的情况
当请求中的 term 大于当前节点的任期时,当前节点不会立即变成追随者,而是先切换为候选者(Candidate)并通知系统维护线程进行处理,返回 TERM_NOT_READY 错误码。

public CompletableFuture<HeartBeatResponse> handleHeartBeat(HeartBeatRequest request) throws Exception {
        // 检查请求节点是否是当前集群的成员
    if (!memberState.isPeerMember(request.getLeaderId())) {
        return CompletableFuture.completedFuture(new HeartBeatResponse().term(memberState.currTerm()).code(DLedgerResponseCode.UNKNOWN_MEMBER.getCode()));
    }
        // 检查请求节点是否是当前节点
    if (memberState.getSelfId().equals(request.getLeaderId())) {
        return CompletableFuture.completedFuture(new HeartBeatResponse().term(memberState.currTerm()).code(DLedgerResponseCode.UNEXPECTED_MEMBER.getCode()));
    }
        // 请求节点的任期比当前节点的任期小
    if (request.getTerm() < memberState.currTerm()) {
        return CompletableFuture.completedFuture(new HeartBeatResponse().term(memberState.currTerm()).code(DLedgerResponseCode.EXPIRED_TERM.getCode()));
    // 正常心跳
    } else if (request.getTerm() == memberState.currTerm()) {
        if (request.getLeaderId().equals(memberState.getLeaderId())) {
            lastLeaderHeartBeatTime = System.currentTimeMillis();
            return CompletableFuture.completedFuture(new HeartBeatResponse());
        }
    }
    synchronized (memberState) {
        // 请求节点的任期比当前节点的任期小
        if (request.getTerm() < memberState.currTerm()) {
            return CompletableFuture.completedFuture(new HeartBeatResponse().term(memberState.currTerm()).code(DLedgerResponseCode.EXPIRED_TERM.getCode()));
        } else if (request.getTerm() == memberState.currTerm()) {
            // 任期相同,且当前节点没有已知的领导者,则更新其角色为追随者
            if (memberState.getLeaderId() == null) {
                changeRoleToFollower(request.getTerm(), request.getLeaderId());
                return CompletableFuture.completedFuture(new HeartBeatResponse());
            // 任期相同,且领导者一致,返回成功
            } else if (request.getLeaderId().equals(memberState.getLeaderId())) {
                lastLeaderHeartBeatTime = System.currentTimeMillis();
                return CompletableFuture.completedFuture(new HeartBeatResponse());
            } else {
                // 领导者不一致
                return CompletableFuture.completedFuture(new HeartBeatResponse().code(DLedgerResponseCode.INCONSISTENT_LEADER.getCode()));
            }
        } else {
            // 心跳请求的任期大于当前节点的任期,更新角色为候选者,更新立即提升任期标识
            changeRoleToCandidate(request.getTerm());
            needIncreaseTermImmediately = true;
            return CompletableFuture.completedFuture(new HeartBeatResponse().code(DLedgerResponseCode.TERM_NOT_READY.getCode()));
        }
    }
}

心跳结果处理

领导者发起心跳后会等待投票结果返回,并根据响应做出不同处理,具体包括:

  1. 根据心跳响应更新状态:根据心跳响应的不同结果(成功、任期过期、领导者不一致等),对集群状态进行更新或角色转换。
  2. 等待心跳响应的结果:程序会等待一段时间以确保心跳响应全部到达,并根据结果进行进一步处理。
  3. 处理异常情况:如果其他节点的任期大于当前节点,则当前节点转换为候选者;如果心跳响应没有达到法定人数或心跳时间过长,则根据情况转换角色。
future.whenComplete((HeartBeatResponse x, Throwable ex) -> {
        try {
            if (ex != null) {
                memberState.getPeersLiveTable().put(id, Boolean.FALSE);
                throw ex;
            }
            switch (DLedgerResponseCode.valueOf(x.getCode())) {
                // 心跳成功,增加成功计数
                case SUCCESS:
                    succNum.incrementAndGet();
                    break;
                // 响应中的任期小于当前节点的任期,记录最大的任期
                case EXPIRED_TERM:
                    maxTerm.set(x.getTerm());
                    break;
                // 领导者不一致,将 inconsistLeader 标志设置为 true
                case INCONSISTENT_LEADER:
                    inconsistLeader.compareAndSet(false, true);
                    break;
                // 任期尚未准备好,增加未准备好的节点计数
                case TERM_NOT_READY:
                    notReadyNum.incrementAndGet();
                    break;
                default:
                    break;
            }
                        // 网络异常处理
            if (x.getCode() == DLedgerResponseCode.NETWORK_ERROR.getCode())
                memberState.getPeersLiveTable().put(id, Boolean.FALSE);
            else
                memberState.getPeersLiveTable().put(id, Boolean.TRUE);
                        // 心跳成功
            if (memberState.isQuorum(succNum.get())
                    || memberState.isQuorum(succNum.get() + notReadyNum.get())) {
                beatLatch.countDown();
            }
        } catch (Throwable t) {
            LOGGER.error("heartbeat response failed", t);
        } finally {
            allNum.incrementAndGet();
            if (allNum.get() == memberState.peerSize()) {
                beatLatch.countDown();
            }
        }
    });
}
long voteResultWaitTime = 10;
beatLatch.await(heartBeatTimeIntervalMs - voteResultWaitTime, TimeUnit.MILLISECONDS);
Thread.sleep(voteResultWaitTime);

// 群中有节点的任期大于当前节点的任期,当前节点会更新为候选者,准备参加下一轮选举
if (maxTerm.get() > term) {
    changeRoleToCandidate(maxTerm.get());
    return;
}
// 心跳成功
if (memberState.isQuorum(succNum.get())) {
    lastSuccHeartBeatTime = System.currentTimeMillis();
} else {
    // 设置lastSendHeartBeatTime为-1,加快心跳触发
    if (memberState.isQuorum(succNum.get() + notReadyNum.get())) {
        lastSendHeartBeatTime = -1;
    // 领导者不一致
    } else if (inconsistLeader.get()) {
        changeRoleToCandidate(term);
    // 最近一次成功的心跳时间超过maxHeartBeatLeak,更新节点角色为候选者
    } else if (DLedgerUtils.elapsed(lastSuccHeartBeatTime) > (long) maxHeartBeatLeak * heartBeatTimeIntervalMs) {
        changeRoleToCandidate(term);
    }
}

总结

DLedger 的选举流程包括两个关键部分:选主和心跳机制。

  1. 选主:当集群中没有有效的领导者时,选举流程会启动。每个节点首先通过提升任期来发起选举,成为候选者并向其他节点发送投票请求。集群中的节点根据收到的投票请求,比较候选者的任期并选择任期最高且最新日志最完整的节点作为领导者。一旦某节点获得超过半数的投票,便会被选为新的领导者,开始接管集群的日志写入和同步任务。
  2. 心跳机制:领导者选出后,会定期向跟随者节点发送心跳包。心跳包不仅用于告知跟随者领导者的存活状态,还携带当前任期等信息。如果跟随者在一定时间内未收到心跳包,会认为领导者失效并启动新一轮选举。心跳机制确保领导者持续维持其角色,并保持集群的稳定运行。

通过选主和心跳机制,DLedger 能够在节点失效或网络分区等异常情况下快速选出新的领导者,确保系统的高可用性和一致性。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,634评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,951评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,427评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,770评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,835评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,799评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,768评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,544评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,979评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,271评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,427评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,121评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,756评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,375评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,579评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,410评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,315评论 2 352

推荐阅读更多精彩内容