前言
之前的文章分析到了服务端NioServerSocketChannel的创建注册及注册accept事件。到现在为止,关于服务端,我们还有多个疑问未解开:例如当有客户端连接过来时,服务端要怎么处理,以及后续的读写如何进行的?之前的分析都是在服务端的父事件循环组中,那么子事件循环组又是怎么起作用的?之前提到了子处理器childHandler被封装在了ServerBootStrapAcceptor中添加到了流水线,那么子处理器又是怎么工作的,有没有被添加到pipeline中?本文将对这些问题进行解答。
服务端接收到连接后做了什么
我们不得不再次回到下面这个方法,当有read或者accept事件到来时,执行unsaf.read()(对于连接事件和read事件,因为其所对应的通道一个是NioServerSocketChannel一个是NioSocketChannel,两者的unsafe是不同的,对于连接事件,unsafe.read位于AbstractNioUnsafe中)。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}
if (eventLoop != this || eventLoop == null) {
return;
}
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
@Override
public void read() {
/*省略代码*/
try {
try {
for (;;) {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
// stop reading and remove op
if (!config.isAutoRead()) {
break;
}
if (readBuf.size() >= maxMessagesPerRead) {
break;
}
}
} catch (Throwable t) {
exception = t;
}
setReadPending(false);
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
pipeline.fireChannelReadComplete();
/*省略代码*/
} finally {
/*省略代码*/
}
}
/*进行连接的接收*/
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
在doReadMessages方法中,会通过NioServerSocketChannel内部的ServerSocketChannel来完成连接的接受,如果此时有连接进来,那么会生成SocketChannel实例,netty将其封装为自身的NioSocketChannel实例加入到缓冲buf中。随后会将缓冲区中内容(新接收的连接)通过fireChannelRead进行流水线传输。可以发现,NioServerSocketChannel中的ServerSocketChannel在接收到连接后会通过NioServerSocketChannel中的流水线的channelRead方法进行传输。
随后流水线channelRead执行到ServerBootstrapAcceptor。
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
由于我们已经知道传入的msg是一个netty已接收连接的通道(实际封装了SocketChannel的NioSocketChannel),获取到它的pipeline,并将childHandler添加到它的流水线中。如果我们给其配置的childHandler是ChannelInitializer的话,那么随后还会将其他的处理器添加到流水线中。同样的,新接收的连接还没有注册,所以也需要将其注册,只不过在server端,新接受的连接注册到childGroup,也就是子事件循环组中,最终它会被注册到子事件循环组中的某个NioEventLoop上,它之后的读写的事件操作就与之前分析客户端时相似。
在netty中NioServerSocketChannel负责接收连接,它注册在父NioEventLoopGroup中,而且他的通道的handler最后是一个ServerBootStrapAcceptor处理器。在接收到新连接时,出发fireChannelRead回调方法的执行,当执行到ServerBootStrapAcceptor时,ServerBootStrapAcceptor将子处理器加入到新连接的通道的流水线中。那么以Echo Server这个例子来看,两种通道的流水线分别是:
1 NioServerSocketChannel的流水线(在前文已经见到过)
2 新连接到来后NioSocketChannel的流水线
*链接
1. Netty解析:第一个demo——Echo Server
2. Netty解析:NioEventLoopGroup事件循环组
3. Netty解析:NioSocketChannel、NioServerSocketChannel的创建及注册
4. Netty解析:Handler、Pipeline大动脉及其在注册过程中体现
5. Netty解析:connect/bind方法背后
6. Netty解析:服务端如何接受连接并后续处理读写事件