etcd Study Notes 4 (Draft)

When ClientV3 sends a Put request, the key and value it carries are wrapped in an Op and converted into a pb.PutRequest, which EtcdServer receives and handles over gRPC. EtcdServer then hands the serialized data to the raft state machine for processing.

// file: clientv3/kv.go
case tPut:
    var resp *pb.PutResponse
    r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV,
        IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease}
    resp, err = kv.remote.Put(ctx, r, kv.callOpts...)
    if err == nil {
        return OpResponse{put: (*PutResponse)(resp)}, nil
    }
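
For context, here is a minimal sketch of issuing such a Put from application code. It assumes a local etcd listening on localhost:2379; the endpoint is an illustration-only assumption.

// example (not etcd source): a minimal clientv3 Put
package main

import (
    "context"
    "log"
    "time"

    "go.etcd.io/etcd/clientv3"
)

func main() {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"localhost:2379"}, // assumed local endpoint
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()

    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    // the key/value is wrapped into an Op and sent as a pb.PutRequest over gRPC
    _, err = cli.Put(ctx, "foo", "bar")
    cancel()
    if err != nil {
        log.Fatal(err)
    }
}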

The serialized data is wrapped in a pb.Message of type MsgProp and written to the Node's propc channel.

// file: raft/node.go
func (n *node) Propose(ctx context.Context, data []byte) error {
    return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
}

When the raft node receives the message from the propc channel, it feeds it into the raft state machine via Step.

// file: raft/node.go
select {
        // TODO: maybe buffer the config propose if there exists one (the way
        // described in raft dissertation)
        // Currently it is dropped in Step silently.
        case pm := <-propc:
            m := pm.m
            m.From = r.id
            err := r.Step(m)
            if pm.result != nil {
                pm.result <- err
                close(pm.result)
            }
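
The pm.result plumbing above is a small request/reply handshake: stepWait sends the message together with a result channel and blocks until the node's run loop reports the outcome of Step. A toy model of the pattern (an illustration, not etcd code):

// toy propose/step handshake in the style of raft's msgWithResult
package main

import "fmt"

type msgWithResult struct {
    data   string
    result chan error
}

func main() {
    propc := make(chan msgWithResult)

    // node loop: receive proposals, "step" them, report the outcome
    go func() {
        for pm := range propc {
            var err error // pretend Step succeeded
            if pm.result != nil {
                pm.result <- err
                close(pm.result)
            }
        }
    }()

    // stepWait: send, then block until the node loop answers
    pm := msgWithResult{data: "put foo=bar", result: make(chan error, 1)}
    propc <- pm
    fmt.Println("step result:", <-pm.result)
}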

Step then dispatches to stepLeader, stepCandidate, or stepFollower according to the node's role. Taking the Leader as an example:

// file: raft/raft.go (stepLeader, case pb.MsgProp, excerpt)
if !r.appendEntry(m.Entries...) {
    return ErrProposalDropped
}
r.bcastAppend()
return nil

raft first appends the entries to the raftLog, stamping each entry with its Term and Index, and then broadcasts them to the other peers.

// file: raft/raft.go
func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
    li := r.raftLog.lastIndex()
    for i := range es {
        es[i].Term = r.Term
        es[i].Index = li + 1 + uint64(i)
    }
    // Track the size of this uncommitted proposal.
    if !r.increaseUncommittedSize(es) {
        r.logger.Debugf(
            "%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal",
            r.id,
        )
        // Drop the proposal.
        return false
    }
    // use latest "last" index after truncate/append
    li = r.raftLog.append(es...)
    r.prs.Progress[r.id].MaybeUpdate(li)
    // Regardless of maybeCommit's return, our caller will call bcastAppend.
    r.maybeCommit()
    return true
}
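
To make the stamping concrete, a toy illustration (not etcd code): with lastIndex li = 5 and the leader at Term 2, three new proposals are stamped (Term 2, Index 6), (2, 7), (2, 8).

package main

import "fmt"

type Entry struct{ Term, Index uint64 }

func main() {
    var (
        li   uint64 = 5 // raftLog.lastIndex()
        term uint64 = 2 // r.Term
    )
    es := make([]Entry, 3)
    for i := range es {
        es[i].Term = term
        es[i].Index = li + 1 + uint64(i) // same arithmetic as appendEntry
    }
    fmt.Println(es) // [{2 6} {2 7} {2 8}]
}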

The Leader sends Append Entries messages to the peers. maybeSendAppend (shown after this list) proceeds as follows:

  1. First check whether the peer's Progress is paused; if it is, return immediately.
  2. Fetch the Term and the entries to send from the raftLog. A failure here is usually ErrCompacted, caused by the log having been compacted, or ErrUnavailable.
  3. If fetching the term or the entries fails, send a snapshot message of type MsgSnap, reset the Progress state to StateSnapshot, and set PendingSnapshot to the snapshot's index.
    m := pb.Message{}
    m.To = to
    m.Type = pb.MsgSnap
    m.Snapshot = snapshot  // snapshot obtained from the raftLog

  4. If the term and entries are fetched successfully, set the message type to MsgApp:
    m := pb.Message{}
    m.To = to
    m.Type = pb.MsgApp
    m.Index = pr.Next - 1  // TODO(zhengliang): pr.Next or pr.Next - 1 ?
    m.LogTerm = term
    m.Entries = ents
    m.Commit = r.raftLog.committed

  5. If the entries to send are non-empty, handle them according to the type of the Progress:
  • If the type is tracker.StateReplicate:
    • Take the last index of the entries.
    • Optimistically advance the Progress's Next index to last index + 1.
    • Add the last index to the Progress's Inflights. Inflights is a sliding window over in-flight messages; it effectively bounds both the number of in-flight messages and the bandwidth each Progress can consume. When it is full, no more messages can be sent. For example, when the leader sends a message it adds the entries' last index to Inflights, where indexes are kept in send order; when the leader later receives a reply, it frees the earlier in-flight entries by calling inflights.FreeLE. A toy sketch of this window follows the code below.
  • If the type is tracker.StateProbe, simply set the Progress's ProbeSent to true. While ProbeSent is true, replication messages to this peer are paused until ProbeSent is reset.

// file: raft/raft.go
// bcastAppend sends RPC, with entries to all peers that are not up-to-date
// according to the progress recorded in r.prs.
func (r *raft) bcastAppend() {
    r.prs.Visit(func(id uint64, _ *tracker.Progress) {
        if id == r.id {
            return
        }
        r.sendAppend(id)
    })
}

// sendAppend sends an append RPC with new entries (if any) and the
// current commit index to the given peer.
func (r *raft) sendAppend(to uint64) {
    r.maybeSendAppend(to, true)
}

// maybeSendAppend sends an append RPC with new entries to the given peer,
// if necessary. Returns true if a message was sent. The sendIfEmpty
// argument controls whether messages with no entries will be sent
// ("empty" messages are useful to convey updated Commit indexes, but
// are undesirable when we're sending multiple messages in a batch).
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
    pr := r.prs.Progress[to]
    if pr.IsPaused() {
        return false
    }
    m := pb.Message{}
    m.To = to

    term, errt := r.raftLog.term(pr.Next - 1)
    ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
    if len(ents) == 0 && !sendIfEmpty {
        return false
    }

    if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
        if !pr.RecentActive {
            r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
            return false
        }

        m.Type = pb.MsgSnap
        snapshot, err := r.raftLog.snapshot()
        if err != nil {
            if err == ErrSnapshotTemporarilyUnavailable {
                r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
                return false
            }
            panic(err) // TODO(bdarnell)
        }
        if IsEmptySnap(snapshot) {
            panic("need non-empty snapshot")
        }
        m.Snapshot = snapshot
        sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
        r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
            r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
        pr.BecomeSnapshot(sindex)
        r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
    } else {
        m.Type = pb.MsgApp
        m.Index = pr.Next - 1 // TODO(zhengliang): pr.Next or pr.Next - 1 ?
        m.LogTerm = term
        m.Entries = ents
        m.Commit = r.raftLog.committed
        if n := len(m.Entries); n != 0 {
            switch pr.State {
            // optimistically increase the next when in StateReplicate
            case tracker.StateReplicate:
                last := m.Entries[n-1].Index
                pr.OptimisticUpdate(last)
                pr.Inflights.Add(last)
            case tracker.StateProbe:
                pr.ProbeSent = true
            default:
                r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
            }
        }
    }
    r.send(m)
    return true
}
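
As promised above, a toy sliding-window Inflights. This is an illustration, not the raft/tracker implementation: Add records the last index of an in-flight append, Full gates further sends, and FreeLE releases everything acknowledged up to and including a given index.

package main

import "fmt"

type Inflights struct {
    buffer []uint64 // in-flight last indexes, kept in send order
    size   int      // capacity of the window
}

func (in *Inflights) Full() bool { return len(in.buffer) >= in.size }

// Add records that an append ending at index `last` is in flight.
func (in *Inflights) Add(last uint64) {
    if in.Full() {
        panic("cannot add into a full inflights")
    }
    in.buffer = append(in.buffer, last)
}

// FreeLE frees the in-flight entries with index <= to.
func (in *Inflights) FreeLE(to uint64) {
    i := 0
    for i < len(in.buffer) && in.buffer[i] <= to {
        i++
    }
    in.buffer = in.buffer[i:]
}

func main() {
    in := &Inflights{size: 3}
    in.Add(10) // leader sent entries ending at index 10
    in.Add(12)
    in.Add(15)
    fmt.Println(in.Full()) // true: sends to this peer are paused
    in.FreeLE(12)          // follower acknowledged up to index 12
    fmt.Println(in.Full()) // false: the window has room again
}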

Once the Message is assembled, it is handed to raft's send() method. send() only performs a few sanity checks and stamps the Message with the Term, then appends it to raft's msgs queue:

r.msgs = append(r.msgs, m)
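
For reference, a condensed paraphrase of send() (not the verbatim source; see raft/raft.go): vote messages must already carry the term they campaign for, while other non-local messages are stamped with the current term before being queued.

// condensed paraphrase of raft.send (see raft/raft.go for the real code)
func (r *raft) send(m pb.Message) {
    m.From = r.id
    if m.Type == pb.MsgVote || m.Type == pb.MsgVoteResp ||
        m.Type == pb.MsgPreVote || m.Type == pb.MsgPreVoteResp {
        if m.Term == 0 {
            panic("term should be set when sending vote messages")
        }
    } else if m.Type != pb.MsgProp && m.Type != pb.MsgReadIndex {
        // MsgProp and MsgReadIndex are treated as local messages
        m.Term = r.Term
    }
    r.msgs = append(r.msgs, m)
}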

When RawNode detects a new pending Ready, it populates a Ready object. A Ready is read-only: it packages up the entries and messages that are about to be saved to stable storage, committed, or sent to peers. The Ready is then written to the readyc channel.

// HasReady called when RawNode user need to check if any Ready pending.
// Checking logic in this method should be consistent with Ready.containsUpdates().
func (rn *RawNode) HasReady() bool {
    r := rn.raft
    if !r.softState().equal(rn.prevSoftSt) {
        return true
    }
    if hardSt := r.hardState(); !IsEmptyHardState(hardSt) && !isHardStateEqual(hardSt, rn.prevHardSt) {
        return true
    }
    if r.raftLog.unstable.snapshot != nil && !IsEmptySnap(*r.raftLog.unstable.snapshot) {
        return true
    }
    if len(r.msgs) > 0 || len(r.raftLog.unstableEntries()) > 0 || r.raftLog.hasNextEnts() {
        return true
    }
    if len(r.readStates) != 0 {
        return true
    }
    return false
}

// file: raft/node.go
func (n *node) run() {
    var propc chan msgWithResult
    var readyc chan Ready
    var advancec chan struct{}
    var rd Ready

    r := n.rn.raft

    lead := None

    for {
        if advancec != nil {
            readyc = nil
        } else if n.rn.HasReady() {
            // Populate a Ready. Note that this Ready is not guaranteed to
            // actually be handled. We will arm readyc, but there's no guarantee
            // that we will actually send on it. It's possible that we will
            // service another channel instead, loop around, and then populate
            // the Ready again. We could instead force the previous Ready to be
            // handled first, but it's generally good to emit larger Readys plus
            // it simplifies testing (by emitting less frequently and more
            // predictably).
            rd = n.rn.readyWithoutAccept()
            readyc = n.readyc
        }
        // ... (other select cases elided)
        case readyc <- rd: // write the Ready to the readyc channel
            n.rn.acceptReady(rd)
            advancec = n.advancec
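
For orientation, the raft package documentation describes the canonical Ready-consumer loop that etcdserver implements below. Here is a simplified, compilable sketch in which saveToStorage, send, and process are application-supplied stand-ins (assumptions for illustration):

package main

import (
    "time"

    "go.etcd.io/etcd/raft"
    pb "go.etcd.io/etcd/raft/raftpb"
)

func saveToStorage(st pb.HardState, ents []pb.Entry, snap pb.Snapshot) {}
func send(msgs []pb.Message)                                           {}
func process(e pb.Entry)                                               {}

func runLoop(n raft.Node, done <-chan struct{}) {
    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            n.Tick() // drive elections and heartbeats
        case rd := <-n.Ready():
            // persist before sending or applying
            saveToStorage(rd.HardState, rd.Entries, rd.Snapshot)
            send(rd.Messages)
            for _, entry := range rd.CommittedEntries {
                process(entry)
            }
            n.Advance() // tell the node this Ready has been handled
        case <-done:
            return
        }
    }
}

func main() {} // the loop would be launched with a real raft.Node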

After etcdServer's raftNode receives a Ready from the channel, it wraps the committed entries and snapshot into an apply struct and writes it to the applyc channel, updating the committed index beforehand. It then performs the following steps:

  1. If this is the leader node, send the messages to the peers; messages of type raftpb.MsgSnap are diverted into the msgSnapC channel (a condensed sketch of this diversion appears after the code below). The leader can write to its disk in parallel with replicating to the peers and the peers writing to their disks.
  2. Persist the snapshot.
  3. Persist the Hard State and the entries.
  4. Force the WAL to fsync its hard state.
  5. Wait for applyAll to finish.

// file: etcdserver/raft.go
// apply contains entries, snapshot to be applied. Once
// an apply is consumed, the entries will be persisted to
// raft storage concurrently; the application must read
// raftDone before assuming the raft messages are stable.
type apply struct {
    entries  []raftpb.Entry
    snapshot raftpb.Snapshot
    // notifyc synchronizes etcd server applies with the raft node
    notifyc chan struct{}
}

// start prepares and starts raftNode in a new goroutine. It is no longer safe
// to modify the fields after it has been started.
func (r *raftNode) start(rh *raftReadyHandler) {
    internalTimeout := time.Second

    go func() {
        defer r.onStop()
        islead := false

        for {
            select {
            case <-r.ticker.C:
                r.tick()
            case rd := <-r.Ready():
                if rd.SoftState != nil {
                    newLeader := rd.SoftState.Lead != raft.None && rh.getLead() != rd.SoftState.Lead
                    if newLeader {
                        leaderChanges.Inc()
                    }

                    if rd.SoftState.Lead == raft.None {
                        hasLeader.Set(0)
                    } else {
                        hasLeader.Set(1)
                    }

                    rh.updateLead(rd.SoftState.Lead)
                    islead = rd.RaftState == raft.StateLeader
                    if islead {
                        isLeader.Set(1)
                    } else {
                        isLeader.Set(0)
                    }
                    rh.updateLeadership(newLeader)
                    r.td.Reset()
                }

                if len(rd.ReadStates) != 0 {
                    select {
                    case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:
                    case <-time.After(internalTimeout):
                        r.lg.Warn("timed out sending read state", zap.Duration("timeout", internalTimeout))
                    case <-r.stopped:
                        return
                    }
                }
                notifyc := make(chan struct{}, 1)
                ap := apply{
                    entries:  rd.CommittedEntries,
                    snapshot: rd.Snapshot,
                    notifyc:  notifyc,
                }

                updateCommittedIndex(&ap, rh)

                select {
                case r.applyc <- ap:
                case <-r.stopped:
                    return
                }

                // the leader can write to its disk in parallel with replicating to the followers and them
                // writing to their disks.
                // For more details, check raft thesis 10.2.1
                if islead {
                    // gofail: var raftBeforeLeaderSend struct{}
                    r.transport.Send(r.processMessages(rd.Messages))
                }

                // Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to
                // ensure that recovery after a snapshot restore is possible.
                if !raft.IsEmptySnap(rd.Snapshot) {
                    // gofail: var raftBeforeSaveSnap struct{}
                    if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
                        r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
                    }
                    // gofail: var raftAfterSaveSnap struct{}
                }

                // gofail: var raftBeforeSave struct{}
                if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
                    r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
                }
                if !raft.IsEmptyHardState(rd.HardState) {
                    proposalsCommitted.Set(float64(rd.HardState.Commit))
                }
                // gofail: var raftAfterSave struct{}

                if !raft.IsEmptySnap(rd.Snapshot) {
                    // Force WAL to fsync its hard state before Release() releases
                    // old data from the WAL. Otherwise could get an error like:
                    // panic: tocommit(107) is out of range [lastIndex(84)]. Was the raft log corrupted, truncated, or lost?
                    // See https://github.com/etcd-io/etcd/issues/10219 for more details.
                    if err := r.storage.Sync(); err != nil {
                        r.lg.Fatal("failed to sync Raft snapshot", zap.Error(err))
                    }
                    // etcdserver now claim the snapshot has been persisted onto the disk
                    notifyc <- struct{}{}

                    // gofail: var raftBeforeApplySnap struct{}
                    r.raftStorage.ApplySnapshot(rd.Snapshot)
                    r.lg.Info("applied incoming Raft snapshot", zap.Uint64("snapshot-index", rd.Snapshot.Metadata.Index))
                    // gofail: var raftAfterApplySnap struct{}

                    if err := r.storage.Release(rd.Snapshot); err != nil {
                        r.lg.Fatal("failed to release Raft wal", zap.Error(err))
                    }
                    // gofail: var raftAfterWALRelease struct{}
                }

                r.raftStorage.Append(rd.Entries)

                if !islead {
                    // finish processing incoming messages before we signal raftdone chan
                    msgs := r.processMessages(rd.Messages)

                    // now unblocks 'applyAll' that waits on Raft log disk writes before triggering snapshots
                    notifyc <- struct{}{}

                    // Candidate or follower needs to wait for all pending configuration
                    // changes to be applied before sending messages.
                    // Otherwise we might incorrectly count votes (e.g. votes from removed members).
                    // Also slow machine's follower raft-layer could proceed to become the leader
                    // on its own single-node cluster, before apply-layer applies the config change.
                    // We simply wait for ALL pending entries to be applied for now.
                    // We might improve this later on if it causes unnecessary long blocking issues.
                    waitApply := false
                    for _, ent := range rd.CommittedEntries {
                        if ent.Type == raftpb.EntryConfChange {
                            waitApply = true
                            break
                        }
                    }
                    if waitApply {
                        // blocks until 'applyAll' calls 'applyWait.Trigger'
                        // to be in sync with scheduled config-change job
                        // (assume notifyc has cap of 1)
                        select {
                        case notifyc <- struct{}{}:
                        case <-r.stopped:
                            return
                        }
                    }

                    // gofail: var raftBeforeFollowerSend struct{}
                    r.transport.Send(msgs)
                } else {
                    // leader already processed 'MsgSnap' and signaled
                    notifyc <- struct{}{}
                }

                r.Advance()
            case <-r.stopped:
                return
            }
        }
    }()
}
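
processMessages (not shown above) is where the MsgSnap diversion from step 1 happens. A condensed paraphrase (not the verbatim source; see etcdserver/raft.go):

// condensed paraphrase of the MsgSnap handling in raftNode.processMessages
for i := len(ms) - 1; i >= 0; i-- {
    if ms[i].Type == raftpb.MsgSnap {
        // hand the snapshot message to the apply loop via msgSnapC so it
        // can be merged with a store snapshot before being sent
        select {
        case r.msgSnapC <- ms[i]:
        default:
            // drop the message if msgSnapC is full
        }
        ms[i].To = 0 // To == 0 tells the transport to drop this message
    }
}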

After EtcdServer receives the apply from the applyc channel, it applies it:

func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
    s.applySnapshot(ep, apply)
    s.applyEntries(ep, apply)

    proposalsApplied.Set(float64(ep.appliedi))
    s.applyWait.Trigger(ep.appliedi)

    // wait for the raft routine to finish the disk writes before triggering a
    // snapshot. or applied index might be greater than the last index in raft
    // storage, since the raft routine might be slower than apply routine.
    <-apply.notifyc

    s.triggerSnapshot(ep)
    select {
    // snapshot requested via send()
    case m := <-s.r.msgSnapC:
        merged := s.createMergedSnapshotMessage(m, ep.appliedt, ep.appliedi, ep.confState)
        s.sendMergedSnap(merged)
    default:
    }
}
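
The <-apply.notifyc wait above pairs with the notifyc sends in raftNode.start. A toy model of the handshake (an illustration, not etcd code): the raft goroutine signals on a buffered channel of capacity 1 once the log writes are durable, and the apply goroutine blocks on it before triggering a snapshot.

package main

import (
    "fmt"
    "time"
)

func main() {
    notifyc := make(chan struct{}, 1)

    // raft routine: persist the log, then unblock the applier
    go func() {
        time.Sleep(10 * time.Millisecond) // pretend to fsync the WAL
        notifyc <- struct{}{}
    }()

    // apply routine: apply entries, then wait for the disk writes
    // before it is safe to trigger a snapshot
    fmt.Println("applied entries")
    <-notifyc
    fmt.Println("raft log persisted; safe to trigger snapshot")
}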