Zookeeper-leader初始化

Zookeeper-leader初始化

在选举完成后,集群每个节点的角色状态就会确定,回到QuorumPeer#start中,每个节点会根据自身的状态完成相应的处理,如下:

  • LEADING:创建Leader,等待follower 跟随自身
  • FOLLOWING:创建follower,跟随leader并从leader同步数据
  • OBSERVING:创建OBSERVING,跟随leader节点

本节我们着重分析leader节点相关的逻辑。

Leader 相关的类

  • Leader:leader的控制逻辑类
  • LeaderZookeeperServer:leader 处理请求的类

Leader节点初始化,主要是根据节点配置初始化上面两个类,在leader 初始化完成之后会新建用于leader-follower之间通信的连接(接受来自follower的连接)。

Leader 控制逻辑

当leader初始化完成之后通过leader#lead实现leader节点的功能,方法出现异常时,将重置当前节点的状态(LOOKING),进入下一轮选举,下面看下lead的执行流程。

1、载入数据

首先从zkDb中恢复数据(在启动阶段已经载入数据,理论上这里不会再次载入),解析出zxid

  1. zkDb中恢复数据(启动阶段已经读入数据,理论上这里不会再次读入)
  2. 解析zxid
  3. clean up dead session
  4. 做 snapshot

2、启动 LearnerCnxAcceptor

LearnerCnxAcceptor 主要作用是接受来自follower的请求,并为每一个follower 连接新建一个LearnerHandler 用于处理、同步follower,可参考6集群数据同步。

3、leader 获取选举周期

这一步稍微有点绕,通过Leader#getEpochToPropos实现,步骤如下:

  1. leader 会调用 Leader#getEpochToPropose 将自身加入到connectingFollowers 这个列表,在没有收到大多数的follower连接上之前,它进入等待状态
  2. 当一个follower连上leader之后,通过LearnerHandler 中调用Leader#getEpochToPropose将follower加入connectingFollowers 列表,并检查是否大多数follower已经连上leader,如大多数已经连上,则唤醒所有在这个节点等待的follower 和leader,否则它自己也在这里进入等待状态
  3. 等待的节点会定时去查看大多数follower都连接到leader

实现核心代码如下,可以看出这个通过wait、notify 实现的等待唤醒策略:

long start = Time.currentElapsedTime();
long cur = start;
long end = start + self.getInitLimit()*self.getTickTime();
while(waitingForNewEpoch && cur < end) {
    connectingFollowers.wait(end - cur);
    cur = Time.currentElapsedTime();
}

等待大部分follower都连接到leader之后,根据每个follower发送epoch 计算出一个新的epoch(选出最大的一个epoch并自增)。leader 会根据这个选举周期初始化这一轮zxid。

4、发送 LEADERINFO 数据包给Follower

当大部分的follower连接上leader后,阻塞等待的follower会被唤醒,接下来leader会发送 LEADERINFO(包括newZxid、version等信息)数据包给follower,然后等待大部分 follower 回复 ack 消息。等待策略和上面差不多,通过Leader#waitForEpochAck 实现。

注:如果follower 的版本小于 0x10000 那么就不会发送LEADERINFO 消息给follower,直接阻塞等待follower 回复的ack消息。在ack消息中也包含follower最新的zxid。现在follower除此回复的消息都是0x10000

这一步主要是将上一步选出来的epoch 广播给集群中的其他节点,其他节点后续会根据这个epoch判断收到的数据包是不是新的leader发送的,避免旧的leader复活之后广播之前的提议而造成状态不一致的问题。

5、Leader通过LearnerHandler异步同步数据给Follower

leader 在收到大多数follower回复ack消息之后,开始和follower 同步数据,在集群对外服务之前确保各个节点中的数据一致。通过LearnerHandler#syncFollower数据同步,同步方式有四种全量同步(snap)、差异化(diff)、回滚(trunc)、差异化+回滚(diff+trunc) ,具体采用哪种方式还要看follower 和 leader 之间的数据差异情况。

peerLastZxid:该Learner最后处理的ZXID
minCommittedLog:LeadercommittedLog中的最小ZXID
maxCommittedLog:LeadercommittedLog中的最大ZXID
lastProcessedZxid:leader中最后处理的ZXID
  • 强制发送snapshot(测试目的)

  • follower 和 leader 已经同步,则发送空的 diff 消息

    if (lastProcessedZxid == peerLastZxid) {       
     // 如果 follower 的peerLastZxid 和 leader lastProcessedZxid相等,说明两个节点的数据一致,不需要进行同步,这时只需要给follower 发送一个diff的包
     queueOpPacket(Leader.DIFF, peerLastZxid);
     needOpPacket = false;
     needSnap = false;
    } 
    
  • follower 的 txn 比leader 要多,那么则发送 TRUNC ,回滚follower多余的 txn数据。

    if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
             
     // follower的数据比leader 超前,则回滚follower的数据到leader#maxCommittedLog
     queueOpPacket(Leader.TRUNC, maxCommittedLog);
     currentZxid = maxCommittedLog;
     needOpPacket = false;
     needSnap = false;
    }
    
  • follower 在committedLog同步的范围内,那么根据follower 的zxid来决定发送 TRUNC 还是DIFF,如果follower 在同步中的话就发送空的DIFF

    if ((maxCommittedLog >= peerLastZxid) && (minCommittedLog <= peerLastZxid)) {
     // follower 数据在处于leader 的 最大最小事务之间,则增量同步 
     Iterator<Proposal> itr = db.getCommittedLog().iterator();
     
     // 这里有两种情形:
     // 1、发送 DIFF 给 follower : 对应 follower zxid 在leader的history中,这时只需要将follower中缺失的数据(propose+commit)发过去,然后让follower重做这些提案即可
     // 2、发送 TRUN + DIFF 给 follower : 对应 follower zxid 满足上述条件,但是follower中存在leader中不存在的数据,那么会先发送回滚消息,再增量同步。这种场景在leader 收到propose消息之后还没来得及广播给其他follower就挂掉,新选出的leader没有这个消息,所以需要将此消息回滚
     currentZxid = queueCommittedProposals(itr, peerLastZxid,
                                          null, maxCommittedLog);
     needSnap = false;
    }
    
    
  • follower 和leader之间的数据差异巨大,follower的最大事务id小于leader 的minCommittedLog。会将leader 磁盘上的txnLog和committedLog同步给follower,如果失败了,会发送snapshot

    txnLogSyncEnabled :是否开启事务日志同步
    if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {
     // follower 最大的zxid 小于 leader 最小 minCommittedLog,并且允许从txnLog中同步数据
    
     // 计算事务日志允许同步的大小
     long sizeLimit = db.calculateTxnLogSizeLimit();
     
     // 如果follower的zxid 在 leader的事务日志中,那么只需要同步事务日志中差异的部分,不需要同步整个snapshot,否则就需要同步snapshot
     Iterator<Proposal> txnLogItr = db.getProposalsFromTxnLog(
             peerLastZxid, sizeLimit);
     if (txnLogItr.hasNext()) {
         LOG.info("Use txnlog and committedLog for peer sid: " +  getSid());
         currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid,
                                              minCommittedLog, maxCommittedLog);
    
         Iterator<Proposal> committedLogItr = db.getCommittedLog().iterator();
         currentZxid = queueCommittedProposals(committedLogItr, currentZxid,
                                              null, maxCommittedLog);
         needSnap = false;
     }
     // closing the resources
     if (txnLogItr instanceof TxnLogProposalIterator) {
         TxnLogProposalIterator txnProposalItr = (TxnLogProposalIterator) txnLogItr;
         txnProposalItr.close();
     }
    }
    

    注:如果需要同步整个镜像,同步过程就不会通过发送每个propose+commit数据给follower,而是leader将它自身的snapshot文件(SNAP)通过网络直接发给follower,follower收到后直接将本地数据覆盖即可。

经过上面的步骤,leader 和follower的差异数据同步完成之后,再次检查leader 和follower 在同步数据期间是否有其他的差异数据,所以最后一步就是同步这部分差异数据。

// 对这个方法的理解是,follower在故障恢复时,如果和leader的数据同步完成,还要再次检查同步数据这段时间是不是leader有了新的提交,需要再次同步
leaderLastZxid = leader.startForwarding(this, currentZxid);

在leader 和follower数据同步完成之后,leader 通过LearnerHandler发送 Leader.NEWLEADER 数据包给所有follower,然后leader通过 leader#waitForNewLeaderAck 阻塞等待大多数follower的ack数据。

6、启动LeaderZooKeeperServer

在大多数的follower都回复了newLeaderAck 数据包之后,这是的leader就成为真正的leader了,它启动LeaderZooKeeperServer ,履行leader 的职责,如下:

  1. 创建session追踪器(SessionTrackerImpl)
  2. 启动SessionTracker(它的主要作用是定期清理过期的session)
  3. 设置请求处理器(构造Leader的请求处理链)
  4. 注册JMX,然后将当前的服务的状态设置为RUNNING(运行)

所有的LearnerHandler 会阻塞等待LeaderZooKeeperServer 启动完成,之后它开始给所有follower发送Leader.UPTODATE 数据包,之后便处理来自follower数据。它可以收到如下四类消息:

  • Leader.ACK:follower在同步 proposal 之后发给leader的消息;leader收到此消息后直接通过Leader#processAck 处理这条消息。

    1、校验消息

    if (outstandingProposals.size() == 0) {
        return;
    }
    if (lastCommitted >= zxid) {
        // The proposal has already been committed
        return;
    }
    Proposal p = outstandingProposals.get(zxid);
    if (p == null) {
        return;
    }
    

    2、将收到的消息加入到ack桶中

    3、尝试提交消息(通过Leader#tryToCommit,如果没有收到大多数follower的回复时,提交失败)

    注:通过tryToCommit可知,在在提交消息时通过在outstandingProposals 中找当前消息的前一条消息,如果找到,说明上一条消息还没提交,则先不提交当前消息。否则提交当前消息,并且通知所有的Observer,最后通过将当前的请求转发给CommitProcessor 继续处理

  • Leader.PING:leader 和 follower之间的心跳数据。直接通过sessionTracker 更新当前session及超时时间

  • Leader.REVALIDATE:session校验是否有效

  • Leader.REQUEST:follower将客户端读以外的操作转发给leader,这一步Leader会通过LeaderZooKeeperServer#submitLearnerRequest将请求首先转发给PrepRequestProcessor。

7、和Follower建立心跳

在Leader 启动完成之后,leader变周期性的发送心跳数据(LearnerHandler#ping)给follower。如果出现follower 掉线或宕机就会将此follower从 learners 中移出,接下来就不会发送propose或者心跳数据给此follower。

当leader上没有大多数的follower时,会将自己关闭,状态重置为LOOKING,进行下一轮的选举。

1、超时时间判断

public synchronized boolean check(long time) {
    if (currentTime == 0) {
        return true;
    } else {
        long msDelay = (time - currentTime) / 1000000;
        return (msDelay < (leader.self.tickTime * leader.self.syncLimit));
    }
}
leader.self.tickTime * leader.self.syncLimit:超时时间

至此,Leader的启动流程就已经完成了。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,843评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,538评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,187评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,264评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,289评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,231评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,116评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,945评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,367评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,581评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,754评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,458评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,068评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,692评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,842评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,797评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,654评论 2 354

推荐阅读更多精彩内容

  • 什么是Zab协议 Zab 协议的作用 Zab 协议原理 Zab 协议核心 Zab 协议内容 原子广播 崩溃恢复 如...
    庙人阅读 576评论 0 1
  • 声明:本文写的时候,当时就是完全不懂zk,边看网上的文章边学习归纳和整理,这不是我的产出,不用点赞打赏。大家理智友...
    _Zy阅读 76,032评论 38 129
  • Apache Zookeeper是由Apache Hadoop的子项目发展而来,于2010年11月正式成为Apac...
    壹点零阅读 530评论 0 0
  • Apache Zookeeper是由Apache Hadoop的子项目发展而来,于2010年11月正式成为Apac...
    olostin阅读 6,111评论 2 9
  • Zookeeper依赖ZAB协议来实现分布式数据的一致性,其中ZAB协议包括原子广播和崩溃恢复两个阶段。 基于该协...
    liuzx32阅读 562评论 0 0