上一节,我们研究了NioEventLoop的创建过程,其实也就是做了一些初始化,把该准备的准备好了。重点有两个
- 准备了一个
ThreadPerTaskExecutor
,以后添加的task
,每次执行的时候其实是实例化了了一个netty自定义的Thread :(FastThreadLocalThread
)然后再调用start()
执行任务。 - 准备了一个
NioEventLoop
数组(EventExecutor
),配套了一个循环使用该数组的选择器chooser
,该选择器根据"线程数是否是2的幂次方"提供了不同选择策略。 - 准备了
taskQueue
和tailQueue
用于存放eventLoop
任务,和外部线程任务
这一节,具体研究NioEventLoop
的的启动过程。
入口: 端口绑定的时候AbstractBootstrap#doBind0()
//入口
ChannelFuture future = serverBootstrap.bind(PORT).sync();
doBind(localAddress)
doBind0(regFuture, channel, localAddress, promise);
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
首先是channel.eventLoop()
获取的肯定是NioEventLoop
,但是具体是怎么获取的呢?往下研究
@Override
public NioEventLoop eventLoop() {
return (NioEventLoop) super.eventLoop();
}
@Override
public EventLoop eventLoop() {
EventLoop eventLoop = this.eventLoop;
if (eventLoop == null) {
throw new IllegalStateException("channel not registered to an event loop");
}
return eventLoop;
}
问题来了。该方法确实是获取的NioEventLoop
,但是是什么时候初始化AbstractChannel#eventLoop
这个成员变量的呢?翻看之前的《注册Selector》 过程的分析,发现确实是注册的时候做的。也就是AbstractChannel.AbstractUnsafe#register()
方法中,通过方法参数讲eventLoop
传入,以下这段代码
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
//省略代码
AbstractChannel.this.eventLoop = eventLoop;
//省略代码
}
该方法被SingleThreadEventLoop#register()
调用并将自己传入的,其本身就是NioEventLoop
的父类,在创建的时候就被存储子啊EventExecutor[]
数组中了。那么是怎么从数组中取出来的呢?
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
前面也提到过,注册的过程我们发现从EventExecutor[]
中选择EventLoop
的代码是在MultithreadEventLoopGroup#register()
中
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
//io.netty.util.concurrent.MultithreadEventExecutorGroup#next()
@Override
public EventExecutor next() {
return chooser.next();
}
@UnstableApi
interface EventExecutorChooser {
/**
* Returns the new {@link EventExecutor} to use.
*/
EventExecutor next();
}
终于,在这里获取了之前创建NioEventLoop准备好的选择器。那么next()
方法是在两个选择器策略里面定义的,用于继续按下标后,循环获取EventExecutor[]
数组中的EventLoop
(这里是NioEventLoop
)
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
回过到启动过程,NioEventLoop
已经获取,调用execute()
方法,并且实例化了一个Runnable
,该Runnable就是一个新的task
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
而execute()
方法其实是在NioEventLoop
的父类,SingleThreadEventExecutor#execute()
。这里有三个动作。
-
inEventLoop()
:判断当前线程是否是EventLoop持有的线程 -
startThread()
:开启自定义线程 -
addTask(task)
:添加到任务队列
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
//判断当前线程是否是EventLoop持有的线程,这个时间点返回的是false
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
//不在同一线程,开启自定义线程
startThread();
//添加任务队列
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
-
inEventLoop()
很多地方都用到,值得深究,追综到SingleThreadEventExecutor#inEventLoop()
,确实是跟持有的成员变量private volatile Thread thread;
进行对比。注意,该成员变量this.thread
此时并没有被初始化,因此为null,返回的是false
。可以Debug进行跟踪。
@Override
public boolean inEventLoop() {
//传入当前线程,当前线程为:main 线程
return inEventLoop(Thread.currentThread());
}
@Override
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
-
startThread()
开启一个线程用于执行NioEventLoop
的run
方法,并且让NioEventLoop
持有这个线程。这一刻,NioEventLoop
跑起来了。
private void startThread() {
//判断当前线程是否是未启动的
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
//启动
doStartThread();
}
}
}
private void doStartThread() {
assert thread == null;
//ThreadPerTaskExecutor#execute() 开启netty自定义的线程FastThreadLocalThread,并执行
executor.execute(new Runnable() {
@Override
public void run() {
//运行时,开始持有运行时线程
thread = Thread.currentThread();
//省略代码
//执行NioEventLoop.run()
SingleThreadEventExecutor.this.run();
//省略代码
});
}
-
addTask(task)
是在NioEventLoop
的线程跑起来了之后,将最初绑定端口的任务offer()
到了taskQueue
,存储起来,异步执行。
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (!offerTask(task)) {
reject(task);
}
}
final boolean offerTask(Runnable task) {
if (isShutdown()) {
reject();
}
return taskQueue.offer(task);
}