zookeeper源码启动流程详解

zookeeper启动类的位置在org.apache.zookeeper.server.ZooKeeperServerMain,没错,找到它,并运行Main方法,即可启动zookeeper服务器。

请注意,在笔者的环境中只启动了一个zookeeper服务器,所以它并不是一个集群环境。

一、加载配置

第一步就是要加载配置文件,我们来看initializeAndRun方法。

protected void initializeAndRun(String[] args)throws ConfigException, IOException{ServerConfig config = new ServerConfig();if(args.length == 1) {config.parse(args[0]);}else{config.parse(conf);}runFromConfig(config);}

这里主要就是把zoo.cfg中的配置加载到ServerConfig对象中,过程比较简单,不再赘述。我们先看几个简单的配置项含义。

配置含义

clientPort对外服务端口,一般2181

dataDir存储快照文件的目录,默认情况下,事务日志文件也会放在这

tickTimeZK中的一个时间单元。ZK中所有时间都是以这个时间单元为基础,进行整数倍配置

minSessionTimeout maxSessionTimeoutSession超时时间,默认是2tickTime  ~ 20tickTime 之间

preAllocSize预先开辟磁盘空间,用于后续写入事务日志,默认64M

snapCount每进行snapCount次事务日志输出后,触发一次快照,默认是100,000

maxClientCnxns最大并发客户端数,默认是60

二、启动服务

我们接着往下看,来到runFromConfig方法。

public void runFromConfig(ServerConfig config) throws IOException {LOG.info("Starting server");FileTxnSnapLog txnLog = null;try {final ZooKeeperServer zkServer = new ZooKeeperServer();final CountDownLatch shutdownLatch = new CountDownLatch(1);//注册服务器关闭事件zkServer.registerServerShutdownHandler(new ZooKeeperServerShutdownHandler(shutdownLatch));//操作事务日志和快照日志文件类txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(config.dataDir));txnLog.setServerStats(zkServer.serverStats());//设置配置属性zkServer.setTxnLogFactory(txnLog);zkServer.setTickTime(config.tickTime);zkServer.setMinSessionTimeout(config.minSessionTimeout);zkServer.setMaxSessionTimeout(config.maxSessionTimeout);//实例化ServerCnxnFactory抽象类cnxnFactory = ServerCnxnFactory.createFactory();cnxnFactory.configure(config.getClientPortAddress(),config.getMaxClientCnxns());cnxnFactory.startup(zkServer);shutdownLatch.await();shutdown();cnxnFactory.join();if(zkServer.canShutdown()) {zkServer.shutdown(true);}} catch (InterruptedException e) {LOG.warn("Server interrupted", e);} finally {if(txnLog != null) {txnLog.close();}}}

以上代码就是zookeeper服务器从启动到关闭的流程。我们拆分来看。

1、服务关闭事件

我们看到给zkServer注册了服务器关闭的处理类。

final ZooKeeperServer zkServer = new ZooKeeperServer();final CountDownLatch shutdownLatch = new CountDownLatch(1);zkServer.registerServerShutdownHandler(new ZooKeeperServerShutdownHandler(shutdownLatch));

首先,我们应该知道zookeeper服务器是有状态的。

protected enum State {INITIAL, RUNNING, SHUTDOWN, ERROR;}

那么,在状态发生变化的时候,就会调用到setState方法。

public class ZooKeeperServer{//当zookeeper服务器状态发生变化时候调用此方法protected voidsetState(State state) {this.state = state;if(zkShutdownHandler != null) {zkShutdownHandler.handle(state);}else{LOG.debug("ZKShutdownHandler is not registered, so ZooKeeper server "+"won't take any action on ERROR or SHUTDOWN server state changes");}}}

然后在这里就会调用到注册的处理器。在处理器中,如果发现状态不对,shutdownLatch.await方法就会被唤醒。

class ZooKeeperServerShutdownHandler {void handle(State state) {if(state == State.ERROR || state == State.SHUTDOWN) {            shutdownLatch.countDown();        }    }}

当它被唤醒,事情就变得简单了。关闭、清理各种资源。

2、日志文件

事务日志文件和快照文件的操作,分别对应着两个实现类,在这里就是为了创建文件路径和创建类实例。

public FileTxnSnapLog(File dataDir, File snapDir) throws IOException {LOG.debug("Opening datadir:{} snapDir:{}", dataDir, snapDir);this.dataDir = new File(dataDir, version + VERSION);this.snapDir = new File(snapDir, version + VERSION);if(!this.dataDir.exists()) {if(!this.dataDir.mkdirs()) {throw new IOException("Unable to create data directory "+ this.dataDir);}}if(!this.dataDir.canWrite()) {throw new IOException("Cannot write to data directory "+ this.dataDir);}if(!this.snapDir.exists()) {if(!this.snapDir.mkdirs()) {throw new IOException("Unable to create snap directory "+ this.snapDir);}}if(!this.snapDir.canWrite()) {throw new IOException("Cannot write to snap directory "+ this.snapDir);}if(!this.dataDir.getPath().equals(this.snapDir.getPath())){checkLogDir();checkSnapDir();}txnLog = new FileTxnLog(this.dataDir);snapLog = new FileSnap(this.snapDir);}

上面的好理解,如果文件不存在就去创建,并检查是否拥有写入权限。

中间还有个判断很有意思,如果两个文件路径不相同,还要调用checkLogDir、checkSnapDir去检查。检查什么呢?就是不能放在一起。

事务日志文件目录下,不能包含快照文件。快照文件目录下,也不能包含事务日志文件。

最后,就是初始化两个实现类,把创建后的文件对象告诉它们。

3、启动服务

服务器的启动对应着两个实现:NIO服务器和Netty服务器。所以一开始要调用createFactory来选择实例化一个实现类。

static public ServerCnxnFactory createFactory() throws IOException {String serverCnxnFactoryName =System.getProperty("zookeeper.serverCnxnFactory");if(serverCnxnFactoryName == null) {serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();}try {ServerCnxnFactory serverCnxnFactory = Class.forName(serverCnxnFactoryName).getDeclaredConstructor().newInstance();returnserverCnxnFactory;} catch (Exception e) {IOException ioe = new IOException("Couldn't instantiate "+ serverCnxnFactoryName);ioe.initCause(e);throw ioe;}}

先获取zookeeper.serverCnxnFactory属性值,如果它为空,默认创建的就是NIOServerCnxnFactory实例。

所以,如果我们希望用Netty启动,就可以这样设置:System.setProperty("zookeeper.serverCnxnFactory", NettyServerCnxnFactory.class.getName());

最后通过反射获取它们的构造器并实例化。然后调用它们的方法来绑定端口,启动服务。两者差异不大,在这里咱们以Netty为例看一下。

构造函数

NettyServerCnxnFactory() {bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool()));bootstrap.setOption("reuseAddress",true);bootstrap.setOption("child.tcpNoDelay",true);bootstrap.setOption("child.soLinger", -1);bootstrap.getPipeline().addLast("servercnxnfactory", channelHandler);}

在构造函数中,初始化ServerBootstrap对象,设置TCP参数。我们重点关注的是,它的事件处理器channelHandler。

事件处理器

这里的channelHandler是一个内部类,继承自SimpleChannelHandler。它被标注为@Sharable,还是一个共享的处理器。

@Sharableclass CnxnChannelHandler extends SimpleChannelHandler {//客户端连接被关闭public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)throws Exception{//移除相应的ChannelallChannels.remove(ctx.getChannel());}//客户端连接public void channelConnected(ChannelHandlerContext ctx,ChannelStateEvent e) throws Exception{allChannels.add(ctx.getChannel());NettyServerCnxn cnxn = new NettyServerCnxn(ctx.getChannel(),zkServer, NettyServerCnxnFactory.this);ctx.setAttachment(cnxn);addCnxn(cnxn);}//连接断开public void channelDisconnected(ChannelHandlerContext ctx,ChannelStateEvent e) throws Exception{NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();if(cnxn != null) {cnxn.close();}}//发生异常public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)throws Exception{NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();if(cnxn != null) {if(LOG.isDebugEnabled()) {LOG.debug("Closing "+ cnxn);}cnxn.close();}}//有消息可读public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)throws Exception{try {//找到对应的NettyServerCnxn,调用方法处理请求信息NettyServerCnxn cnxn = (NettyServerCnxn)ctx.getAttachment();synchronized(cnxn) {processMessage(e, cnxn);}} catch(Exception ex) {LOG.error("Unexpected exception in receive", ex);throw ex;}}//处理消息private void processMessage(MessageEvent e, NettyServerCnxn cnxn) {....省略}}

这里面就是处理各种IO事件。比如客户端连接、断开连接、可读消息...

我们看messageReceived方法。当有消息请求时,调用到此方法。它会找到当前Channel对应的NettyServerCnxn对象,调用其receiveMessage方法,来完成具体请求的处理。

绑定端口

初始化完成之后,通过bootstrap.bind来绑定端口,正式开始对外提供服务。

public class NettyServerCnxnFactory extends ServerCnxnFactory {public voidstart() {LOG.info("binding to port "+localAddress);parentChannel = bootstrap.bind(localAddress);}}

上面我们调用start方法启动了Netty服务,但整个zookeeper的启动过程还没有完成。

public void startup(ZooKeeperServer zks) throws IOException,InterruptedException {start();setZooKeeperServer(zks);zks.startdata();zks.startup();}

三、加载数据

接着我们看zks.startdata();它要从zookeeper数据库加载数据。

有的同学不禁有疑问,什么,zk竟然还有数据库?不着急,我们慢慢看。

public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {//加载数据    public void startdata()throws IOException, InterruptedException {        //刚启动的时候,zkDb为空,先去初始化。if(zkDb == null) {            zkDb = new ZKDatabase(this.txnLogFactory);        }  //加载数据if(!zkDb.isInitialized()) {            loadData();        }    }}

上面的代码中,在刚启动的时候zkDb为空,所以会进入第一个条件判断,调用构造方法,初始化zkDb。之后,调用loadData方法加载数据。

1、ZKDatabase

事实上,zookeeper并没有数据库,有的只是ZKDatabase这个类,或者叫它内存数据库。  我们先看看它有哪些属性。

public class ZKDatabase {    //数据树    protected DataTree dataTree;//Session超时会话    protected ConcurrentHashMap sessionsWithTimeouts;//事务、快照Log    protected FileTxnSnapLog snapLog;//最小、最大事务ID    protected long minCommittedLog, maxCommittedLog;    public static final int commitLogCount = 500;    protected static int commitLogBuffer = 700;//事务日志列表,记录着提案信息    protected LinkedList committedLog = new LinkedList();    protected ReentrantReadWriteLocklogLock = new ReentrantReadWriteLock();//初始化标记    volatile private boolean initialized =false;}

这里面包括会话,数据树和提交日志。所有的数据都保存在DataTree中,它就是数据树,它保存所有的节点数据。

public class DataTree {//哈希表提供对数据节点的快速查找    private final ConcurrentHashMap nodes =        new ConcurrentHashMap();//Watcher相关    private final WatchManager dataWatches = new WatchManager();    private final WatchManager childWatches = new WatchManager();//zookeeper默认创建的节点    private static final String rootZookeeper ="/";    private static final String procZookeeper ="/zookeeper";    private static final String procChildZookeeper = procZookeeper.substring(1);    private static final String quotaZookeeper ="/zookeeper/quota";    private static final String quotaChildZookeeper = quotaZookeeper            .substring(procZookeeper.length() + 1);}

在我们从zookeeper上查询节点数据的时候,就是通过DataTree中的方法去获取。再具体就是通过节点名称去nodes哈希表去查询。比如:

public byte[] getData(String path, Statstat, Watcher watcher){DataNode n = nodes.get(path);if(n == null) {throw new KeeperException.NoNodeException();}synchronized (n) {n.copyStat(stat);if(watcher != null) {dataWatches.addWatch(path, watcher);}returnn.data;}}

那我们也许已经想到了,DataNode才会保存数据的真正载体。

public class DataNode implements Record {    //父级节点    DataNode parent;//节点数据内容    byte data[];    //权限信息    Long acl;    //节点统计信息    public StatPersistedstat;//子节点集合    private Set children = null;//空Set对象    private static final Set EMPTY_SET = Collections.emptySet();}

在zookeeper中的一个节点就对应一个DataNode对象。它包含一个父级节点和子节点集合、权限信息、节点数据内容、统计信息,都在此类中表示。

2、实例化对象

我们接着回过头来,继续看代码。如果zkDb为空,就要去实例化它。

public ZKDatabase(FileTxnSnapLog snapLog) {dataTree = new DataTree();sessionsWithTimeouts = new ConcurrentHashMap();this.snapLog = snapLog;}

这里就是实例化DataTree对象,初始化超时会话的Map,赋值snapLog 对象。

那么在DataTree的构造函数中,初始化zookeeper默认的节点,就是往nodes哈希表中添加DataNode对象。

publicDataTree() {nodes.put("", root);nodes.put(rootZookeeper, root);root.addChild(procChildZookeeper);nodes.put(procZookeeper, procDataNode);procDataNode.addChild(quotaChildZookeeper);nodes.put(quotaZookeeper, quotaDataNode);}

3、加载数据

如果zkDb还没有被初始化,那就加载数据库,并设置为已初始化状态,然后清理一下过期Session。

public class ZooKeeperServer{public void loadData() throws IOException, InterruptedException {if(zkDb.isInitialized()){setZxid(zkDb.getDataTreeLastProcessedZxid());}else{setZxid(zkDb.loadDataBase());}//清理过期sessionLinkedList deadSessions = new LinkedList();for(Long session : zkDb.getSessions()) {if(zkDb.getSessionWithTimeOuts().get(session) == null) {deadSessions.add(session);}}zkDb.setDataTreeInit(true);for(long session : deadSessions) {killSession(session, zkDb.getDataTreeLastProcessedZxid());}}}

我们看zkDb.loadDataBase()方法。它将从磁盘文件中加载数据库。

public class ZKDatabase {//从磁盘文件中加载数据库,并返回最大事务IDpublic long loadDataBase() throws IOException {        long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);        initialized =true;returnzxid;    }}

既然是磁盘文件,那么肯定就是快照文件和事务日志文件。snapLog.restore将证实这一点。

public class FileTxnSnapLog {public long restore(DataTree dt, Map sessions, PlayBackListener listener) throws IOException {//从快照文件中加载数据        snapLog.deserialize(dt, sessions);//从事务日志文件中加载数据        long fastForwardFromEdits = fastForwardFromEdits(dt, sessions, listener);returnfastForwardFromEdits;    }}

加载数据的过程看起来比较复杂,但核心就一点:从文件流中读取数据,转换成DataTree对象,放入zkDb中。在这里,咱们先不看解析文件的过程,就看看文件里存放的到底是些啥?

快照文件

我们找到org.apache.zookeeper.server.SnapshotFormatter,它可以帮我们输出快照文件内容。在main方法中,设置一下快照文件的路径,然后运行它。

public class SnapshotFormatter {public static void main(String[] args) throws Exception {//设置快照文件路径args = new String[1];args[0] ="E:\\zookeeper-data\\version-2\\snapshot.6";if(args.length != 1) {System.err.println("USAGE: SnapshotFormatter snapshot_file");System.exit(2);}new SnapshotFormatter().run(args[0]);}}

运行这个main方法,在控制台输出的就是快照文件内容。

ZNode Details (count=8):----/  cZxid = 0x00000000000000  ctime = Thu Jan 01 08:00:00 CST 1970  mZxid = 0x00000000000000  mtime = Thu Jan 01 08:00:00 CST 1970  pZxid = 0x00000000000002  cversion = 1  dataVersion = 0  aclVersion = 0  ephemeralOwner = 0x00000000000000  dataLength = 0----/zookeeper  cZxid = 0x00000000000000  ctime = Thu Jan 01 08:00:00 CST 1970  mZxid = 0x00000000000000  mtime = Thu Jan 01 08:00:00 CST 1970  pZxid = 0x00000000000000  cversion = 0  dataVersion = 0  aclVersion = 0  ephemeralOwner = 0x00000000000000  dataLength = 0----/zookeeper/quota  cZxid = 0x00000000000000  ctime = Thu Jan 01 08:00:00 CST 1970  mZxid = 0x00000000000000  mtime = Thu Jan 01 08:00:00 CST 1970  pZxid = 0x00000000000000  cversion = 0  dataVersion = 0  aclVersion = 0  ephemeralOwner = 0x00000000000000  dataLength = 0----/testcZxid = 0x00000000000002  ctime = Sat Feb 23 19:57:43 CST 2019  mZxid = 0x00000000000002  mtime = Sat Feb 23 19:57:43 CST 2019  pZxid = 0x00000000000005  cversion = 3  dataVersion = 0  aclVersion = 0  ephemeralOwner = 0x00000000000000  dataLength = 4----/test/t1  cZxid = 0x00000000000003  ctime = Sat Feb 23 19:57:53 CST 2019  mZxid = 0x00000000000003  mtime = Sat Feb 23 19:57:53 CST 2019  pZxid = 0x00000000000003  cversion = 0  dataVersion = 0  aclVersion = 0  ephemeralOwner = 0x00000000000000  dataLength = 4----/test/t2  cZxid = 0x00000000000004  ctime = Sat Feb 23 19:57:56 CST 2019  mZxid = 0x00000000000004  mtime = Sat Feb 23 19:57:56 CST 2019  pZxid = 0x00000000000004  cversion = 0  dataVersion = 0  aclVersion = 0  ephemeralOwner = 0x00000000000000  dataLength = 4----/test/t3  cZxid = 0x00000000000005  ctime = Sat Feb 23 19:57:58 CST 2019  mZxid = 0x00000000000005  mtime = Sat Feb 23 19:57:58 CST 2019  pZxid = 0x00000000000005  cversion = 0  dataVersion = 0  aclVersion = 0  ephemeralOwner = 0x00000000000000  dataLength = 4----Session Details (sid, timeout, ephemeralCount):0x10013d3939a0000, 99999, 00x10013d1adcb0000, 99999, 0

我们可以看到,格式化后的快照文件内容,除了开头的count信息和结尾的Session信息,中间每一行就是一个DataNode对象。从节点名称可以推算出自己的父级节点和子节点,其它的就是此节点的统计信息对象StatPersisted。

事务日志文件

我们找到org.apache.zookeeper.server.LogFormatter这个类,在main方法中设置事务日志的文件路径,然后运行它。在zookeeper中的每一个事务操作,都会被记录下来。

19-2-23 下午07时57分32秒 session 0x10013d1adcb0000 cxid 0x0 zxid 0x1 createSession 9999919-2-23 下午07时57分43秒 session 0x10013d1adcb0000 cxid 0x2 zxid 0x2 create'/test,#31323334,v{s{31,s{'world,'anyone}}},F,1

19-2-23 下午07时57分53秒 session 0x10013d1adcb0000 cxid 0x3 zxid 0x3 create '/test/t1,#31323334,v{s{31,s{'world,'anyone}}},F,119-2-23 下午07时57分56秒 session 0x10013d1adcb0000 cxid 0x4 zxid 0x4 create'/test/t2,#31323334,v{s{31,s{'world,'anyone}}},F,2

19-2-23 下午07时57分58秒 session 0x10013d1adcb0000 cxid 0x5 zxid 0x5 create '/test/t3,#31323334,v{s{31,s{'world,'anyone}}},F,319-2-23 下午07时58分51秒 session 0x10013d3939a0000 cxid 0x0 zxid 0x6 createSession 9999919-2-23 下午07时59分07秒 session 0x10013d3939a0000 cxid 0x4 zxid 0x7 create'/test/t4,#31323334,v{s{31,s{'world,'anyone}}},F,4

可以看到,每一个事务对应一行记录。包含操作时间、sessionId、事务ID、操作类型、节点名称和权限信息等。

需要注意的是,只有变更操作才会被记录到事务日志。所以,在这里我们看不到任何读取操作请求。

四、会话管理器

会话是Zookeeper中一个重要的抽象。保证请求有序、临时znode节点、监听点都和会话密切相关。Zookeeper服务器的一个重要任务就是跟踪并维护这些会话。

在zookeeper中,服务器要负责清理掉过期会话,而客户端要保持自己的活跃状态,只能依靠心跳信息或者一个新的读写请求。

而对于过期会话的管理,则依靠“分桶策略”来完成。具体情况是这样的:

1、zookeeper会为每个会话设置一个过期时间,我们称它为nextExpirationTime

2、将这个过期时间和相对应的Session集合放入Map中

3、开启线程,不断轮训这个Map,取出当前过期点nextExpirationTime的Session集合,然后关闭它们

4、未活跃的Session被关闭;正在活跃的Session会重新计算自己的过期时间,修改自己的过期时间nextExpirationTime,保证不会被线程扫描到

简而言之,还在活跃的Session依靠不断重置自己的nextExpirationTime时间,就不会被线程扫描到,继而被关闭。

接下来我们看调用到的zks.startup();方法,具体是怎么做的。

public class ZooKeeperServerpublic synchronized voidstartup() {if(sessionTracker == null) {createSessionTracker();}startSessionTracker();setupRequestProcessors();registerJMX();setState(State.RUNNING);notifyAll();}}

我们只关注createSessionTracker、startSessionTracker两个方法,它们和会话相关。

1、创建会话跟踪器

创建会话跟踪器,这里是一个SessionTrackerImpl对象实例。

protected voidcreateSessionTracker() {sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),tickTime, 1, getZooKeeperServerListener());}

在构造方法里,做了一些参数初始化的工作。

public SessionTrackerImpl(SessionExpirer expirer,ConcurrentHashMap sessionsWithTimeout, int tickTime,long sid, ZooKeeperServerListener listener){super("SessionTracker", listener);this.expirer = expirer;this.expirationInterval = tickTime;this.sessionsWithTimeout = sessionsWithTimeout;nextExpirationTime = roundToInterval(Time.currentElapsedTime());this.nextSessionId = initializeNextSession(sid);for(Entry e : sessionsWithTimeout.entrySet()) {addSession(e.getKey(), e.getValue());}}

我们重点关注下一个过期时间nextExpirationTime是怎样计算出来的。我们来看roundToInterval方法。

private long roundToInterval(long time) {return(time / expirationInterval + 1) * expirationInterval;}

其中,time是基于当前时间的一个时间戳;expirationInterval是我们配置文件中的tickTime。如果我们假定time=10,expirationInterval=2,那么上面计算后的下一个过期时间为(10/2+1)*2=12

这也就是说,当前的Session会被分配到Id为12的分桶中。我们继续往下看这一过程。在addSession方法中,先查询是否有会话Id的SessionImpl,没有则新建并存入。

synchronized public void addSession(long id, int sessionTimeout) {sessionsWithTimeout.put(id, sessionTimeout);//查询对应SessionId的Impl类if(sessionsById.get(id) == null) {SessionImpl s = new SessionImpl(id, sessionTimeout, 0);sessionsById.put(id, s);}else{if(LOG.isTraceEnabled()) {ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,"SessionTrackerImpl --- Existing session 0x"+ Long.toHexString(id) +" "+ sessionTimeout);}}touchSession(id, sessionTimeout);}

最后调用touchSession来激活会话。需要注意的是,zookeeper中的每个请求都会调用到此方法。它来计算活跃Session的下一个过期时间,并迁移到不同桶中。

我们一直在说“分桶”,或许难以理解“桶”到底是个什么东西。在代码中,它其实是个HashSet对象。

public class SessionTrackerImpl{//过期时间和对应Session集合的映射HashMap sessionSets = new HashMap();//Session集合static class SessionSet {        HashSet sessions = new HashSet();    }synchronized public boolean touchSession(long sessionId, int timeout) {SessionImpl s = sessionsById.get(sessionId);//如果session被删除或者已经被标记为关闭状态if(s == null || s.isClosing()) {returnfalse;}//计算下一个过期时间long expireTime = roundToInterval(Time.currentElapsedTime() + timeout);if(s.tickTime >= expireTime) {returntrue;}//获取Session当前的过期时间SessionSetset= sessionSets.get(s.tickTime);if(set!= null) {//从集合中删除set.sessions.remove(s);}//设置新的过期时间并加入到Session集合中s.tickTime = expireTime;set= sessionSets.get(s.tickTime);if(set== null) {set= new SessionSet();sessionSets.put(expireTime,set);}set.sessions.add(s);returntrue;}}

我们回头看上面那个公式,如果第一次Session请求计算后的过期时间为12。那么,对应Session的映射如下:12=org.apache.zookeeper.server.SessionTrackerImpl$SessionSet@25143a5e第二次请求,计算后的过期时间为15。就会变成:15=org.apache.zookeeper.server.SessionTrackerImpl$SessionSet@3045314d

同时,过期时间为12的记录被删除。这样,通过过期时间的变更,不断迁移这个Session的位置。我们就会想到,如果由于网络原因或者客户端假死,请求长时间未能到达服务器,那么对应Session的过期时间就不会发生变化。

**时代在变化,你不变,就会被抛弃。**这句话,同样适用于zookeeper中的会话。

我们接着看startSessionTracker();

protected voidstartSessionTracker() {((SessionTrackerImpl)sessionTracker).start();}

SessionTrackerImpl继承自ZooKeeperCriticalThread,所以它本身也是线程类。调用start方法后开启线程,我们看run方法。

synchronized public voidrun() {try {while(running) {currentTime = Time.currentElapsedTime();if(nextExpirationTime > currentTime) {this.wait(nextExpirationTime - currentTime);continue;}SessionSetset;//获取过期时间对应的Session集合set= sessionSets.remove(nextExpirationTime);//循环Session,关闭它们if(set!= null) {for(SessionImpl s : set.sessions) {setSessionClosing(s.sessionId);expirer.expire(s);}}nextExpirationTime += expirationInterval;}} catch (InterruptedException e) {handleException(this.getName(), e);}LOG.info("SessionTrackerImpl exited loop!");}

这个方法通过死循环的方式,不断获取过期时间对应的Session集合。简直就是发现一起,查处一起。这也就解释了为什么活跃Session必须要不断更改自己的过期时间,因为这里有人在监督。

最后就是注册了JMX,并设置服务器的运行状态。

五、总结

本文主要分析了zookeeper服务器启动的具体流程,我们再回顾一下。

配置 zoo.cfg文件,运行Main方法

注册zk服务器关闭事件,清理资源

选择NIO或者Netty服务器绑定端口,开启服务

初始化zkDB,加载磁盘文件到内存

创建会话管理器,监视过期会话并删除

注册JMX,设置zk服务状态为running

在此我向大家推荐一个架构学习交流群。交流学习群号:938837867 暗号:555 里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化、分布式架构等这些成为架构师必备

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,384评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,845评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,148评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,640评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,731评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,712评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,703评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,473评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,915评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,227评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,384评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,063评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,706评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,302评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,531评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,321评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,248评论 2 352

推荐阅读更多精彩内容