上偏文章我们分析了consul 一致性default 和stale 两种模式的一致性读的实现原理,让我们回忆下,
Stale模式
链接任何一个server节点都可以读,容忍过期的数据,
Default 模式
这个是我们大多数人用的模式,需要从leader返回数据,如果agent链接到server是follower节点,则需要转发给leader来处理。
Consistent 模式
而且我们也说了Consistent 一致性读比default模式还要严格,除了需要leader返回数据外,还需要确认当前leader是否唯一,怎么确认,就需要和follower 通信来确认,就是问下几个从节点,嘿,我还是leader吗,如果超过半数的人都响应是,则恭喜你,还是leader,就可以返回数据了。否则返回失败。
下面我们分析下原理是怎么实现的,初步摸排了下,这个过程还挺复杂的,所以需要单独再开一篇文章来说明。
检查入口
consul server 出来所有的请求基本上都支持阻塞查询,即blockingQuery方法,consul 在计算好超时时间后,会做是否为Consistent模式的读。
// Validate
// If the read must be consistent we verify that we are still the leader.
// queryOpts 这里检查是否一致性读
if queryOpts.GetRequireConsistent() {
//如果是则通过consistentRead来实现
if err := s.consistentRead(); err != nil {
return err
}
}
//不是本地之间返回
我们来分析consistentRead的逻辑, 关键就时发起一个leader验证的请求,然后等待结果,代码如下:
// consistentRead is used to ensure we do not perform a stale
// read. This is done by verifying leadership before the read.
func (s *Server) consistentRead() error {
defer metrics.MeasureSince([]string{"rpc", "consistentRead"}, time.Now())
future := s.raft.VerifyLeader()
//查询请求会阻塞在这里,即future.Error(),需要等过半验证成功才返回。
if err := future.Error(); err != nil {
return err //fail fast if leader verification fails
}
// poll consistent read readiness, wait for up to RPCHoldTimeout milliseconds
if s.isReadyForConsistentReads() {
return nil
}
jitter := lib.RandomStagger(s.config.RPCHoldTimeout / jitterFraction)
deadline := time.Now().Add(s.config.RPCHoldTimeout)
for time.Now().Before(deadline) {
select {
case <-time.After(jitter):
// Drop through and check before we loop again.
case <-s.shutdownCh:
return fmt.Errorf("shutdown waiting for leader")
}
if s.isReadyForConsistentReads() {
return nil
}
}
return structs.ErrNotReadyForConsistentReads
}
我们的查询就阻塞在future.Error() 这里,没有验证完,这里是不会响应的。
那关键就在s.raft.VerifyLeader()这里,我们继续往下挖,下面是实现:
// VerifyLeader is used to ensure the current node is still
// the leader. This can be done to prevent stale reads when a
// new leader has potentially been elected.
func (r *Raft) VerifyLeader() Future {
metrics.IncrCounter([]string{"raft", "verify_leader"}, 1)
verifyFuture := &verifyFuture{}
//init 会初始化errCh,页就是前面error会阻塞在这个channel上。
verifyFuture.init()
select {
case <-r.shutdownCh:
return errorFuture{ErrRaftShutdown}
case r.verifyCh <- verifyFuture:
//写一个future到verifyCh,consul leader主协程会watch这个verifyCh,会触发验证的逻辑
return verifyFuture
}
}
老外写的代码还是很友好的,注释写的很清楚,看代码的同时还能看写的注释,页大概明白这个方法的意图,这个VerifyLeader 的核心逻辑就是创建一个verifyFuture,这个future很关键,关键指标都在这里,定义如下:
/ verifyFuture is used to verify the current node is still
// the leader. This is to prevent a stale read.
type verifyFuture struct {
deferError
notifyCh chan *verifyFuture
quorumSize int //过半大小,默认是0
votes int //验证leader时follower响应ok,时votes会+1,如果超过quorumSize,则认为还是leader。
voteLock sync.Mutex
}
votes 是每个follower返回验证成功时,会对votes+1,然后判断是否大于过半quorumSize,默认是0,如果是过半,那恭喜你,目前你还是leader,可以执行当前这次读请求。
下面我们要看consul是怎么给follower发送验证请求的,通过上面的代码,可以看出,是向leader的verifyCh 写了一个future,所以肯定有一个go routing 会阻塞在这个verifyCh channel上,这就是异步通知, 下面就是leader的主协程登场了。
Leader 主协程
leader 完成初始化后,最后会启动一个循环函数,先看下定义,同样注释也说明的很清楚。
// leaderLoop is the hot loop for a leader. It is invoked
// after all the various leader setup is done.
func (r *Raft) leaderLoop() {
...
这个leaderLoop会监听很多channel,比如rpc请求,commit等,其中一个就有verifyCh的channel,代码如下:
case v := <-r.verifyCh:
if v.quorumSize == 0 {
// Just dispatched, start the verification
r.verifyLeader(v)
} else if v.votes < v.quorumSize {
// Early return, means there must be a new leader
r.logger.Warn("new leader elected, stepping down")
r.setState(Follower)
delete(r.leaderState.notify, v)
for _, repl := range r.leaderState.replState {
repl.cleanNotify(v)
}
v.respond(ErrNotLeader)
} else {
// Quorum of members agree, we are still leader
delete(r.leaderState.notify, v)
for _, repl := range r.leaderState.replState {
repl.cleanNotify(v)
}
v.respond(nil)
}
我们前面查询的goroutine 通过发了一个verifyFuture 给verifyCh,leader 的main goroutine 就监听在这里,我们前面也说了,quorumSize 默认是0,所以第一次是会触发verifyLeader的逻辑,什么时候触发另外两个的逻辑呢,要等验证超过一半的请求返还了,就会再给verifyCh发一个消息,这时候,正常情况下就是找v.respond的逻辑,最终通知最前面的query go routine 阻塞就会被唤醒。
verifyLeader的核心逻辑就是初始化了quorumSize,另外就是对follower循环,consul 有个 replicate go routine 会和follower发心跳信息,每个follower一个,除了定时发心跳外,还支持实时触发心态,也就是监听notifyCh 这个channel,这个leader的go routine会发一个空的struct给这个channel来触发,会给每个follower都发一个类型为rpcAppendEntries的消息,核心代码如下:
// Trigger immediate heartbeats
for _, repl := range r.leaderState.replState {
repl.notifyLock.Lock()
repl.notify[v] = struct{}{}
repl.notifyLock.Unlock()
//通知主动发一个心调到follower server
asyncNotifyCh(repl.notifyCh)
}
consul leader 对每个follower 维持一个heartbeat ,核心代码如下:
for {
// Wait for the next heartbeat interval or forced notify
select {
case <-s.notifyCh://通知即刻执行
case <-randomTimeout(r.conf.HeartbeatTimeout / 10)://定时执行
case <-stopCh:
return
}
start := time.Now()
if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil {
r.logger.Error("failed to heartbeat to", "peer", s.peer.Address, "error", err)
failures++
select {
case <-time.After(backoff(failureWait, failures, maxFailureScale)):
case <-stopCh:
}
} else {
//更新时间
s.setLastContact()
failures = 0
metrics.MeasureSince([]string{"raft", "replication", "heartbeat", string(s.peer.ID)}, start)
s.notifyAll(resp.Success)
}
}
绕了这么大半圈,真正验证的关键代码总算出来了,也就是 r.trans.AppendEntries执行的,这里就不用分析了,就是发一个rpc请求给follower。如果成功。通过s.notifyAll(resp.Success) 来通知前面的future,核心代码如下:
// vote is used to respond to a verifyFuture.
// This may block when responding on the notifyCh.
func (v *verifyFuture) vote(leader bool) {
v.voteLock.Lock()
defer v.voteLock.Unlock()
// Guard against having notified already
if v.notifyCh == nil {
return
}
if leader {
v.votes++
//防止一个follower响应就通知了leader,比如5台的时候,一台响应了+自己也就是2
if v.votes >= v.quorumSize {
v.notifyCh <- v
v.notifyCh = nil
}
} else {
v.notifyCh <- v
v.notifyCh = nil
}
}
如果follower响应成功,也就是认为你还是leader,则对votes加1,v.notifyCh 这里其实就是我们前面leader 的verifych, 通过幻想我们前面的查询请求阻塞在future.error哪里,整个经过这么一个复杂的流程才能完成一次正常的读请求,如果请求follower超时,则会等待一定的时间。继续请求。
总结
consul 官方文档说Consistent 模式读,为了实现这个强一致性读,consul 在背后做了这么多的事情,详细你看了这篇文章,以及前面的一篇文章,对consul的三种模式的读,应该有了一个全面的了解,在用的时候也能根据你的业务场景做出正确的选择,对consul 感兴趣的同学可以点关注,后面再继续分享consul的文章,带你看明白consul的世界。