导读
原创文章,转载请注明出处。
本文源码地址:netty-source-code-analysis
本文所使用的netty版本4.1.6.Final:带注释的netty源码
本文
本文简要地介绍服务端新连接的接入过程,逻辑比较简单,主要功能在NioMessageUnsafe
和ServerBootsrapAcceptor
中。
1 服务端的启动过程
1.1 两个EventLoopGroup
我们在“服务端的启动过程”这篇文章中看到过ServerBootstrap
在启动时需要传入两个EventLoop
,一个叫bossGroup
,一个叫workerGroup
,bossGroup
处理监听端口的Channel
,而workerGroup
为新建立的连接提供服务,具体如何提供服务呢,咱们往下看。
/**
* 欢迎关注公众号“种代码“,获取博主微信深入交流
*
* @author wangjianxin
*/
public class com.zhongdaima.netty.analysis.bootstrap.ServerBoot {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(1);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.attr(AttributeKey.valueOf("ChannelName"), "ServerChannel")
.handler(new ChannelInboundHandlerAdapter() {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("ChannelRegistered");
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("ChannelActive");
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("HandlerAdded");
}
}).childHandler(new ChannelInboundHandlerAdapter(){
});
ChannelFuture f = b.bind(8000).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
1.2 ServerBootstrapAcceptor
在ServerBootstrap
的init
方法中为NioServerSocketChannel
添加了一个特殊的Handler
,就是ServerBootstrapAcceptor
,这个Handler
就是服务端新连接接入过程中的主要逻辑所在了。
void init(Channel channel) throws Exception {
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
2 新连接接入
我们在“EventLoop
的工作原理”这篇文章中提到中过一个特殊的事件OP_ACCEPT
,并且比较奇怪的是,在发生OP_ACCEPT
兴趣事件时,调用了unsafe.read
方法。今天咱们就从这里开始跟踪一下服务端新连接接入的过程。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
try {
//OP_ACCEPT事件却调用unsafe.read()方法
//参考io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe.read和io.netty.channel.socket.nio.NioServerSocketChannel.doReadMessages
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
NioServerSocketChannel
继承了AbstractNioMessageChannel
,所以,此处调用的unsafe.read()
实现在NioMessageUnsafe
中,该类中有一个readBuf
属性,比较特殊的是这个readBuf
中的泛型不是ByteBuf
而是Object
,为什么呢,咱们接着往下看,到doReadMessages
这里传入了这个readBuf
属性。
private final class NioMessageUnsafe extends AbstractNioUnsafe {
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
try {
try {
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
} finally {
}
}
}
跟到doReadMessages
方法去看看,这里调用javaChannel.accept()
方法拿到了一个SocketChannel
,随后封装成NioSocketChannel
并加入到入参buf
(即是前面提到的AbstractNioMessageChannel.NioMessageUnsafe#readBuf
)中。
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept();
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
}
return 0;
}
咱们接着回到NioMessageUnsafe
的read
方法中,在调用完doReadMessages
方法之后又调用了pipeline.fireChannelRead
方法,参数就是doReadMessages
中所加入的NioSocketChannel
。
在pipeline
中的传播过程咱们不再赘述,因为我们刚刚分析过pipeline
的工作原理,最终这个事件会传播到ServerBootstrapAcceptor
中。我们来看一下ServerBootsrapAcceptor
的channelRead
方法。
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
for (Entry<ChannelOption<?>, Object> e : childOptions) {
try {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + child, t);
}
}
for (Entry<AttributeKey<?>, Object> e : childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
ServerBootsrapAcceptor
的channelRead
方法中首先为新连接添加handler
,设置连接参数、连接属性,最后最为关键的一步就是将连接注册到childGroup
中,这里的childGroup
就是引导代码中的wokerGroup
,注册过程我们在“服务端启动过程”和“客户端启动过程”中都详细讲过,此处不再赘述。
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//这里的msg实际上是NioSocketChannel,请参考io.netty.channel.nio.NioEventLoop.processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)方法
final Channel child = (Channel) msg;
//为新连接添加handler
child.pipeline().addLast(childHandler);
//为新连接设置连接参数
for (Entry<ChannelOption<?>, Object> e : childOptions) {
try {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + child, t);
}
}
//为新连接设置属性
for (Entry<AttributeKey<?>, Object> e : childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
//将新连接注册到workerGroup上
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
3 总结
服务端新连接接入的逻辑比较简单,在ServerSocketChannel
上发生OP_ACCEPT
事件后,触发ChannelRead
事件,ChannelRead
事件传播过程中携带的参数就是新接入的连接,事件传播至ServerBootsrapAcceptor
后被注册到workerGroup
。
关于作者
王建新,转转架构部资深Java工程师,主要负责服务治理、RPC框架、分布式调用跟踪、监控系统等。爱技术、爱学习,欢迎联系交流。