ClientV3发送Put请求时,其携带的key、value数据被封装成一个`Op`,然后转化为一个`pb.PutRequest`,以rpc形式被EtcdServer接收并处理。EtcdServer将序列化后的数据交由raft状态机进行处理。
// file: clientv3/kv.go
case tPut:
var resp *pb.PutResponse
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV,
IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease}
resp, err = kv.remote.Put(ctx, r, kv.callOpts...)
if err == nil {
return OpResponse{put: (*PutResponse)(resp)}, nil
}
序列化后的data被封装成`MsgProp`类型的`pb.Message`,然后将其写入Node的`propc`通道。
// Propose wraps data in a single-entry MsgProp message and hands it to
// stepWait, returning whatever error stepWait reports.
func (n *node) Propose(ctx context.Context, data []byte) error {
	entry := pb.Entry{Data: data}
	msg := pb.Message{
		Type:    pb.MsgProp,
		Entries: []pb.Entry{entry},
	}
	return n.stepWait(ctx, msg)
}
raft node接收到propc通道中的数据后,调用`r.Step(m)`将其交给raft状态机处理,并把处理结果通过`pm.result`返回给调用方。
// file: raft/node.go
select {
// TODO: maybe buffer the config propose if there exists one (the way
// described in raft dissertation)
// Currently it is dropped in Step silently.
case pm := <-propc:
m := pm.m
m.From = r.id
err := r.Step(m)
if pm.result != nil {
pm.result <- err
close(pm.result)
}
然后根据角色类型调用`stepLeader`、`stepCandidate`或`stepFollower`方法。以Leader为例:
// file: raft/raft.go
if !r.appendEntry(m.Entries...) {
return ErrProposalDropped
}
r.bcastAppend()
return nil
raft首先更新每条entry的`Term`和`Index`,再将entries append到`raftLog`,然后将其广播到其他的peers。
// file: raft/raft.go
// appendEntry stamps every entry in es with the current term and a
// consecutive index following the log's last index, then appends them to
// the raft log. It reports false, leaving the log untouched, when
// increaseUncommittedSize rejects the entries.
func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
	// First index to hand out: one past the current end of the log.
	next := r.raftLog.lastIndex() + 1
	for i := range es {
		e := &es[i]
		e.Term = r.Term
		e.Index = next + uint64(i)
	}

	// Account for the size of this uncommitted proposal; drop it if the
	// accounting refuses it.
	if ok := r.increaseUncommittedSize(es); !ok {
		r.logger.Debugf(
			"%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal",
			r.id,
		)
		return false
	}

	// The append call returns the up-to-date last index, which is what the
	// local progress is advanced to.
	last := r.raftLog.append(es...)
	r.prs.Progress[r.id].MaybeUpdate(last)

	// The result of maybeCommit is deliberately ignored: the caller
	// broadcasts an append either way.
	r.maybeCommit()
	return true
}
Leader发送Append Entries Message到peers。
- 首先判断progress有没有被暂停,如果被暂停则直接返回。
- 从raftLog中获取term以及要发送的entries。获取失败一般是由于Log被compact导致的`ErrCompacted`,或者是`ErrUnavailable`。
- 如果获取term或者entries失败,则发送快照消息`MsgSnap`,同时将Progress状态重置为`StateSnapshot`,`PendingSnapshot`置为snapshot的index。
  m := pb.Message{}
  m.To = to
  m.Type = pb.MsgSnap
  m.Snapshot = snapshot // 从raftLog中获取snapshot
- 如果成功获取term和entries,则将消息类型置为`MsgApp`。
  m := pb.Message{}
  m.To = to
  m.Type = pb.MsgApp
  m.Index = pr.Next - 1 // TODO(zhengliang): pr.Next or pr.Next - 1 ?
  m.LogTerm = term
  m.Entries = ents
  m.Commit = r.raftLog.committed
- 如果要发送的entries大小不为0,则根据Progress的类型进行相应的处理。
  - 如果类型为`tracker.StateReplicate`:
    - 首先获取entries的last index。
    - 乐观地更新Progress的`Next` index为`last index + 1`。
    - 将last index添加到Progress的inflights。`Inflights`是inflight messages的sliding window,能有效地限制inflight messages的数目,以及每个Progress能处理的带宽。当inflights满了时,消息将不能被发送。例如,当leader发送消息时,会将entries的last index加入到inflights中,index在`Inflights`中是按次序排列的;当leader接收到reply时,通过调用`inflights.FreeLE`来释放先前的inflights。
  - 如果类型为`tracker.StateProbe`,则只是简单地将Progress的`ProbeSent`置为true。当`ProbeSent`为true时,leader停止向这个peer发送replication message,直到`ProbeSent`被重置。
// file: raft/raft.go
// bcastAppend walks every progress tracked in r.prs and calls sendAppend
// for each peer other than this node itself.
func (r *raft) bcastAppend() {
	r.prs.Visit(func(peer uint64, _ *tracker.Progress) {
		// Never send an append to ourselves.
		if peer != r.id {
			r.sendAppend(peer)
		}
	})
}
// sendAppend asks maybeSendAppend to send an append RPC to the given peer,
// allowing a message without entries (it still carries the current commit
// index). Whether a message was actually sent is not reported.
func (r *raft) sendAppend(to uint64) {
	const sendIfEmpty = true
	_ = r.maybeSendAppend(to, sendIfEmpty)
}
// maybeSendAppend sends an append RPC with new entries to the given peer,
// if necessary. Returns true if a message was sent. The sendIfEmpty
// argument controls whether messages with no entries will be sent
// ("empty" messages are useful to convey updated Commit indexes, but
// are undesirable when we're sending multiple messages in a batch).
//
// When the term or the entries cannot be read from the raft log, a MsgSnap
// carrying the log's snapshot is sent instead of a MsgApp. No message is
// sent (and false is returned) when the peer's progress is paused, when the
// peer is not recently active on the snapshot path, or when the snapshot is
// temporarily unavailable.
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
pr := r.prs.Progress[to]
// A paused progress receives nothing.
if pr.IsPaused() {
return false
}
m := pb.Message{}
m.To = to
// Fetch the term of the entry just before pr.Next and the entries starting
// at pr.Next; r.maxMsgSize is passed as the second argument (presumably a
// size cap on the returned entries — TODO confirm).
term, errt := r.raftLog.term(pr.Next - 1)
ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
// Nothing to send and the caller does not want empty messages. Note this
// check runs before the error check below, so it also returns early when
// ents is empty and sendIfEmpty is false regardless of errt/erre.
if len(ents) == 0 && !sendIfEmpty {
return false
}
if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
// Do not send a snapshot to a peer that is not marked recently active.
if !pr.RecentActive {
r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
return false
}
m.Type = pb.MsgSnap
snapshot, err := r.raftLog.snapshot()
if err != nil {
// A temporarily unavailable snapshot is tolerated; any other error panics.
if err == ErrSnapshotTemporarilyUnavailable {
r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
return false
}
panic(err) // TODO(bdarnell)
}
if IsEmptySnap(snapshot) {
panic("need non-empty snapshot")
}
m.Snapshot = snapshot
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
// Move the progress into its snapshot state, keyed by the snapshot index.
pr.BecomeSnapshot(sindex)
r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
} else {
// Regular append: the message carries the index/term of the preceding
// entry, the entries themselves and the current commit index.
m.Type = pb.MsgApp
m.Index = pr.Next - 1 // TODO(zhengliang): pr.Next or pr.Next - 1 ?
m.LogTerm = term
m.Entries = ents
m.Commit = r.raftLog.committed
// Progress bookkeeping is only updated when entries are actually sent.
if n := len(m.Entries); n != 0 {
switch pr.State {
// optimistically increase the next when in StateReplicate
case tracker.StateReplicate:
last := m.Entries[n-1].Index
pr.OptimisticUpdate(last)
// Track the last sent index in the in-flight window.
pr.Inflights.Add(last)
case tracker.StateProbe:
// Mark that a probe has been sent to this peer.
pr.ProbeSent = true
default:
r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
}
}
}
// Both the snapshot and the append path end by sending m.
r.send(m)
return true
}
Message封装好之后,调用raft的`send()`方法发送出去。而raft的`send()`方法只是简单地做了一下检查,为Message加上Term,最后将其添加到raft的`msgs`队列。
r.msgs = append(r.msgs, m)
当`RawNode`检测到有新的Ready处于pending状态时,则populate成一个`Ready`对象。`Ready`是只读的,是对将要保存到stable storage、committed或者发送到peers的entries和messages的封装。然后将其写入`readyc`通道。
// HasReady reports whether the RawNode has anything pending that would go
// into a Ready: a changed soft state, a non-empty hard state that differs
// from the previous one, a non-empty unstable snapshot, outgoing messages,
// unstable entries, entries ready to be applied, or read states.
// The checks here should stay consistent with Ready.containsUpdates().
func (rn *RawNode) HasReady() bool {
	r := rn.raft

	// Soft state changed since the last Ready.
	if soft := r.softState(); !soft.equal(rn.prevSoftSt) {
		return true
	}

	// Hard state is non-empty and differs from the previous one.
	if hs := r.hardState(); !IsEmptyHardState(hs) && !isHardStateEqual(hs, rn.prevHardSt) {
		return true
	}

	// An unstable, non-empty snapshot is waiting.
	if snap := r.raftLog.unstable.snapshot; snap != nil && !IsEmptySnap(*snap) {
		return true
	}

	// Messages to send, entries to persist, or entries to apply.
	switch {
	case len(r.msgs) > 0,
		len(r.raftLog.unstableEntries()) > 0,
		r.raftLog.hasNextEnts():
		return true
	}

	// Finally, pending read states.
	return len(r.readStates) != 0
}
// file: raft/node.go
func (n *node) run() {
var propc chan msgWithResult
var readyc chan Ready
var advancec chan struct{}
var rd Ready
r := n.rn.raft
lead := None
for {
if advancec != nil {
readyc = nil
} else if n.rn.HasReady() {
// Populate a Ready. Note that this Ready is not guaranteed to
// actually be handled. We will arm readyc, but there's no guarantee
// that we will actually send on it. It's possible that we will
// service another channel instead, loop around, and then populate
// the Ready again. We could instead force the previous Ready to be
// handled first, but it's generally good to emit larger Readys plus
// it simplifies testing (by emitting less frequently and more
// predictably).
rd = n.rn.readyWithoutAccept()
readyc = n.readyc
}
.........
case readyc <- rd: // 将其写入readyc通道
n.rn.acceptReady(rd)
advancec = n.advancec
etcdServer的raftNode从通道中接收到`Ready`数据后,将其中的committed entries和snapshot封装成`apply`后写入`applyc`通道,在写入前会更新committed index。接着执行如下步骤:
- 如果为Leader node,则会将messages发送到peers,而其中`raftpb.MsgSnap`类型的message会写入`msgSnapC`通道。leader可以并发写磁盘以及复制到peers节点。
- 持久化snapshot。
- 持久化Hard State和entries。
- 强制WAL去`fsync`它的hard state。
- 等待`applyAll`执行完成。
// file: etcdserver/raft.go
// apply contains entries, snapshot to be applied. Once
// an apply is consumed, the entries will be persisted
// to raft storage concurrently; the application must receive
// from notifyc (called raftDone in the original wording of this
// comment — TODO confirm) before assuming the raft messages are stable.
type apply struct {
// entries holds the entries to apply; raftNode.start fills it from
// Ready.CommittedEntries.
entries []raftpb.Entry
// snapshot holds the snapshot to apply; raftNode.start fills it from
// Ready.Snapshot.
snapshot raftpb.Snapshot
// notifyc synchronizes etcd server applies with the raft node
notifyc chan struct{}
}
// start prepares and starts raftNode in a new goroutine. It is no longer safe
// to modify the fields after it has been started.
//
// The goroutine loops until r.stopped fires, handling two kinds of events:
// ticks from r.ticker, and Ready batches from r.Ready(). For each Ready it
// updates leadership metrics and state through rh, forwards the newest read
// state, hands committed entries and the snapshot to the apply routine via
// r.applyc, persists the snapshot, hard state and entries, sends outgoing
// messages, and finally calls r.Advance().
func (r *raftNode) start(rh *raftReadyHandler) {
// Upper bound on how long to wait when forwarding a read state.
internalTimeout := time.Second
go func() {
defer r.onStop()
// islead is only refreshed when a Ready carries a SoftState.
islead := false
for {
select {
case <-r.ticker.C:
r.tick()
case rd := <-r.Ready():
if rd.SoftState != nil {
// A leader change is a known (non-None) leader that differs from the
// one rh currently reports.
newLeader := rd.SoftState.Lead != raft.None && rh.getLead() != rd.SoftState.Lead
if newLeader {
leaderChanges.Inc()
}
if rd.SoftState.Lead == raft.None {
hasLeader.Set(0)
} else {
hasLeader.Set(1)
}
rh.updateLead(rd.SoftState.Lead)
islead = rd.RaftState == raft.StateLeader
if islead {
isLeader.Set(1)
} else {
isLeader.Set(0)
}
rh.updateLeadership(newLeader)
r.td.Reset()
}
if len(rd.ReadStates) != 0 {
// Only the last read state is forwarded; the send is abandoned with a
// warning after internalTimeout.
select {
case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:
case <-time.After(internalTimeout):
r.lg.Warn("timed out sending read state", zap.Duration("timeout", internalTimeout))
case <-r.stopped:
return
}
}
// notifyc is buffered with capacity 1; it travels to the apply routine
// inside ap and is signalled further down.
notifyc := make(chan struct{}, 1)
ap := apply{
entries: rd.CommittedEntries,
snapshot: rd.Snapshot,
notifyc: notifyc,
}
updateCommittedIndex(&ap, rh)
select {
case r.applyc <- ap:
case <-r.stopped:
return
}
// the leader can write to its disk in parallel with replicating to the followers and them
// writing to their disks.
// For more details, check raft thesis 10.2.1
if islead {
// gofail: var raftBeforeLeaderSend struct{}
r.transport.Send(r.processMessages(rd.Messages))
}
// Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to
// ensure that recovery after a snapshot restore is possible.
if !raft.IsEmptySnap(rd.Snapshot) {
// gofail: var raftBeforeSaveSnap struct{}
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
}
// gofail: var raftAfterSaveSnap struct{}
}
// gofail: var raftBeforeSave struct{}
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
}
if !raft.IsEmptyHardState(rd.HardState) {
proposalsCommitted.Set(float64(rd.HardState.Commit))
}
// gofail: var raftAfterSave struct{}
if !raft.IsEmptySnap(rd.Snapshot) {
// Force WAL to fsync its hard state before Release() releases
// old data from the WAL. Otherwise could get an error like:
// panic: tocommit(107) is out of range [lastIndex(84)]. Was the raft log corrupted, truncated, or lost?
// See https://github.com/etcd-io/etcd/issues/10219 for more details.
if err := r.storage.Sync(); err != nil {
r.lg.Fatal("failed to sync Raft snapshot", zap.Error(err))
}
// etcdserver now claim the snapshot has been persisted onto the disk
notifyc <- struct{}{}
// gofail: var raftBeforeApplySnap struct{}
r.raftStorage.ApplySnapshot(rd.Snapshot)
r.lg.Info("applied incoming Raft snapshot", zap.Uint64("snapshot-index", rd.Snapshot.Metadata.Index))
// gofail: var raftAfterApplySnap struct{}
if err := r.storage.Release(rd.Snapshot); err != nil {
r.lg.Fatal("failed to release Raft wal", zap.Error(err))
}
// gofail: var raftAfterWALRelease struct{}
}
r.raftStorage.Append(rd.Entries)
if !islead {
// finish processing incoming messages before we signal raftdone chan
msgs := r.processMessages(rd.Messages)
// now unblocks 'applyAll' that waits on Raft log disk writes before triggering snapshots
notifyc <- struct{}{}
// Candidate or follower needs to wait for all pending configuration
// changes to be applied before sending messages.
// Otherwise we might incorrectly count votes (e.g. votes from removed members).
// Also slow machine's follower raft-layer could proceed to become the leader
// on its own single-node cluster, before apply-layer applies the config change.
// We simply wait for ALL pending entries to be applied for now.
// We might improve this later on if it causes unnecessary long blocking issues.
waitApply := false
for _, ent := range rd.CommittedEntries {
if ent.Type == raftpb.EntryConfChange {
waitApply = true
break
}
}
if waitApply {
// blocks until 'applyAll' calls 'applyWait.Trigger'
// to be in sync with scheduled config-change job
// (assume notifyc has cap of 1)
select {
case notifyc <- struct{}{}:
case <-r.stopped:
return
}
}
// gofail: var raftBeforeFollowerSend struct{}
r.transport.Send(msgs)
} else {
// leader already processed 'MsgSnap' and signaled
notifyc <- struct{}{}
}
// Tell the raft node this Ready has been fully handled.
r.Advance()
case <-r.stopped:
return
}
}
}()
}
etcdServer从applyc中接收到apply数据后会将其apply。
// applyAll applies the snapshot and then the entries carried by apply,
// publishes the new applied index, waits for the raft routine's
// notification, and may then trigger a snapshot. If a snapshot message is
// already queued on msgSnapC it is merged with the current applied state and
// sent; otherwise the function returns without blocking.
func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
	// Snapshot first, entries second.
	s.applySnapshot(ep, apply)
	s.applyEntries(ep, apply)

	// Publish the applied index to the metric and wake up waiters on it.
	proposalsApplied.Set(float64(ep.appliedi))
	s.applyWait.Trigger(ep.appliedi)

	// Block until the raft routine signals that its disk writes are done.
	// Without this, the applied index could run ahead of the last index in
	// raft storage, because the raft routine may be slower than this one.
	<-apply.notifyc

	s.triggerSnapshot(ep)

	// Non-blocking check for a snapshot requested via send().
	select {
	case snapMsg := <-s.r.msgSnapC:
		s.sendMergedSnap(
			s.createMergedSnapshotMessage(snapMsg, ep.appliedt, ep.appliedi, ep.confState),
		)
	default:
	}
}