Netty是一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。
作为当前最流行的NIO框架,Netty在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,一些业界著名的开源组件也基于Netty的NIO框架构建
本人从源码的角度来分析及讨论下Netty,抛砖引玉
原生的NIO服务端
public class NIOServer {
//通道管理器
private Selector selector;
public void initServer(int port) throws IOException {
// 获得一个ServerSocket通道
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// 设置通道为非阻塞
serverChannel.configureBlocking(false);
// 将该通道对应的ServerSocket绑定到port端口
serverChannel.socket().bind(new InetSocketAddress(port));
// 获得一个通道管理器
this.selector = Selector.open();
//将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_ACCEPT事件,注册该事件后,
//当该事件到达时,selector.select()会返回,如果该事件没到达selector.select()会一直阻塞。
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
}
public static void main(String[] args) throws IOException {
NIOServer server = new NIOServer();
server.initServer(8000);
}
}
从原生的Nio启动逻辑来看
如果要实现Nio的服务端必须经历以下阶段:
Channel.open->configureBlocking(false)->bind->Selctor.open()->register
而Netty也不列外,但是他是怎么做的呢?
Netty的NIO服务端
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoServerHandler());
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
启动状态图
Netty的线程模型就是多线程的Reactor模式,有MainReactor和SubReactor及Acceptor三种角色来组成其线程模型(下一章节将详细介绍此模型)
其中MainReactor和SubReactor都体现在EventLoopGroup这个线程组中,启动主要也是MainReactor绑定线程进行启动,本文以NIO为例详细说明启动状态图的一些细节,主要讲解MainReactor的角色定位及状态流转
NioEventLoopGroup的实现及扭转
NioEventLoopGroup在Bootstrap初始化时作为参数传入构造方法
其经历
1.AbstractBootstrap.bind(port)
2.AbstractBootstrap.initAndRegister()
3.ServerBootstrap.init(Channel channel)
4.NioEventLoopGroup.register(Channel channel, ChannelPromise promise)
其中2和3都是初始化整个Channel出来为后面的4阶段进行做准备的,而4阶段是将Channel与原生的Nio的ServerSocketChannel进行映射并实现Nio服务端的打开通道,设置非阻塞,在通道上设置事件选择器这个主要工作
所以主要启动逻辑在4阶段
跟随代码逻辑会发现最后会调用到AbstractUnsafe#register(EventLoop eventLoop, final ChannelPromise promise)中,其主要代码如下:
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
//......省略无关代码
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
// ...... 省略无关代码
}
}
}
eventLoop.inEventLoop()是判断当前线程是否为EventLoop线程, 此时当前线程还是我们的main线程, bossEventLoop线程还没有启动,所以会走到else分支调用eventLoop.execute(),在SingleThreadEventExecutor的execute方法中会判断当前线程是否为eventLoop如果不是, 则启动当前eventLoop线程,如下
@Override
public void execute(Runnable task) {
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread(); //启动NioEventLoop线程
addTask(task); //将当前的注册任务添加到延迟队列中
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp) {
wakeup(inEventLoop);
}
}
所以主要是启动NioEventLoop线程
当走到此时,整个NioEventLoop线程已经启动,启动主要执行run()方法,如下:
protected void run() {
for (;;) {
// ......省略无关代码
if (selectedKeys != null) {
//执行IO任务
processSelectedKeysOptimized(selectedKeys.flip());
}
final long ioTime = System.nanoTime() - ioStartTime;
final int ioRatio = this.ioRatio;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio); //执行非IO任务,如注册通道、绑定端口等,与Io任务的时间轮占比是1:1,可配置
// .....省略无关代码
} catch (Throwable t) {
// .....省略无关代码
}
}
}
通过了这一步,整个Boss线程就已经启动起来,该线程主要执行了几类任务
- 典型的Io任务,由processSelectedKeysOptimized触发,本阶段不会触发
- 注册通道的任务,由runAllTasks(ioTime * (100 - ioRatio) / ioRatio)触发
- 绑定端口的任务,由runAllTasks(ioTime * (100 - ioRatio) / ioRatio)触发
- 通知通道注册成功的任务,也是由runAllTasks(ioTime * (100 - ioRatio) / ioRatio)触发
其中2.3.4都是通过从延迟队列中拉取出来执行
2任务主要是通道注册并会触发3.4任务
private void register0(ChannelPromise promise) {
try {
doRegister();
registered = true;
promise.setSuccess();//触发回调来将绑定端口的任务提交到队列,3任务
pipeline.fireChannelRegistered();//通知注册成功,4任务
if (isActive()) {
pipeline.fireChannelActive();
}
} catch (Throwable t) {
}
}
详细可以跟随启动状态图进行查看源码
在整个里面并没有对Netty与原生的Nio的流程怎么对应起来进行说明
- channel(NioServerSocketChannel.class)
设置成new ReflectiveChannelFactory<C>(NioServerSocketChannel.class)
通过反射创建一个NioServerSocketChannel对象,通过无参构造函数创建Nio
的
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Class<? extends T> clazz;
@Override
public T newChannel() {
try {
return clazz.newInstance(); //反射调用无参构造,如下
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
}
public NioServerSocketChannel() {
super(null, newSocket(), SelectionKey.OP_ACCEPT); //调用父类设置为无阻塞通道
config = new DefaultServerSocketChannelConfig(this, javaChannel().socket());
}
private static ServerSocketChannel newSocket() {
try {
return ServerSocketChannel.open();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
- AbstractNioChannel#doRegister先把监听事件注册为0, 在注册完之后的修改为OP_ACCEPT
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
//........
}
}
}
- DefaultChannelPipeline#fireChannelActive,注意以下read可不是一个读数据的 inbound event, 他是一个outbound event, 是"开始读"的意思,
public ChannelPipeline fireChannelActive() {
head.fireChannelActive();
if (channel.config().isAutoRead()) {
channel.read(); //注意这个read可不是一个读数据的 inbound event, 他是一个outbound event, 是"开始读"的意思, 这个event在pipeline中从tail开始溜达最终会溜达到head的read()方法
}
return this;
}
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
protected void doBeginRead() throws Exception {
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) { // 这里把监听事件从0改为了ACCEPT
selectionKey.interestOps(interestOps | readInterestOp);
}
}