前言
在前面介绍了zookeeper server端的启动过程,现在我们分析zookeeper client启动过程
创建客户端连接对象
一般情况下使用zookeeper原生库创建建立的方式如下
Zookeeper zookeeper = new Zookeeper(connectionString,sessionTimeout,watcher)
我直接看Zookeeper类初始化的源代码
public ZooKeeper(
String connectString,
int sessionTimeout,
Watcher watcher,
boolean canBeReadOnly,
HostProvider aHostProvider,
ZKClientConfig clientConfig) throws IOException {
LOG.info(
"Initiating client connection, connectString={} sessionTimeout={} watcher={}",
connectString,
sessionTimeout,
watcher);
if (clientConfig == null) {
clientConfig = new ZKClientConfig();
}
//clientConfig存储zookeeper客户端一些可配置属性的信息
this.clientConfig = clientConfig;
//创建客户端的watcher的管理器
watchManager = defaultWatchManager();
//设置watcher管理器默认的watcher
watchManager.defaultWatcher = watcher;
//根据用户提供的connectString穿件ConnectStringParser对象,下面会解析ConnectStringParser对象的作用
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
//用户提供的连接信息中可能包含多个ip地址,那么当客户端去连接zookeeper server的时候应该选择哪一个ip去连接呢?通过hostProvider来封装这个逻辑
hostProvider = aHostProvider;
//客户端的连接对象,这个对象是实现客户端连接服务端的核心,在下面我们会详细解析
cnxn = createConnection(
connectStringParser.getChrootPath(),
hostProvider,
sessionTimeout,
this,
watchManager,
getClientCnxnSocket(),
canBeReadOnly);
//启动客户端相关的一些线程
cnxn.start();
}
ConnectStringParser
我们先解析下ConnectStringParser这个类
从上图我们可以看出ConnectStringParser类有两个重要的属性chrootPath,serverAddresses。ConnectStringParser就是根据用户传入的连接信息解析出这两个属性的值,chrootPath是用户传入的连接信息中包含的路径信息,比如用户提供的连接信息是"192.168.11.1:2181,192.168.11.2:2181/tt",那么通过ConnectStringParser的解析chrootPath=tt,serverAddresses用来存储用户提供的地址和端口对的解析结果,同样是上面的例子,serverAddresses解析的结果是["192.168.11.1:2181","192.168.11.2:2181"]
现在给出ConnectStringParser解析用户提供的连接信息的源代码
public ConnectStringParser(String connectString) {
// parse out chroot, if any
//取得chrootPath的分解符的位置
int off = connectString.indexOf('/');
if (off >= 0) {
//解析出chrootPath
String chrootPath = connectString.substring(off);
// ignore "/" chroot spec, same as null
if (chrootPath.length() == 1) {
this.chrootPath = null;
} else {
PathUtils.validatePath(chrootPath);
this.chrootPath = chrootPath;
}
//解析出客户端连接服务端的ip:port对信息
connectString = connectString.substring(0, off);
} else {
this.chrootPath = null;
}
//通过逗号分隔符分割出每个ip:port的连接信息
List<String> hostsList = split(connectString, ",");
for (String host : hostsList) {
int port = DEFAULT_PORT;
try {
//解析出ip和port
String[] hostAndPort = ConfigUtils.getHostAndPort(host);
host = hostAndPort[0];
if (hostAndPort.length == 2) {
port = Integer.parseInt(hostAndPort[1]);
}
} catch (ConfigException e) {
e.printStackTrace();
}
//根据ip和port创建InetSocketAddress对象,然后加入serverAddresses中
serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
}
}
HostProvider
通过对ConnectStringParser的解析,我们知道用户可能会提供多个连接服务端的IP:Port,那么客户端应该选择哪一个去连接服务端呢?这个就是HostProvider的工作了。HostProvider默认实现是StaticHostProvider
上图是StaticHostProvider属性图,红框框出来的三个属性serverAddresses,lastIndex,currentIndex是StaticHostProvider实现选择一个服务器进行服务端连接的核心。StaticHostProvider的next()方法向外部提供了选取一个服务器进行连接的封装
next()
public InetSocketAddress next(long spinDelay) {
boolean needToSleep = false;
InetSocketAddress addr;
synchronized (this) {
//reconfigMode是zookeeper为了server端连接的负债均衡而设计的一个功能
if (reconfigMode) {
addr = nextHostInReconfigMode();
if (addr != null) {
currentIndex = serverAddresses.indexOf(addr);
return resolve(addr);
}
//tried all servers and couldn't connect
reconfigMode = false;
needToSleep = (spinDelay > 0);
}
//更新currentIndex
++currentIndex;
//如果currentIndex和服务器列表长度一样大,那么重置currentIndex为0
if (currentIndex == serverAddresses.size()) {
currentIndex = 0;
}
//从服务器列表中获取一个服务器
addr = serverAddresses.get(currentIndex);
//判断是不是需要sleep 一会
needToSleep = needToSleep || (currentIndex == lastIndex && spinDelay > 0);
if (lastIndex == -1) {
// We don't want to sleep on the first ever connect attempt.
//初始化lastIndex为0,如果一开始lastIndex就设置成0而不是-1那么会导致第一次连接时候needToSleep就是true,这样显然不合适
lastIndex = 0;
}
}
if (needToSleep) {
try {
//休眠一会会,当服务器列表中的所有的服务器都被连接一遍之后,再次去连接服务器的时候需要休眠一会(个人理解:既然所有的服务器连接都没连接上,那么服务端可能在忙着什么事情,等一会给服务器端一些喘息的机会)
Thread.sleep(spinDelay);
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
}
//返回服务器ip信息
return resolve(addr);
}
ClientCnxn
客户端连接对象,保存了客户端连接服务端的信息,包含的属性比较多,我们选择几个进行注释
我们看下ClientCnxn最终的构造方法
public ClientCnxn(
String chrootPath,
HostProvider hostProvider,
int sessionTimeout,
ZooKeeper zooKeeper,
ClientWatchManager watcher,
ClientCnxnSocket clientCnxnSocket,
long sessionId,
byte[] sessionPasswd,
boolean canBeReadOnly) {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
this.sessionPasswd = sessionPasswd;
this.sessionTimeout = sessionTimeout;
this.hostProvider = hostProvider;
this.chrootPath = chrootPath;
connectTimeout = sessionTimeout / hostProvider.size();
readTimeout = sessionTimeout * 2 / 3;
readOnly = canBeReadOnly;
//创建SendThread线程,SendThread负责IO的处理,clientCnxnSocket在默认情况下的实现是ClientCnxnSocketNIO
sendThread = new SendThread(clientCnxnSocket);
//创建EventThread用来处理各种watcher关注的事件
eventThread = new EventThread();
this.clientConfig = zooKeeper.getClientConfig();
initRequestTimeout();
}
ClientCnxn.start()
ClientCnxn.start会启动SendThread和EventThread
public void start() {
sendThread.start();
eventThread.start();
}
SendThread
SendThread负责处理客户端的所有IO,我们看下它的run方法
SendThread.run
public void run() {
//设置clientCnxnSocket的sessionId,outgoingQueue属性
//注意当第一次建立连接的时候由于服务端的sessionId还没有生成,所以为默认的0
clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = Time.currentElapsedTime();
//客户端向服务端发送心跳的频率,默认是10s,但是为了debug我把这个参数设置的很大
final int MAX_SEND_PING_INTERVAL = 10000000; //10 seconds
InetSocketAddress serverAddress = null;
//注意这个这里是while(xxx),如果连接正常,那么会一直执行下面的逻辑
while (state.isAlive()) {
try {
//如果客户端还没有和服务端建立连接,那么进入建立连接流程
if (!clientCnxnSocket.isConnected()) {
// don't re-establish connection if we are closing
if (closing) {
break;
}
if (rwServerAddress != null) {
serverAddress = rwServerAddress;
rwServerAddress = null;
} else {
//通过hostProvider去服务器列表中获取一个服务进行连接
serverAddress = hostProvider.next(1000);
}
//建立到服务端的socket连接,下面会给出具体的源码
startConnect(serverAddress);
clientCnxnSocket.updateLastSendAndHeard();
}
//如果已经建立了到服务端的连接
if (state.isConnected()) {
// determine whether we need to send an AuthFailed event.
if (zooKeeperSaslClient != null) {
boolean sendAuthEvent = false;
if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
try {
zooKeeperSaslClient.initialize(ClientCnxn.this);
} catch (SaslException e) {
LOG.error("SASL authentication with Zookeeper Quorum member failed.", e);
state = States.AUTH_FAILED;
sendAuthEvent = true;
}
}
KeeperState authState = zooKeeperSaslClient.getKeeperState();
if (authState != null) {
if (authState == KeeperState.AuthFailed) {
// An authentication error occurred during authentication with the Zookeeper Server.
state = States.AUTH_FAILED;
sendAuthEvent = true;
} else {
if (authState == KeeperState.SaslAuthenticated) {
sendAuthEvent = true;
}
}
}
if (sendAuthEvent) {
eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, authState, null));
if (state == States.AUTH_FAILED) {
eventThread.queueEventOfDeath();
}
}
}
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
//统计连接的耗时
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}
//如果to<0说明操作超时抛出异常
if (to <= 0) {
String warnInfo = String.format(
"Client session timed out, have not heard from server in %dms for session id 0x%s",
clientCnxnSocket.getIdleRecv(),
Long.toHexString(sessionId));
LOG.warn(warnInfo);
throw new SessionTimeoutException(warnInfo);
}
if (state.isConnected()) {
//如果已经建立了到服务端的连接,下面是下一次发送心跳信息到服务端的时间点
//1000(1 second) is to prevent race condition missing to send the second ping
//also make sure not to send too many pings when readTimeout is small
int timeToNextPing = readTimeout / 2
- clientCnxnSocket.getIdleSend()
- ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
//send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
//如果timeTNextPing<=0说明发送的心跳的时间到了,亦或者客户端已经过了MAX_SEND_PING_INTERVAL这么久都没有发送任何消息到服务端,在上述两种情况下都需要发送心跳信息到服务端
if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
sendPing();
clientCnxnSocket.updateLastSend();
} else {
if (timeToNextPing < to) {
to = timeToNextPing;
}
}
}
//如果state == States.CONNECTEDREADONLY,看下面的英文解释
// If we are in read-only mode, seek for read/write server
if (state == States.CONNECTEDREADONLY) {
long now = Time.currentElapsedTime();
int idlePingRwServer = (int) (now - lastPingRwServer);
if (idlePingRwServer >= pingRwTimeout) {
lastPingRwServer = now;
idlePingRwServer = 0;
pingRwTimeout = Math.min(2 * pingRwTimeout, maxPingRwTimeout);
pingRwServer();
}
to = Math.min(to, pingRwTimeout - idlePingRwServer);
}
//客户端处理各种IO事件,这个我们后面会详细解析
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
} catch (Throwable e) {
if (closing) {
// closing so this is expected
LOG.warn(
"An exception was thrown while closing send thread for session 0x{}.",
Long.toHexString(getSessionId()),
e);
break;
} else {
LOG.warn(
"Session 0x{} for sever {}, Closing socket connection. "
+ "Attempting reconnect except it is a SessionExpiredException.",
Long.toHexString(getSessionId()),
serverAddress,
e);
// At this point, there might still be new packets appended to outgoingQueue.
// they will be handled in next connection or cleared up if closed.
cleanAndNotifyState();
}
}
}
//代码到这步,说明客户端和服务端的连接出现了异常
synchronized (state) {
// When it comes to this point, it guarantees that later queued
// packet to outgoingQueue will be notified of death.
cleanup();
}
clientCnxnSocket.close();
if (state.isAlive()) {
//向客户端事件处理线程发现服务端连接断开信息
eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null));
}
eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Closed, null));
ZooTrace.logTraceMessage(
LOG,
ZooTrace.getTextTraceLevel(),
"SendThread exited loop for session: 0x" + Long.toHexString(getSessionId()));
}
客户端到服务端的socket连接
客户端建立到服务端的socket发生在ClientCnxnSocket.connect中
void connect(InetSocketAddress addr) throws IOException {
//创建客户端socketChannel
SocketChannel sock = createSock();
try {
//socketChannel向selector注册OP_CONNECT时间,同时
//socketChannel向远程服务器发起连接请求
registerAndConnect(sock, addr);
} catch (IOException e) {
LOG.error("Unable to open socket to {}", addr);
sock.close();
throw e;
}
//连接初始化标识
initialized = false;
/*
* Reset incomingBuffer
*/
//zookeeper默认连接是基于NIO实现,通信消息流分成两个部分:消息长度和消息,消息又分成两个部分[消息头,消息体]
//消息长度部分固定为4个字节大小用来标识消息体的长度,lenBuffer就是用来表示消息长度的byteBuffer
lenBuffer.clear();
incomingBuffer = lenBuffer;
}
上面客户端向服务端发起了连接请求,之后会执行到ClientCnxnSocketNIO.doTransport方法,这个方法是客户端处理IO信息的入口
ClientCnxnSocketNIO.doTransport
void doTransport(
int waitTimeOut,
Queue<Packet> pendingQueue,
ClientCnxn cnxn) throws IOException, InterruptedException {
//等待注册监听事件的发生
selector.select(waitTimeOut);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
// Everything below and until we get back to the select is
// non blocking, so time is effectively a constant. That is
// Why we just have to do this once, here
updateNow();
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
//如果发生的是OP_CONNECT,那么完成socketChannel的连接
if (sc.finishConnect()) {
updateLastSendAndHeard();
updateSocketAddresses();
//完成了sessionId建立和认证等操作,我们在下面会详细解析
sendThread.primeConnection();
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
//如果是发生的是IO读写事件执行doIO,在下面我们会详细解析
doIO(pendingQueue, cnxn);
}
}
if (sendThread.getZkState().isConnected()) {
//如果发送队列outgoingQueue有数据那么向selector注册OP_WRITE监听
//多说一句,因为zookeeper NIO一次IO写出去的数据量有限制,所以在一次doIO完成后还需要判断outgoingQueue是不是还有数据要写,如果有那么就设置OP_WRITE监听
if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
enableWrite();
}
}
//清空事件
selected.clear();
}
当客户端和服务端建立起socket连接之后,紧接着就是session的建立,
sendThread.primeConnection完成了这一过程
sendThread.primeConnection
客户端在完成到服务端的socket连接建立之后,会向服务端发起建立session会话的请求,下面就是这一逻辑的实现
void primeConnection() throws IOException {
LOG.info(
"Socket connection established, initiating session, client: {}, server: {}",
clientCnxnSocket.getLocalSocketAddress(),
clientCnxnSocket.getRemoteSocketAddress());
isFirstConnect = false;
long sessId = (seenRwServerBefore) ? sessionId : 0;
//初始化创建会话请求
ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd);
// We add backwards since we are pushing into the front
// Only send if there's a pending watch
// TODO: here we have the only remaining use of zooKeeper in
// this class. It's to be eliminated!
if (!clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET)) {
//下面很长的这段代码是客户端处理各种watcher发送到服务端的情况
List<String> dataWatches = zooKeeper.getDataWatches();
List<String> existWatches = zooKeeper.getExistWatches();
List<String> childWatches = zooKeeper.getChildWatches();
List<String> persistentWatches = zooKeeper.getPersistentWatches();
List<String> persistentRecursiveWatches = zooKeeper.getPersistentRecursiveWatches();
if (!dataWatches.isEmpty() || !existWatches.isEmpty() || !childWatches.isEmpty()
|| !persistentWatches.isEmpty() || !persistentRecursiveWatches.isEmpty()) {
Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();
Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();
Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();
Iterator<String> persistentWatchesIter = prependChroot(persistentWatches).iterator();
Iterator<String> persistentRecursiveWatchesIter = prependChroot(persistentRecursiveWatches).iterator();
long setWatchesLastZxid = lastZxid;
while (dataWatchesIter.hasNext() || existWatchesIter.hasNext() || childWatchesIter.hasNext()
|| persistentWatchesIter.hasNext() || persistentRecursiveWatchesIter.hasNext()) {
List<String> dataWatchesBatch = new ArrayList<String>();
List<String> existWatchesBatch = new ArrayList<String>();
List<String> childWatchesBatch = new ArrayList<String>();
List<String> persistentWatchesBatch = new ArrayList<String>();
List<String> persistentRecursiveWatchesBatch = new ArrayList<String>();
int batchLength = 0;
// Note, we may exceed our max length by a bit when we add the last
// watch in the batch. This isn't ideal, but it makes the code simpler.
while (batchLength < SET_WATCHES_MAX_LENGTH) {
final String watch;
if (dataWatchesIter.hasNext()) {
watch = dataWatchesIter.next();
dataWatchesBatch.add(watch);
} else if (existWatchesIter.hasNext()) {
watch = existWatchesIter.next();
existWatchesBatch.add(watch);
} else if (childWatchesIter.hasNext()) {
watch = childWatchesIter.next();
childWatchesBatch.add(watch);
} else if (persistentWatchesIter.hasNext()) {
watch = persistentWatchesIter.next();
persistentWatchesBatch.add(watch);
} else if (persistentRecursiveWatchesIter.hasNext()) {
watch = persistentRecursiveWatchesIter.next();
persistentRecursiveWatchesBatch.add(watch);
} else {
break;
}
batchLength += watch.length();
}
Record record;
int opcode;
if (persistentWatchesBatch.isEmpty() && persistentRecursiveWatchesBatch.isEmpty()) {
// maintain compatibility with older servers - if no persistent/recursive watchers
// are used, use the old version of SetWatches
record = new SetWatches(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch, childWatchesBatch);
opcode = OpCode.setWatches;
} else {
record = new SetWatches2(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch,
childWatchesBatch, persistentWatchesBatch, persistentRecursiveWatchesBatch);
opcode = OpCode.setWatches2;
}
//set watcher 请求的header
RequestHeader header = new RequestHeader(ClientCnxn.SET_WATCHES_XID, opcode);
//把请求头和请求体封装成Packet对象然后放入outgoingQueue中,等待发送
Packet packet = new Packet(header, new ReplyHeader(), record, null, null);
outgoingQueue.addFirst(packet);
}
}
}
for (AuthData id : authInfo) {
//把客户端认证信息放入outgoingQueue中
outgoingQueue.addFirst(
new Packet(
new RequestHeader(ClientCnxn.AUTHPACKET_XID, OpCode.auth),
null,
new AuthPacket(0, id.scheme, id.data),
null,
null));
}
//最后把连接请求加入到outgoingQueue的头部,
outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly));
//通过connectionPrimed向selector注册op_read和op_write事件
clientCnxnSocket.connectionPrimed();
LOG.debug("Session establishment request sent on {}", clientCnxnSocket.getRemoteSocketAddress());
}
上面的发送的连接请求就会触发ClientCnxnSocketNIO.doIO方法
ClientCnxnSocketNIO.doIO
客户端处理IO事件的方法,这个方法也是很长请大家耐心看完,我都耐心的分析完了,我想读者应该更有耐心读完
void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
//从SelectionKey中获取SocketChannel
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
}
//如果是可读事件发生
if (sockKey.isReadable()) {
//从socket中读取消息
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException("Unable to read additional data from server sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely server has closed socket");
}
if (!incomingBuffer.hasRemaining()) {
//数据读取完成,设置byteBuffer状态准备读
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
//incomingBuffer等于lenBuffer说读取的消息长度信息
recvCount.getAndIncrement();
//获取到了消息的长度,那么就初始化一个相应长度的bytebuffer,为读取消息做准备
readLength();
} else if (!initialized) {
//如果连接还没有初始化,说明session会话还没建立完成,
//那么通过readConnectResult来处理服务端发送来的ConnectResponse,下面我们会解析
readConnectResult();
//注册op_read事件
enableRead();
//下面同样是根据outgoingQueue的状态设置op_write信息
if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
// Since SASL authentication has completed (if client is configured to do so),
// outgoing packets waiting in the outgoingQueue can now be sent.
enableWrite();
}
//下面是一些设置和清理操作
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
initialized = true;
} else {
//上面是读取消息长度的过程,下面就是读取消息体过程,
//readResponse下面会详细解析
sendThread.readResponse(incomingBuffer);
//重置lenBuffer
lenBuffer.clear();
//设置incomingBuffer = lenBuffer 为下次读取做准备
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
}
//下面是处理写事件的过程
if (sockKey.isWritable()) {
//从outgoingQueue中获取第一个待发送的消息
Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());
if (p != null) {
updateLastSend();
// If we already started writing p, p.bb will already exist
if (p.bb == null) {
if ((p.requestHeader != null)
&& (p.requestHeader.getType() != OpCode.ping)
&& (p.requestHeader.getType() != OpCode.auth)) {
//如果请求不是ping和auth类型,那么客户端为了保证请求按照顺序处理,会在requestHeader中设置xid,xid在客户端按照自增的形式产生
p.requestHeader.setXid(cnxn.getXid());
}
//把请求消息对象转换成byteBuffer,下面会解析
p.createBB();
}
//把消息通过socket发送给服务端
sock.write(p.bb);
if (!p.bb.hasRemaining()) {
//如果一个消息被一次性的发送了,那么从outgoingQueue把这个消息删除,如果一次write io操作没有把一个消息写完,那么这个消息会继续存在outgoingQueue中等待下一次write io 继续写出去
sentCount.getAndIncrement();
outgoingQueue.removeFirstOccurrence(p);
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
//如果发送的请求不是ping和auth类型的,那么这个请求需要等待服务端的response,把该请求放入pendingQueue中
synchronized (pendingQueue) {
pendingQueue.add(p);
}
}
}
}
if (outgoingQueue.isEmpty()) {
//如果outgoingQueue中的所有消息都发送了,那么取消对op_write的监控
// No more packets to send: turn off write interest flag.
// Will be turned on later by a later call to enableWrite(),
// from within ZooKeeperSaslClient (if client is configured
// to attempt SASL authentication), or in either doIO() or
// in doTransport() if not.
disableWrite();
} else if (!initialized && p != null && !p.bb.hasRemaining()) {
// On initial connection, write the complete connect request
// packet, but then disable further writes until after
// receiving a successful connection response. If the
// session is expired, then the server sends the expiration
// response and immediately closes its end of the socket. If
// the client is simultaneously writing on its end, then the
// TCP stack may choose to abort with RST, in which case the
// client would never receive the session expired event. See
// http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
//简单来说就是在初始连接成功后但是很快session超时了,这个时候服务端会给客户端发送session超时事件同时关闭socket连接,如果与此同时客户端发送消息给服务端,会导致TCP的RST状态从而导致客户端收不到session 超时的消息。故而在连接没有完成的情况下initialized=false,客户端取消对op_write的监听
disableWrite();
} else {
// Just in case
//就像注释一样,以防万一,outgoingQueue还有数据继续注册op_write监听
enableWrite();
}
}
}
上面分析了doIO的逻辑,还有几个小点需要交待,我们先看下消息是如何转化成ByteBuffer的
我们先看消息对象
对于发送端来说,Packet类有两个属性,请求头requestHeader和请求体request
这个两个属性的数据会转化成ByteBuffer类型的bb,下面我们就分析这个过程
public void createBB() {
try {
//创建输出流
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
//消息流的前4个字节是全部消息的长度,但是目前还没确定,所以先初始化成-1
boa.writeInt(-1, "len"); // We'll fill this in later
if (requestHeader != null) {
//requestHeader序列化到消息输出流中,requestHeader会序列化两个属性:客户端事物id(xid),请求类型码type,其实这个对象序列化结果也是固定长度[ 4(xid) + 4(type) = 8个字节 ]
requestHeader.serialize(boa, "header");
}
if (request instanceof ConnectRequest) {
//如果请求是创建会话连接,序列化ConnectRequest到消息输出流
request.serialize(boa, "connect");
// append "am-I-allowed-to-be-readonly" flag
boa.writeBool(readOnly, "readOnly");
} else if (request != null) {
//序列化请求到消息输出流
request.serialize(boa, "request");
}
baos.close();
//把消息输出流转化成ByteBuffer
this.bb = ByteBuffer.wrap(baos.toByteArray());
//设置消息的长度
this.bb.putInt(this.bb.capacity() - 4);
//为写做准备
this.bb.rewind();
} catch (IOException e) {
LOG.warn("Unexpected exception", e);
}
}
zookeeper使用自带的JUTE作为序列化实现,有兴趣的可以去研究下
上面解析了消息发送时的结构,接来下我们分析客户端处理会话创建完成的response
SendThread.readConnectResult
void readConnectResult() throws IOException {
if (LOG.isTraceEnabled()) {
StringBuilder buf = new StringBuilder("0x[");
for (byte b : incomingBuffer.array()) {
buf.append(Integer.toHexString(b)).append(",");
}
buf.append("]");
if (LOG.isTraceEnabled()) {
LOG.trace("readConnectResult {} {}", incomingBuffer.remaining(), buf.toString());
}
}
//通过ByteBuffer创建输入流
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ConnectResponse conRsp = new ConnectResponse();
//反序列化ConnectResponse
conRsp.deserialize(bbia, "connect");
// read "is read-only" flag
boolean isRO = false;
try {
isRO = bbia.readBool("readOnly");
} catch (IOException e) {
// this is ok -- just a packet from an old server which
// doesn't contain readOnly field
LOG.warn("Connected to an old server; r-o mode will be unavailable");
}
//从反序列化的ConnectResponse对象中获得sessionId
this.sessionId = conRsp.getSessionId();
//向eventThread发送连接结果的通知
sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO);
}
到此zookeeper 完成了客户端的启动,客户端启动包含了socket连接建立和session建立的过程,下图对上面过程做了简短的概述