客户端连接
通过Zookeeper客户端类库连接org.apache.zookeeper.ZooKeeper
ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 100*1000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("conn");
}
});
1、ZooKeeper类的创建
org.apache.zookeeper.ZooKeeper#ZooKeeper
通过createDefaultHostProvider创建静态服务提供类StaticHostProvider
2、ClientCnxn类的创建
org.apache.zookeeper.ZooKeeper#createConnection
其中最重要的是watchManager、sendThread、eventThread的初始化
3、ClientCnxn类的启动
ClientCnxn类的启动便是sendThread和eventThread线程的启动
在具体分析两个线程之前,需要了解pendingQueue和outgoingQueue队列表示的意义。
outgoingQueue:表示需要发送的数据包
pendingQueue:表示已经发送正在等待响应的数据包
SendThread
1、run
run方法内部有一个while方法,只要不为false将会一直运行
org.apache.zookeeper.ClientCnxn.SendThread#run
org.apache.zookeeper.ZooKeeper.States#isAlive
如果state状态不是CLOSED或AUTH_FAILED将会一直循环
2、建立socket连接
如果clientCnxnSocket客户端socket没有连接即sockKey = null,将会调用连接方法
org.apache.zookeeper.ClientCnxn.SendThread#startConnect
org.apache.zookeeper.ClientCnxnSocketNIO#connect
①、创建socket对象
org.apache.zookeeper.ClientCnxnSocketNIO#createSock
②、注册连接事件
org.apache.zookeeper.ClientCnxnSocketNIO#registerAndConnect
再尝试连接,如果immediateConnect返回true则会调用primeConnection方法。这里连接失败返回fasle
③、incomingBuffer和lenBuffer缓存重置
3、AuthFailed
把eventOfDeath添加到waitingEvents
4、发送心跳ping
org.apache.zookeeper.ClientCnxn.SendThread#sendPing
即是把ping包添加到outgoingQueue
5、只读模式,寻找读/写服务器
6、doTransport
org.apache.zookeeper.ClientCnxnSocketNIO#doTransport
selector.select查询有哪些事件。如果没有事件便会阻塞waitTimeOut时间
6.1、连接事件准备就绪
如果是连接事件准备就绪,则会调用primeConnection方法
org.apache.zookeeper.ClientCnxn.SendThread#primeConnection
用来设置会话session,watches和authentication
①、watch
用来判断这些dataWatches、existWatches、childWatches、persistentWatches、persistentRecursiveWatches等watch是否为空
用来判断persistentWatches持久化的watch与persistentRecursiveWatches持久化的递归Watche是否为空。
最后只要任意watch不为空,都会添加到outgoingQueue队列中
②、authInfo
添加权限的authInfo请求数据包
③、添加空的数据包
④、更改感兴趣的事件类型
org.apache.zookeeper.ClientCnxnSocketNIO#doIO
6.2、处理读写事件
①、读事件(得到服务端响应数据)
首先读取4字节长度的数据到incomingBuffer
org.apache.zookeeper.ClientCnxnSocket#readLength
把incomingBuffer设置为还要读取的长度
最后通过readResponse读取响应结果
org.apache.zookeeper.ClientCnxn.SendThread#readResponse
包括PING_XID(pings)、AUTHPACKET_XID(AuthPacket)、WATCHER_EVENT(watch事件)
finishPacket
org.apache.zookeeper.ClientCnxn#finishPacket
org.apache.zookeeper.ClientCnxn.EventThread#queuePacket
②、写事件(发送数据到服务端)
a、findSendablePacket
从outgoingQueue队列中获取第一个并封装成Packet数据包,对于关于authentication权限的数据包,只发送header非空的数据
b、createBB缓存数据
org.apache.zookeeper.ClientCnxn.Packet#createBB
设置数据包Packet.ByteBuffer bb缓存数据
c、通过socker网络传输到服务端
对于发送过的数据并且不是ping或auth的数据包要放到pendingQueue队列中等待响应
d、禁止写数据
如果outgoingQueue是空或没有初始化将会禁止写数据(即禁止向服务端发送数据)
outgoingQueue队列什么时候添加
提交请求时,会把当前数据包添加outgoingQueue
1、getData
org.apache.zookeeper.ZooKeeper#getData
org.apache.zookeeper.ClientCnxn#submitRequest
org.apache.zookeeper.ClientCnxn#queuePacket
2、create
org.apache.zookeeper.ZooKeeper#create
也会调用到submitRequest方法
所以,只要最终调用到queuePacket方法都会添加,zookeeper客户端的增删改查等操作都会添加到outgoingQueue队列中
EventThread
从waitingEvents队列中获取响应数据,阻塞读取数据
org.apache.zookeeper.ClientCnxn.EventThread#run
org.apache.zookeeper.ClientCnxn.EventThread#processEvent
1、watcher
调用watcher.process方法
2、LocalCallback
调用异步执行方法AsyncCallback.processResult方法(LocalCallback事件)
3、response
根据response的类型调用异步执行方法
总结:
客户端的连接通过ZooKeeper创建,创建ZooKeeper类将会创建并启动ClientCnxn(客户端上下文类)。创建ClientCnxn类会引起sendThread、sendThread线程的初始化和启动。
SendThread:创建socket连接、获取命令数据发送给服务端(ping、auth、创建删除节点)、读取服务端响应数据
EventThread:从waitingEvents队列中获取数据。执行监听器watch事件、执行Packet数据包的异步调用