etcd 代码(3.6.0-pre)分析可以参考:etcd 注解版 ,也欢迎大家交流讨论
Raft message type
Raft集群中节点之间的通信都是通过传递不同的Message来完成的,Message类型 有很多种,下面介绍部分:
- MsgHup
用于选举,对于
follower
或者candidate
,其tick
函数对应tickElection
。如果follower
或者candidate
在选举超时时间都没有收到心跳信息,它将发送MsgHup到Step方法,其角色将变为candidate(对于follower节点)并发起新一轮选举。
- MsgBeat
MsgApp是一个内部使用的类型,对于leader节点,其
tick
函数对应tickHeartbeat
,周期性地向follower节点发送MsgHeartbeat
消息。
- MsgProp
MsgProp提议附加数据到log entries,而且它会重定向propose到leader节点。因此,
send
方法用hardState覆写了Message的term号。-
- 当MsgProp传递到leader的
step
方法,leader首先会调用appendEntry
方法将entries附加到它的log,接着会调用bcastAppend
方法发送entries到peers。- 当MsgProp传递到candidate,则直接丢弃。
- 当传递到follower,MsgProp存储到follower的mailbox,即msgs,它保存了发送者的ID,随后通过
rafthttp
包转发到leader。
- MsgApp
复制log entries。leader调用
bcastAppend
方法,其会调用bcastAppend
方法发送即将要复制的MsgApp类型的logs。当MsgApp被传递到candidate的Step
方法,candidate将会revert back to follower,表示存在有效的leader在发送MsgApp消息。candidate和follower会回复MsgAppResp
消息。
- MsgAppResp
用于响应log复制请求。当MsgApp发送到candidate和follower的
Step
方法,它们会调用handleAppendEntries
方法,其会发送MsgAppResp到raft mailbox。
- MsgVote
请求投票选举。当MsgHup传递到follower或candidate的
Step
方法,将会调用campaign
方法去竞选自己成为leader, 一旦campaign
方法被调用,节点将会变成candidate,然后发送MsgVote到peers去请求投票。
- 当消息传递到leader或candidate的
Step
方法,如果消息的term小于leader或candidate的term,则消息会被拒绝掉(MsgVoteResp将设置Reject
为true),如果leader或candidate接收到term更大的MsgVote,他们将会revert back to follower。- 当'MsgVote'被传递给follower时,只有当sender的last term大于MsgVote的term或sender的last term等于MsgVote的term但sender的最后提交索引大于或等于follower的时,它才会投票给sender。
- MsgVoteResp
投票选举相应消息。当candidate收到MsgVoteResp,它会统计自己的票数,如果票数超过quorum,它将会变成leader,然后调用
bcastAppend
,如果candidate收到大多数的拒绝投票,则revert back to follower。
- MsgPreVote和MsgPreVoteResp
两阶段选举协议,是一个可选配置项。当
Config.PreVote
设置为true时,预选举过程与常规选举过程类似,只是不会增加term,除非它在第一个阶段竞选赢得了大多数票。加入这个选项可以将节点分区导致的影响降至最低。
- MsgSnap
请求应用快照消息。当一个节点刚变为leader,或者leader接收到MsgProp消息,然后调用
bcastAppend
方法,这个方法会调用sendAppend
方法到每个follower节点。在sendAppend
方法中,如果leader获取term或者entries失败,则leader将会发送MsgSnap类型消息来请求快照。
- MsgSnapStatus
告知快照消息应用的结果。当follower节点拒绝MsgSnap消息,表明因为网络问题导致网络层不能正常发送snapshot到follower,从而导致快照请求失败,leader将follower的
progress
设置为probe
。当MsgSnap没有被拒绝,表明快照被正常接收,leader将follower的progress
设置为probe
,同时开始log复制。
- MsgHeartbeat
leader发送心跳信息。
- 当MsgHeartbeat消息发送到candidate,且消息的term大于candidate的term,则candidate将revert back to follower,同时会更新自己的提交索引号,然后发送消息到mailbox。
- 当MsgHeartbeat消息发送到follower的
Step
方法,如果消息的term大于follower的term,则follower会更新leaderID
。
- MsgHeartbeatResp
心跳响应消息。MsgHeartbeatResp传递到leader的
Step
方法,leader会知道哪些节点做了回复响应。只有当leader最后提交索引号大于follower的Match
索引号时leader调用sendAppend
方法。
- MsgUnreachable
告知消息或请求不能被deliver,当MsgUnreachable传递到leader的
Step
方法,leader发现发送MsgUnreachable消息的follower节点不可达,这通常意味着MsgApp丢失了。如果此时follower的progress状态为replicate
,则leader会将其重新设置回probe
状态。
- MsgCheckQuorum
如果开启
CheckQuorum
,则选举超时后会发送MsgCheckQuorum
消息,leader节点判断其是否满足quorum,如果不满足则step down to follower节点。
- MsgReadIndex
- MsgReadIndexResp
- MsgTransferLeader 和 MsgTimeoutNow
对于
MsgTransferLeader
消息,follower节点会将其转发到leader节点,leader节点在收到MsgTransferLeader
消息后会首先记录lead被转移者,然后判断转移目标的日志是否跟上了。
- 如果跟上了会向被转移者发送
MsgTimeoutNow
消息, 被转移者收到消息后会强制发起新一轮选举。- 如果没有跟上则先进行日志同步,等leader收到同步日志的
MsgAppResp
消息后会判断其是否已跟上,步骤同上。
Local message
- MsgHup
- MsgBeat
- MsgUnreachable
- MsgSnapStatus
- MsgCheckQuorum
Response message
- MsgAppResp
- MsgVoteResp
- MsgHeartbeatResp
- MsgUnreachable
- MsgPreVoteResp
Q & A
- 发送如下类型的消息时需要设置term,其他类型的消息都不需要设置term。
- pb.MsgVote
- pb.MsgVoteResp
- pb.MsgPreVote
- pb.MsgPreVoteResp
但是对于消息类型既不是MsgProp,又不是MsgReadIndex类型的,会为其加上raft.Term,具体实现参考raft/raft.go
中的 send()
方法。
- raft的
tracker.ProgressTracker
在什么地方赋值的?
初始化集群时,如果没有
WAL
,同时为新集群,同时指定了--initial-cluster
参数,会将其解析为ServerConfig的InitialPeerURLsMap
参数,然后初始化RaftCluster,并添加members。接着bootstrapRaftFromCluster
方法会根据cluster的member ids生成peers。
Peer的信息如下:
type Peer struct {
ID uint64
Context []byte
}
type Member struct {
ID types.ID `json:"id"`
RaftAttributes
Attributes
}
peer中的
Context
为Member
序列化后的信息,其中包括 ID, RaftAttributes, Attributes信息。接着raft/node.go
会调用StartNode
方法,里面的Bootstrap方法会根据peers生成相应的pb.ConfChange
entries,然后调用applyConfChange
方法,这里会更新raft.prs
,返回最新的tracker.Config
和tracker.ProgressMap
信息。
(dlv) p cfg.Voters
go.etcd.io/etcd/raft/v3/quorum.JointConfig [
[
9372538179322589801: {},
10501334649042878790: {},
18249187646912138824: {},
],
nil,
]
(dlv) p prs
go.etcd.io/etcd/raft/v3/tracker.ProgressMap [
9372538179322589801: *{
Match: 0,
Next: 3,
State: StateProbe (0),
PendingSnapshot: 0,
RecentActive: true,
ProbeSent: false,
Inflights: *(*"go.etcd.io/etcd/raft/v3/tracker.Inflights")(0xc00012f470),
IsLearner: false,},
10501334649042878790: *{
Match: 0,
Next: 3,
State: StateProbe (0),
PendingSnapshot: 0,
RecentActive: true,
ProbeSent: false,
Inflights: *(*"go.etcd.io/etcd/raft/v3/tracker.Inflights")(0xc00012f620),
IsLearner: false,},
18249187646912138824: *{
Match: 0,
Next: 3,
State: StateProbe (0),
PendingSnapshot: 0,
RecentActive: true,
ProbeSent: false,
Inflights: *(*"go.etcd.io/etcd/raft/v3/tracker.Inflights")(0xc00012f560),
IsLearner: false,},
]
-
apply
的时候什么条件触发快照?unstable
中的快照是什么时候赋值的?什么条件触发?
TODO
- 为什么
raft.msgs
读写时不需要加锁?
其实etcd里很多地方都是采用的单线程模式,比如
apply
也是。
- 对于
raftpb.Message
中的MsgApp
类型,其LogTerm
,Index
,Commit
,Entries
的含义?
LogTerm
通常用于append raft logs到follower节点。例如:对于消息
(type=MsgApp,index=100,logTerm=5)
,表示leader从index=101开始 append entries,而index=100对应的Term值为5。
对于消息(type=MsgAppResp,reject=true,index=100,logTerm=5)
,表示follower拒绝了leader的entries(可能是部分),由于follower节点已经包含index=100,term=5的entry。
Commit
为raftLog
的committed
index。
Index
和LogTerm
字段是用于日志匹配的日志(即发送的日志的上一条日志)的index与term(用于日志匹配的term字段为LogTerm,消息的Term字段为该节点当前的term,部分消息需要自己指定,部分消息由send
方法填充)。Entries
字段保存了需要复制的日志条目。Commit
字段为leader提交的最后一条日志的索引。
- 如何提高读性能,同时避免网络分区后重新选举出新leader出现的
stale read
?
TODO
CheckQuorum
默认自动开启,同时开启Check Quorum会自动开启Leader Leaseraft.maybeSendAppend
在发送 Message 时,会对 Message 中的entries大小做限制,maxSizePerMsg
为1MB,因此entries在超过此大小时会如何处理?raftLog.maybeAppend
如何更新commit?
如果要发送的entries超过大小限制,则会发送多次
-
pb.MsgSnapStatus
消息以及ReportSnapshot
的作用?
TODO
- candidate节点在赢得选举之后会append一条空的日志条目,其作用是什么?
candidate在当选leader后会在当前term为自己的日志追加一条空日志条目,并广播,以提交之前term的日志,具体可参考
raft/raft.go
中的handleAppendEntries()
。
- 日志复制不匹配时的回退优化算法
对于
follower
节点,无论是处理MsgApp
消息还是处理MsgSnap
消息,返回的消息都是MsgAppResp
。在通过
Transport
发送Messages
时,会忽略Message.To == 0
的消息。由于etcd的模块化设计,raft模块和存储网络模块是分开的,因此
send
方法只是将消息放入mailbox
,而不是立刻将其发出(etcd/raft也没有通信模块), 其与外界的交互都是通过Ready
来进行处理的。因此,当follower
收到MsgApp
请求时,执行的操作实际上是(不考虑特殊情况):
- 将新日志追加到
unstable
中。- 将包含
unstable
的last index
的MsgAppResp
消息放入信箱,等待发送。
对于Ready
的处理,角色不同处理的次序也是有区别的:
- 对于
follower
节点,是先将entries
,hardState
,snapshot
保存到稳定的存储后再发送Messages
。- 对于
leader
节点,可以在发送Messages
的同时将entries
,hardState
,snapshot
持久化。
- raft中的异常处理,例如 添加成员时leader正在进行
leadTransfer
,如果收到MsgProp的消息,这时会返回ErrProposalDropped
如何处理?
TODO
-
applyEntryNormal
时有V2和V3请求,分别对应pb.Request
和pb.InternalRaftRequest
,其log对应:
167 {"level":"debug","ts":"2021-12-08T10:49:48.188+0800","caller":"etcdserver/server.go:1835","msg":"Applying entry","index":8,"term":2,"type":"EntryNormal"}
168 {"level":"debug","ts":"2021-12-08T10:49:48.189+0800","caller":"etcdserver/server.go:1885","msg":"apply entry normal","consistent-index":7,"entry-index":8,"should-applyV3":true}
169 {"level":"debug","ts":"2021-12-08T10:49:48.189+0800","caller":"etcdserver/server.go:1908","msg":"applyEntryNormal","V2request":"ID:16732981032079369986 Method:\"PUT\" Path:\"/0/members/45d559f8148de837/attributes\" Val:\"{\\\"name\\\":\\\"infra4\\\",\\\"clientURLs\\\":[\\\"http://127.0.0.1:42379\\\"]}\" "}
206 {"level":"debug","ts":"2021-12-08T10:49:48.206+0800","caller":"etcdserver/server.go:1835","msg":"Applying entry","index":12,"term":2,"type":"EntryNormal"}
207 {"level":"debug","ts":"2021-12-08T10:49:48.206+0800","caller":"etcdserver/server.go:1885","msg":"apply entry normal","consistent-index":11,"entry-index":12,"should-applyV3":true}
208 {"level":"debug","ts":"2021-12-08T10:49:48.206+0800","caller":"etcdserver/server.go:1912","msg":"applyEntryNormal","raftReq":"header:<ID:13926956989250840580 > cluster_version_set:<ver:\"3.6.0\" > "}
261 {"level":"debug","ts":"2021-12-08T10:52:35.450+0800","caller":"etcdserver/server.go:1835","msg":"Applying entry","index":14,"term":2,"type":"EntryNormal"}
262 {"level":"debug","ts":"2021-12-08T10:52:35.450+0800","caller":"etcdserver/server.go:1885","msg":"apply entry normal","consistent-index":13,"entry-index":14,"should-applyV3":true}
263 {"level":"debug","ts":"2021-12-08T10:52:35.450+0800","caller":"etcdserver/server.go:1912","msg":"applyEntryNormal","raftReq":"header:<ID:3632572666012018439 > put:<key:\"\\000\\000\\000\\000\\000\\000\\000\\000\" value:\"\\026T\\316d\\230x\\326x\" > "}
- 新加入的节点或者落后很多的节点,leader 会尝试发送快照数据给follower节点,
maybeSendAppend
方法在处理时会生成快照消息,如下所示:
snapshot, _ := r.raftLog.snapshot()
pb.Message{
To: to,
Type: pb.MsgSnap,
Snapshot: snapshot, // 看起来有点多余
}
应用逻辑层通过Ready
获取messages
之后会将快照消息单独处理(将其发送到msgSnapC
),applyAll
在收到快照消息后会调用 createMergedSnapshotMessage
生成合并的snap.Message
消息后将其发送到peer端。而 createMergedSnapshotMessage
方法会根据当前 etcd progress
的 appliedt
和appliedi
重新生成新的 Metadata
和 Data
(v2 store序列化后的数据),所以上面raft层在生成 MsgSnap
消息时的 Snapshot
是多余的,虽然不影响。
-
serializable read request
和linearizable read request
?
Linearizability is one of the strongest single-object consistency models, and implies that every operation appears to take place atomically, in some order, consistent with the real-time ordering of those operations: e.g., if operation A completes before operation B begins, then B should logically take effect after A.
etcd 中默认是
linearizable read
,如果需要客户端serializable read
,可以通过WithSerializable()
进行设置,Serializable
请求适用于低延迟。
以Range
为例,
if !r.Serializable {
err = s.linearizableReadNotify(ctx)
trace.Step("agreement among raft nodes before linearized reading")
if err != nil {
return nil, err
}
}
ReadOnlyOption
包含两种类型:
-
ReadOnlySafe
通过与quorum
个节点进行通信来保证只读请求的线性化,默认选项。 -
ReadOnlyLeaseBased
依赖领导者租约(leader lease
)来保证只读请求的线性化,它会受clock drift
的影响。
如果ReadOnlyOption
是 ReadOnlyLeaseBased
的时候必须开启CheckQuorum
。