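Preface
In my earlier series, "ZooKeeper standalone-mode source code analysis", I walked through the ZooKeeper source. As described there, ZooKeeper's network layer is built on NIO. ZooKeeper also supports Netty: to use Netty as the network layer implementation, you have to configure it on the client and on the server separately.
- On the client, set the startup parameter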
-Dzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
- On the server, set the startup parameter
-Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
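Both switches are ordinary JVM system properties, so for a quick local experiment they can also be set in code. The sketch below uses only java.lang.System; the class name TransportProps and its useNetty method are hypothetical, and it assumes the properties are set before any ZooKeeper client or server object is created.

```java
class TransportProps {
    static final String CLIENT_KEY = "zookeeper.clientCnxnSocket";
    static final String SERVER_KEY = "zookeeper.serverCnxnFactory";

    // Equivalent to passing the two -D flags on the command line
    static void useNetty() {
        System.setProperty(CLIENT_KEY, "org.apache.zookeeper.ClientCnxnSocketNetty");
        System.setProperty(SERVER_KEY, "org.apache.zookeeper.server.NettyServerCnxnFactory");
    }

    public static void main(String[] args) {
        useNetty();
        System.out.println(System.getProperty(CLIENT_KEY));
        System.out.println(System.getProperty(SERVER_KEY));
    }
}
```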
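Netty in the ZooKeeper client
If you have read the code in the earlier series, you may remember the snippet below. It is the method the client calls to create the object that represents the session-layer connection.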
private ClientCnxnSocket getClientCnxnSocket() throws IOException {
// Get the implementation class for the client connection; with the setting above it is org.apache.zookeeper.ClientCnxnSocketNetty
String clientCnxnSocketName = getClientConfig().getProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);
if (clientCnxnSocketName == null) {
clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
}
try {
// Create the client connection instance via reflection
Constructor<?> clientCxnConstructor = Class.forName(clientCnxnSocketName)
.getDeclaredConstructor(ZKClientConfig.class);
ClientCnxnSocket clientCxnSocket = (ClientCnxnSocket) clientCxnConstructor.newInstance(getClientConfig());
return clientCxnSocket;
} catch (Exception e) {
throw new IOException("Couldn't instantiate " + clientCnxnSocketName, e);
}
}
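The same "configured class name, fall back to a default, instantiate via reflection" pattern can be tried out in isolation. This is a minimal sketch, not ZooKeeper code: Transport, NioTransport, NettyTransport, TransportLoader and the key demo.transport are all hypothetical stand-ins, and java.util.Properties plays the role of ZKClientConfig.

```java
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.Properties;

interface Transport { String name(); }

class NioTransport implements Transport {
    public NioTransport(Properties config) { }
    public String name() { return "nio"; }
}

class NettyTransport implements Transport {
    public NettyTransport(Properties config) { }
    public String name() { return "netty"; }
}

class TransportLoader {
    static final String KEY = "demo.transport"; // hypothetical config key

    static Transport load(Properties config) throws IOException {
        String className = config.getProperty(KEY);
        if (className == null) {
            className = NioTransport.class.getName(); // default implementation
        }
        try {
            // Look up the one-argument constructor and invoke it reflectively
            Constructor<?> ctor = Class.forName(className).getDeclaredConstructor(Properties.class);
            return (Transport) ctor.newInstance(config);
        } catch (ReflectiveOperationException e) {
            throw new IOException("Couldn't instantiate " + className, e);
        }
    }

    public static void main(String[] args) throws IOException {
        Properties config = new Properties();
        System.out.println(load(config).name());
        config.setProperty(KEY, NettyTransport.class.getName());
        System.out.println(load(config).name());
    }
}
```

Wrapping every reflective failure in a single IOException, as the original method does, keeps the caller's error handling identical no matter which implementation was requested.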
ClientCnxnSocketNetty
ClientCnxnSocketNetty is the Netty-based object that ZooKeeper uses to represent the session-layer connection.
Let's look at how ClientCnxnSocketNetty is constructed.
ClientCnxnSocketNetty(ZKClientConfig clientConfig) throws IOException {
this.clientConfig = clientConfig;
// Client only has 1 outgoing socket, so the event loop group only needs
// a single thread.
// The client creates an eventLoopGroup with a single thread
eventLoopGroup = NettyUtils.newNioOrEpollEventLoopGroup(1 /* nThreads */);
initProperties();
}
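SendThread.run calls startConnect (a method of ClientCnxn.SendThread) to create the socket connection from the client to the server. The connection itself is created in ClientCnxnSocket.connect. For ClientCnxnSocketNIO this means creating a SocketChannel and connecting it to the remote server at the given IP:port, which was covered earlier. Now we analyze the Netty-based ClientCnxnSocketNetty.connect.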
ClientCnxnSocketNetty.connect
void connect(InetSocketAddress addr) throws IOException {
firstConnect = new CountDownLatch(1);
// Initialize the client Bootstrap and set ZKClientPipelineFactory as its handler.
// ZKClientPipelineFactory extends ChannelInitializer; its initChannel method is analyzed below
Bootstrap bootstrap = new Bootstrap().group(eventLoopGroup)
.channel(NettyUtils.nioOrEpollSocketChannel())
.option(ChannelOption.SO_LINGER, -1)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort()));
// Configure the ByteBufAllocator
bootstrap = configureBootstrapAllocator(bootstrap);
bootstrap.validate();
// Start connecting to the server. Take the lock first; the same lock is used later when the connect result is handled
connectLock.lock();
try {
// Connect to the server asynchronously
connectFuture = bootstrap.connect(addr);
// Register a callback for the connect result
connectFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
// this lock guarantees that channel won't be assigned after cleanup().
boolean connected = false;
connectLock.lock();
try {
if (!channelFuture.isSuccess()) {
// The connect attempt failed; return immediately
LOG.warn("future isn't success.", channelFuture.cause());
return;
} else if (connectFuture == null) {
LOG.info("connect attempt cancelled");
// If the connect attempt was cancelled but succeeded
// anyway, make sure to close the channel, otherwise
// we may leak a file descriptor.
channelFuture.channel().close();
return;
}
// setup channel, variables, connection, etc.
// Get the established channel
channel = channelFuture.channel();
// Set disconnected to false to mark the channel as connected
disconnected.set(false);
// Set initialized to false: the session has not been established yet
initialized = false;
// lenBuffer and incomingBuffer play the same role as in the earlier NIO articles: they receive data sent by the server
lenBuffer.clear();
incomingBuffer = lenBuffer;
// primeConnection issues the session-establishment request (analyzed in the NIO series); below we look at how that request is actually sent
sendThread.primeConnection();
updateNow();
updateLastSendAndHeard();
if (sendThread.tunnelAuthInProgress()) {
waitSasl.drainPermits();
needSasl.set(true);
sendPrimePacket();
} else {
needSasl.set(false);
}
connected = true;
} finally {
connectFuture = null;
// Release the connect lock
connectLock.unlock();
if (connected) {
LOG.info("channel is connected: {}", channelFuture.channel());
}
// need to wake on connect success or failure to avoid
// timing out ClientCnxn.SendThread which may be
// blocked waiting for first connect in doTransport().
wakeupCnxn();
// Count down the latch to signal that the connect attempt has finished
firstConnect.countDown();
}
}
});
} finally {
// Release the connect lock
connectLock.unlock();
}
}
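From this analysis, connect sets up the Netty client Bootstrap, starts the socket connection from client to server, and registers a callback that issues the session-creation request once the connection succeeds. Because connect is asynchronous, SendThread.run carries on to ClientCnxnSocketNetty.doTransport. The NIO series covered the source of ClientCnxnSocketNIO.doTransport; next we look at the implementation of ClientCnxnSocketNetty.doTransport.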
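The interplay between connectLock, the completion callback, and the firstConnect latch can be reproduced without Netty. In this minimal sketch a CompletableFuture stands in for Netty's ChannelFuture, and the class AsyncConnectSketch with its method names is hypothetical; it only illustrates the pattern, not ZooKeeper's actual behaviour in every branch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class AsyncConnectSketch {
    private final ReentrantLock connectLock = new ReentrantLock();
    private final AtomicBoolean connected = new AtomicBoolean(false);
    private volatile CountDownLatch firstConnect;
    private CompletableFuture<Boolean> connectFuture; // guarded by connectLock

    void connect(CompletableFuture<Boolean> attempt) {
        firstConnect = new CountDownLatch(1);
        connectLock.lock();
        try {
            connectFuture = attempt;
            attempt.whenComplete((ok, err) -> {
                // The callback takes the same lock, so it cannot observe a half-assigned connectFuture
                connectLock.lock();
                try {
                    if (err == null && Boolean.TRUE.equals(ok) && connectFuture != null) {
                        connected.set(true);
                    }
                } finally {
                    connectFuture = null;
                    connectLock.unlock();
                    // Wake the waiting thread on success and on failure alike
                    firstConnect.countDown();
                }
            });
        } finally {
            connectLock.unlock();
        }
    }

    // Counterpart of the wait at the top of doTransport
    boolean awaitFirstConnect(long timeoutMs) {
        try {
            return firstConnect.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    boolean isConnected() { return connected.get(); }

    public static void main(String[] args) {
        AsyncConnectSketch s = new AsyncConnectSketch();
        CompletableFuture<Boolean> attempt = new CompletableFuture<>();
        s.connect(attempt);
        attempt.complete(true);
        System.out.println(s.awaitFirstConnect(100) && s.isConnected());
    }
}
```

Counting the latch down in the finally block is the important design choice: the sending thread must be released whether the attempt succeeded or not, otherwise it would sit out the full timeout.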
ClientCnxnSocketNetty.doTransport
void doTransport(
int waitTimeOut,
Queue<Packet> pendingQueue,
ClientCnxn cnxn) throws IOException, InterruptedException {
try {
// Wait on firstConnect until the connect attempt has completed
if (!firstConnect.await(waitTimeOut, TimeUnit.MILLISECONDS)) {
return;
}
Packet head = null;
if (needSasl.get()) {
if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {
return;
}
} else {
// Take one request packet from outgoingQueue
head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS);
}
// check if being waken up on closing.
if (!sendThread.getZkState().isAlive()) {
// adding back the packet to notify of failure in conLossPacket().
// If the client is no longer alive (it is closing), put any packet that was taken back at the head of outgoingQueue via addBack
addBack(head);
return;
}
// channel disconnection happened
if (disconnected.get()) {
addBack(head);
throw new EndOfStreamException("channel for sessionid 0x" + Long.toHexString(sessionId) + " is lost");
}
if (head != null) {
// Write the request to the server
doWrite(pendingQueue, head, cnxn);
}
} finally {
updateNow();
}
}
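Unlike ClientCnxnSocketNIO.doTransport, ClientCnxnSocketNetty.doTransport deals only with outgoing requests and writes them directly; incoming data is handled by a Netty handler.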
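The core of that loop, a timed poll followed by putting the packet back at the head of the queue when the channel is gone, can be sketched with a plain LinkedBlockingDeque. TransportLoopSketch, its String packets, and the wire buffer are hypothetical stand-ins for the real Packet type and channel.

```java
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

class TransportLoopSketch {
    final LinkedBlockingDeque<String> outgoingQueue = new LinkedBlockingDeque<>();
    final StringBuilder wire = new StringBuilder(); // stand-in for the channel
    volatile boolean disconnected = false;

    /** Returns true if a packet was written during this pass. */
    boolean doTransport(long waitTimeoutMs) throws InterruptedException {
        // Block for at most waitTimeoutMs waiting for something to send
        String head = outgoingQueue.poll(waitTimeoutMs, TimeUnit.MILLISECONDS);
        if (disconnected) {
            // Put the packet back at the head so it is not lost and ordering is kept
            if (head != null) {
                outgoingQueue.addFirst(head);
            }
            throw new IllegalStateException("channel is lost");
        }
        if (head == null) {
            return false; // timed out with nothing to send
        }
        wire.append(head);
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        TransportLoopSketch t = new TransportLoopSketch();
        t.outgoingQueue.add("ping");
        System.out.println(t.doTransport(10) + " " + t.wire);
    }
}
```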
ClientCnxnSocketNetty.doWrite
private void doWrite(Queue<Packet> pendingQueue, Packet p, ClientCnxn cnxn) {
updateNow();
boolean anyPacketsSent = false;
while (true) {
if (p != WakeupPacket.getInstance()) {
// Not a wakeup packet
if ((p.requestHeader != null)
&& (p.requestHeader.getType() != ZooDefs.OpCode.ping)
&& (p.requestHeader.getType() != ZooDefs.OpCode.auth)) {
// Assign the packet an xid so that responses can be matched to requests in order
p.requestHeader.setXid(cnxn.getXid());
synchronized (pendingQueue) {
pendingQueue.add(p);
}
}
// This method is the entry point for writing data through Netty
sendPktOnly(p);
anyPacketsSent = true;
}
// Keep writing while outgoingQueue still has packets
if (outgoingQueue.isEmpty()) {
// outgoingQueue is drained; leave the loop
break;
}
p = outgoingQueue.remove();
}
// TODO: maybe we should flush in the loop above every N packets/bytes?
// But, how do we determine the right value for N ...
if (anyPacketsSent) {
// If anything was written in this call, flush the channel to push the data onto the network
channel.flush();
}
}
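The "write many, flush once" shape of doWrite is easy to see in a stripped-down form. In this sketch a list records writes and a counter records flushes; BatchedWriteSketch and its WAKEUP sentinel are hypothetical stand-ins for the channel and for WakeupPacket.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class BatchedWriteSketch {
    // Sentinel that only wakes the sender up and carries no data
    static final Object WAKEUP = new Object();

    final List<Object> written = new ArrayList<>();
    int flushCount = 0;

    void doWrite(Queue<Object> outgoingQueue, Object first) {
        boolean anyPacketsSent = false;
        Object p = first;
        while (true) {
            if (p != WAKEUP) {
                written.add(p); // stand-in for channel.write(...)
                anyPacketsSent = true;
            }
            if (outgoingQueue.isEmpty()) {
                break;
            }
            p = outgoingQueue.remove();
        }
        if (anyPacketsSent) {
            flushCount++; // stand-in for a single channel.flush()
        }
    }

    public static void main(String[] args) {
        Queue<Object> q = new ArrayDeque<>(List.of("b", WAKEUP, "c"));
        BatchedWriteSketch s = new BatchedWriteSketch();
        s.doWrite(q, "a");
        System.out.println(s.written + " flushes=" + s.flushCount);
    }
}
```

Batching this way trades a little latency for fewer flushes when many requests are queued at once.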
sendPktOnly is the entry point for writing data through Netty; it ends up calling ClientCnxnSocketNetty.sendPkt.
ClientCnxnSocketNetty.sendPkt
private ChannelFuture sendPkt(Packet p, boolean doFlush) {
// Assuming the packet will be sent out successfully. Because if it fails,
// the channel will close and clean up queues.
// Serialize the request in the packet into a ByteBuffer
p.createBB();
updateLastSend();
final ByteBuf writeBuffer = Unpooled.wrappedBuffer(p.bb);
// Send the data with channel.writeAndFlush or channel.write
final ChannelFuture result = doFlush ? channel.writeAndFlush(writeBuffer) : channel.write(writeBuffer);
result.addListener(onSendPktDoneListener);
return result;
}
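The analysis above shows how the client uses Netty to send requests to the server. Next, we look at how the client uses Netty to read response data from the server, which starts with the ZKClientPipelineFactory.initChannel method.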
ZKClientPipelineFactory.initChannel
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (clientConfig.getBoolean(ZKClientConfig.SECURE_CLIENT)) {
initSSL(pipeline);
}
// In initChannel the ZooKeeper client registers a ZKClientHandler on the pipeline; ZKClientHandler is an inbound handler.
// No outbound handler is registered, which is why the send path above could be explained without first looking at ZKClientPipelineFactory.initChannel
pipeline.addLast("handler", new ZKClientHandler());
}
Here is the definition of ZKClientHandler.
ZKClientHandler
private class ZKClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
AtomicBoolean channelClosed = new AtomicBoolean(false);
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// Handle the channel being disconnected
LOG.info("channel is disconnected: {}", ctx.channel());
cleanup();
}
/**
* netty handler has encountered problems. We are cleaning it up and tell outside to close
* the channel/connection.
*/
private void cleanup() {
// Set channelClosed to true; if it was already set, cleanup has run before and we return
if (!channelClosed.compareAndSet(false, true)) {
return;
}
disconnected.set(true);
onClosing();
}
// channelRead0 handles response data from the server
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) throws Exception {
updateNow();
// The response handling below closely mirrors the NIO implementation
while (buf.isReadable()) {
if (incomingBuffer.remaining() > buf.readableBytes()) {
int newLimit = incomingBuffer.position() + buf.readableBytes();
incomingBuffer.limit(newLimit);
}
// Read data into incomingBuffer
buf.readBytes(incomingBuffer);
incomingBuffer.limit(incomingBuffer.capacity());
if (!incomingBuffer.hasRemaining()) {
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
// incomingBuffer == lenBuffer means we have just read the 4 bytes that carry the response length
recvCount.getAndIncrement();
readLength();
} else if (!initialized) {
// initialized == false means this response is the reply to the session-establishment request
// readConnectResult was analyzed in the NIO series
readConnectResult();
lenBuffer.clear();
incomingBuffer = lenBuffer;
initialized = true;
updateLastHeard();
} else {
// incomingBuffer now holds the response to a request;
// sendThread.readResponse was covered in the NIO series
sendThread.readResponse(incomingBuffer);
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
}
wakeupCnxn();
// Note: SimpleChannelInboundHandler releases the ByteBuf for us
// so we don't need to do it.
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
LOG.error("Unexpected throwable", cause);
cleanup();
}
}
With ZKClientHandler covered, you should now have a reasonable picture of how ZooKeeper uses Netty to read response data from the server.
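The lenBuffer / incomingBuffer switch in channelRead0 is a length-prefixed frame decoder that tolerates data arriving in arbitrary fragments. Here is a minimal sketch of the same idea using only java.nio.ByteBuffer in place of Netty's ByteBuf; FrameReaderSketch and its frames list are hypothetical, and it assumes every frame has a positive length.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

class FrameReaderSketch {
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    // Points at lenBuffer while reading a length, at a payload buffer otherwise
    private ByteBuffer incomingBuffer = lenBuffer;
    final List<byte[]> frames = new ArrayList<>();

    void onBytes(ByteBuffer chunk) {
        while (chunk.hasRemaining()) {
            // Copy no more than the current target buffer can still take
            int n = Math.min(incomingBuffer.remaining(), chunk.remaining());
            ByteBuffer slice = chunk.duplicate();
            slice.limit(slice.position() + n);
            incomingBuffer.put(slice);
            chunk.position(chunk.position() + n);
            if (incomingBuffer.hasRemaining()) {
                continue; // target not full yet, wait for more bytes
            }
            incomingBuffer.flip();
            if (incomingBuffer == lenBuffer) {
                int len = incomingBuffer.getInt();
                if (len <= 0) {
                    throw new IllegalArgumentException("Len error " + len);
                }
                incomingBuffer = ByteBuffer.allocate(len);
            } else {
                frames.add(incomingBuffer.array());
                lenBuffer.clear();
                incomingBuffer = lenBuffer; // next bytes are a length again
            }
        }
    }

    public static void main(String[] args) {
        ByteBuffer data = ByteBuffer.allocate(6);
        data.putInt(2).put((byte) 'h').put((byte) 'i');
        FrameReaderSketch reader = new FrameReaderSketch();
        reader.onBytes(ByteBuffer.wrap(data.array(), 0, 3)); // partial length
        reader.onBytes(ByteBuffer.wrap(data.array(), 3, 3)); // rest of length + payload
        System.out.println(reader.frames.size());
    }
}
```

Because the reader keeps its position in incomingBuffer between calls, it does not matter where the transport happens to split the byte stream.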
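That completes the tour of how the ZooKeeper client uses Netty for network communication. Next, we turn to how Netty is used on the ZooKeeper server.
Netty in the ZooKeeper server
At startup the server creates a ServerCnxnFactory. The default implementation is NIOServerCnxnFactory; below we look at the NettyServerCnxnFactory implementation.
NettyServerCnxnFactory initialization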
NettyServerCnxnFactory() {
x509Util = new ClientX509Util();
boolean usePortUnification = Boolean.getBoolean(PORT_UNIFICATION_KEY);
LOG.info("{}={}", PORT_UNIFICATION_KEY, usePortUnification);
if (usePortUnification) {
try {
QuorumPeerConfig.configureSSLAuth();
} catch (QuorumPeerConfig.ConfigException e) {
LOG.error("unable to set up SslAuthProvider, turning off client port unification", e);
usePortUnification = false;
}
}
this.shouldUsePortUnification = usePortUnification;
this.advancedFlowControlEnabled = Boolean.getBoolean(NETTY_ADVANCED_FLOW_CONTROL);
LOG.info("{} = {}", NETTY_ADVANCED_FLOW_CONTROL, this.advancedFlowControlEnabled);
setOutstandingHandshakeLimit(Integer.getInteger(OUTSTANDING_HANDSHAKE_LIMIT, -1));
// bossGroup accepts client connections and workerGroup handles the I/O events of each connection: the classic reactor pattern
EventLoopGroup bossGroup = NettyUtils.newNioOrEpollEventLoopGroup(NettyUtils.getClientReachableLocalInetAddressCount());
EventLoopGroup workerGroup = NettyUtils.newNioOrEpollEventLoopGroup();
// What follows is the standard way to build a ServerBootstrap and needs little explanation; we go straight to the channelHandler logic
ServerBootstrap bootstrap = new ServerBootstrap().group(bossGroup, workerGroup)
.channel(NettyUtils.nioOrEpollServerSocketChannel())
// parent channel options
.option(ChannelOption.SO_REUSEADDR, true)
// child channels options
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_LINGER, -1)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (advancedFlowControlEnabled) {
pipeline.addLast(readIssuedTrackingHandler);
}
if (secure) {
initSSL(pipeline, false);
} else if (shouldUsePortUnification) {
initSSL(pipeline, true);
}
pipeline.addLast("servercnxnfactory", channelHandler);
}
});
this.bootstrap = configureBootstrapAllocator(bootstrap);
this.bootstrap.validate();
}
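The ZooKeeper server's ServerBootstrap registers a single business handler, CnxnChannelHandler (the SSL handlers and readIssuedTrackingHandler are added only when the corresponding options are enabled). CnxnChannelHandler is a ChannelDuplexHandler; its implementation is analyzed below.
CnxnChannelHandler definition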
class CnxnChannelHandler extends ChannelDuplexHandler {
// channelActive is called after the server accepts a client's socket connection.
// Normally this is where the server-side object representing the session-layer connection is created
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (LOG.isTraceEnabled()) {
LOG.trace("Channel active {}", ctx.channel());
}
final Channel channel = ctx.channel();
// Check whether the server-wide connection limit has been reached; if so, close the socket and reject the new connection
if (limitTotalNumberOfCnxns()) {
ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
channel.close();
return;
}
InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress();
// Check whether this client address has reached the configured per-client connection limit; if so, reject the connection
if (maxClientCnxns > 0 && getClientCnxnCount(addr) >= maxClientCnxns) {
ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
LOG.warn("Too many connections from {} - max is {}", addr, maxClientCnxns);
channel.close();
return;
}
// Create NettyServerCnxn, the object that represents the session-layer connection
NettyServerCnxn cnxn = new NettyServerCnxn(channel, zkServer, NettyServerCnxnFactory.this);
// Store the connection object as an attribute of the channel
ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
if (handshakeThrottlingEnabled) {
// Favor to check and throttling even in dual mode which
// accepts both secure and insecure connections, since
// it's more efficient than throttling when we know it's
// a secure connection in DualModeSslHandler.
//
// From benchmark, this reduced around 15% reconnect time.
int outstandingHandshakesNum = outstandingHandshake.addAndGet(1);
if (outstandingHandshakesNum > outstandingHandshakeLimit) {
outstandingHandshake.addAndGet(-1);
channel.close();
ServerMetrics.getMetrics().TLS_HANDSHAKE_EXCEEDED.add(1);
} else {
cnxn.setHandshakeState(HandshakeState.STARTED);
}
}
if (secure) {
SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
Future<Channel> handshakeFuture = sslHandler.handshakeFuture();
handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn));
} else if (!shouldUsePortUnification) {
allChannels.add(ctx.channel());
addCnxn(cnxn);
}
}
// Handling when the connection is closed
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (LOG.isTraceEnabled()) {
LOG.trace("Channel inactive {}", ctx.channel());
}
allChannels.remove(ctx.channel());
NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
if (cnxn != null) {
if (LOG.isTraceEnabled()) {
LOG.trace("Channel inactive caused close {}", cnxn);
}
updateHandshakeCountIfStarted(cnxn);
// Close the connection
cnxn.close(ServerCnxn.DisconnectReason.CHANNEL_DISCONNECTED);
}
}
// Exception handling
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
LOG.warn("Exception caught", cause);
NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
if (cnxn != null) {
LOG.debug("Closing {}", cnxn);
updateHandshakeCountIfStarted(cnxn);
cnxn.close(ServerCnxn.DisconnectReason.CHANNEL_CLOSED_EXCEPTION);
}
}
// Handle user-defined channel events, mainly the events that enable or disable reading from the channel
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
try {
if (evt == NettyServerCnxn.ReadEvent.ENABLE) {
LOG.debug("Received ReadEvent.ENABLE");
NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
// TODO: Not sure if cnxn can be null here. It becomes null if channelInactive()
// or exceptionCaught() trigger, but it's unclear to me if userEventTriggered() can run
// after either of those. Check for null just to be safe ...
if (cnxn != null) {
if (cnxn.getQueuedReadableBytes() > 0) {
cnxn.processQueuedBuffer();
if (advancedFlowControlEnabled && cnxn.getQueuedReadableBytes() == 0) {
// trigger a read if we have consumed all
// backlog
ctx.read();
LOG.debug("Issued a read after queuedBuffer drained");
}
}
}
if (!advancedFlowControlEnabled) {
ctx.channel().config().setAutoRead(true);
}
} else if (evt == NettyServerCnxn.ReadEvent.DISABLE) {
LOG.debug("Received ReadEvent.DISABLE");
ctx.channel().config().setAutoRead(false);
}
} finally {
ReferenceCountUtil.release(evt);
}
}
// The server reads request data sent by the client
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
if (LOG.isTraceEnabled()) {
LOG.trace("message received called {}", msg);
}
try {
LOG.debug("New message {} from {}", msg, ctx.channel());
// Counterpart of channelActive: fetch the NettyServerCnxn stored on the channel
NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
if (cnxn == null) {
LOG.error("channelRead() on a closed or closing NettyServerCnxn");
} else {
// NettyServerCnxn processes the request sent by the client
cnxn.processMessage((ByteBuf) msg);
}
} catch (Exception ex) {
LOG.error("Unexpected exception in receive", ex);
throw ex;
}
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if (advancedFlowControlEnabled) {
NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
if (cnxn != null && cnxn.getQueuedReadableBytes() == 0 && cnxn.readIssuedAfterReadComplete == 0) {
ctx.read();
LOG.debug("Issued a read since we do not have anything to consume after channelReadComplete");
}
}
ctx.fireChannelReadComplete();
}
// Use a single listener instance to reduce GC
// Note: this listener is only added when LOG.isTraceEnabled() is true,
// so it should not do any work other than trace logging.
private final GenericFutureListener<Future<Void>> onWriteCompletedTracer = (f) -> {
if (LOG.isTraceEnabled()) {
LOG.trace("write success: {}", f.isSuccess());
}
};
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (LOG.isTraceEnabled()) {
promise.addListener(onWriteCompletedTracer);
}
super.write(ctx, msg, promise);
}
}
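The two admission checks at the top of channelActive, a server-wide cap and a per-address cap, amount to a small counting structure. The following is a minimal sketch under the assumption that a limit of 0 or less means "unlimited"; ConnectionLimiterSketch and its methods are hypothetical and do not reproduce how NettyServerCnxnFactory actually tracks connections.

```java
import java.util.HashMap;
import java.util.Map;

class ConnectionLimiterSketch {
    private final int maxTotal;     // 0 or less disables the server-wide limit
    private final int maxPerClient; // 0 or less disables the per-address limit
    private int total;
    private final Map<String, Integer> perClient = new HashMap<>();

    ConnectionLimiterSketch(int maxTotal, int maxPerClient) {
        this.maxTotal = maxTotal;
        this.maxPerClient = maxPerClient;
    }

    // Called when a channel becomes active; false means "close the channel"
    synchronized boolean tryAccept(String addr) {
        if (maxTotal > 0 && total >= maxTotal) {
            return false;
        }
        int current = perClient.getOrDefault(addr, 0);
        if (maxPerClient > 0 && current >= maxPerClient) {
            return false;
        }
        perClient.put(addr, current + 1);
        total++;
        return true;
    }

    // Called when a previously accepted channel becomes inactive
    synchronized void release(String addr) {
        Integer current = perClient.get(addr);
        if (current == null) {
            return;
        }
        if (current == 1) {
            perClient.remove(addr);
        } else {
            perClient.put(addr, current - 1);
        }
        total--;
    }

    public static void main(String[] args) {
        ConnectionLimiterSketch limiter = new ConnectionLimiterSketch(3, 2);
        System.out.println(limiter.tryAccept("10.0.0.1"));
        System.out.println(limiter.tryAccept("10.0.0.1"));
        System.out.println(limiter.tryAccept("10.0.0.1"));
    }
}
```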
Now let's see how NettyServerCnxn.processMessage handles the incoming request data.
NettyServerCnxn.processMessage
void processMessage(ByteBuf buf) {
checkIsInEventLoop("processMessage");
LOG.debug("0x{} queuedBuffer: {}", Long.toHexString(sessionId), queuedBuffer);
if (LOG.isTraceEnabled()) {
LOG.trace("0x{} buf {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(buf));
}
if (throttled.get()) {
LOG.debug("Received message while throttled");
// we are throttled, so we need to queue
if (queuedBuffer == null) {
LOG.debug("allocating queue");
queuedBuffer = channel.alloc().compositeBuffer();
}
appendToQueuedBuffer(buf.retainedDuplicate());
if (LOG.isTraceEnabled()) {
LOG.trace("0x{} queuedBuffer {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(queuedBuffer));
}
} else {
LOG.debug("not throttled");
if (queuedBuffer != null) {
appendToQueuedBuffer(buf.retainedDuplicate());
processQueuedBuffer();
} else {
// In the common case (not throttled, nothing queued) the branches above are skipped and we reach receiveMessage
receiveMessage(buf);
// Have to check !closingChannel, because an error in
// receiveMessage() could have led to close() being called.
if (!closingChannel && buf.isReadable()) {
if (LOG.isTraceEnabled()) {
LOG.trace("Before copy {}", buf);
}
if (queuedBuffer == null) {
queuedBuffer = channel.alloc().compositeBuffer();
}
appendToQueuedBuffer(buf.retainedSlice(buf.readerIndex(), buf.readableBytes()));
if (LOG.isTraceEnabled()) {
LOG.trace("Copy is {}", queuedBuffer);
LOG.trace("0x{} queuedBuffer {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(queuedBuffer));
}
}
}
}
}
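The three branches of processMessage boil down to one rule: never let new bytes overtake bytes that are already queued. The minimal sketch below uses strings instead of ByteBuf; ThrottledInboxSketch and its method names are hypothetical, and it leaves out the partial-frame remainder handling that the real method also performs.

```java
class ThrottledInboxSketch {
    private final StringBuilder queuedBuffer = new StringBuilder();
    private final StringBuilder processed = new StringBuilder();
    private boolean throttled;

    void processMessage(String buf) {
        if (throttled) {
            // Reading is paused: only queue
            queuedBuffer.append(buf);
        } else if (queuedBuffer.length() > 0) {
            // Older data is still queued: append behind it, then drain in order
            queuedBuffer.append(buf);
            processQueuedBuffer();
        } else {
            // Nothing queued: process directly
            processed.append(buf);
        }
    }

    void processQueuedBuffer() {
        processed.append(queuedBuffer);
        queuedBuffer.setLength(0);
    }

    void disableRecv() {
        throttled = true;
    }

    // Plays the role of the ReadEvent.ENABLE branch: unthrottle, then drain the backlog
    void enableRecv() {
        throttled = false;
        processQueuedBuffer();
    }

    String processed() {
        return processed.toString();
    }

    public static void main(String[] args) {
        ThrottledInboxSketch inbox = new ThrottledInboxSketch();
        inbox.processMessage("a");
        inbox.disableRecv();
        inbox.processMessage("b");
        inbox.enableRecv();
        inbox.processMessage("c");
        System.out.println(inbox.processed());
    }
}
```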
NettyServerCnxn.receiveMessage
NettyServerCnxn.receiveMessage is the core of how the ZooKeeper server processes client requests.
private void receiveMessage(ByteBuf message) {
checkIsInEventLoop("receiveMessage");
try {
while (message.isReadable() && !throttled.get()) {
if (bb != null) {
if (LOG.isTraceEnabled()) {
LOG.trace("message readable {} bb len {} {}", message.readableBytes(), bb.remaining(), bb);
ByteBuffer dat = bb.duplicate();
dat.flip();
LOG.trace("0x{} bb {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(Unpooled.wrappedBuffer(dat)));
}
if (bb.remaining() > message.readableBytes()) {
int newLimit = bb.position() + message.readableBytes();
bb.limit(newLimit);
}
// Use bb to receive the request payload sent by the client
message.readBytes(bb);
bb.limit(bb.capacity());
if (LOG.isTraceEnabled()) {
LOG.trace("after readBytes message readable {} bb len {} {}", message.readableBytes(), bb.remaining(), bb);
ByteBuffer dat = bb.duplicate();
dat.flip();
LOG.trace("after readbytes 0x{} bb {}",
Long.toHexString(sessionId),
ByteBufUtil.hexDump(Unpooled.wrappedBuffer(dat)));
}
if (bb.remaining() == 0) {
bb.flip();
packetReceived(4 + bb.remaining());
ZooKeeperServer zks = this.zkServer;
if (zks == null || !zks.isRunning()) {
throw new IOException("ZK down");
}
if (initialized) {
// The session is already established, so zks.processPacket handles this request; processPacket was analyzed in the NIO series
// TODO: if zks.processPacket() is changed to take a ByteBuffer[],
// we could implement zero-copy queueing.
zks.processPacket(this, bb);
} else {
LOG.debug("got conn req request from {}", getRemoteSocketAddress());
// The session is not established yet, so this is a session-creation request and zks.processConnectRequest handles it;
// zks.processConnectRequest was analyzed in the NIO series
zks.processConnectRequest(this, bb);
initialized = true;
}
bb = null;
}
} else {
// bb has not been allocated yet, so the first thing to read from the ByteBuf is the 4-byte request length
if (LOG.isTraceEnabled()) {
LOG.trace("message readable {} bblenrem {}", message.readableBytes(), bbLen.remaining());
ByteBuffer dat = bbLen.duplicate();
dat.flip();
LOG.trace("0x{} bbLen {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(Unpooled.wrappedBuffer(dat)));
}
if (message.readableBytes() < bbLen.remaining()) {
bbLen.limit(bbLen.position() + message.readableBytes());
}
// Read the 4-byte length prefix from message
message.readBytes(bbLen);
bbLen.limit(bbLen.capacity());
if (bbLen.remaining() == 0) {
bbLen.flip();
if (LOG.isTraceEnabled()) {
LOG.trace("0x{} bbLen {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(Unpooled.wrappedBuffer(bbLen)));
}
// Get the length of the client's request payload
int len = bbLen.getInt();
if (LOG.isTraceEnabled()) {
LOG.trace("0x{} bbLen len is {}", Long.toHexString(sessionId), len);
}
bbLen.clear();
if (!initialized) {
if (checkFourLetterWord(channel, message, len)) {
return;
}
}
if (len < 0 || len > BinaryInputArchive.maxBuffer) {
throw new IOException("Len error " + len);
}
// checkRequestSize will throw IOException if request is rejected
zkServer.checkRequestSizeWhenReceivingMessage(len);
// Allocate a ByteBuffer of capacity len to receive the request payload
bb = ByteBuffer.allocate(len);
}
}
}
} catch (IOException e) {
LOG.warn("Closing connection to {}", getRemoteSocketAddress(), e);
close(DisconnectReason.IO_EXCEPTION);
} catch (ClientCnxnLimitException e) {
// Common case exception, print at debug level
ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
LOG.debug("Closing connection to {}", getRemoteSocketAddress(), e);
close(DisconnectReason.CLIENT_RATE_LIMIT);
}
}
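receiveMessage shows how ZooKeeper uses Netty to read request messages from the client.
After a client request has passed through the ZooKeeper server's request processor chain, how is the result sent back to the client?
The response is built in FinalRequestProcessor and then sent out through NettyServerCnxn.sendResponse.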
NettyServerCnxn.sendBuffer
sendResponse calls sendBuffer, and sendBuffer directly calls channel.writeAndFlush to send the response to the client.
public void sendBuffer(ByteBuffer... buffers) {
if (buffers.length == 1 && buffers[0] == ServerCnxnFactory.closeConn) {
close(DisconnectReason.CLIENT_CLOSED_CONNECTION);
return;
}
channel.writeAndFlush(Unpooled.wrappedBuffer(buffers)).addListener(onSendBufferDoneListener);
}
That is how the ZooKeeper server uses Netty to handle I/O events.