- boss线程运行NioServerSocketChannel所注册的Selector, 监听OP_ACCEPT事件
- ChannelPipeline使用
- JDK nio channel 的register和bind
- Channel handler的handlerAdded、channelRegistered、channelActive
示例代码
以Rocket MQ一段代码为例
/**
 * Starts the remoting server.
 *
 * <p>Creates the executor group that runs the pipeline handlers, configures the server
 * bootstrap, binds it (blocking until the bind finishes), stores the actually bound port
 * in {@code this.port}, starts the event executor when a channel event listener is
 * present, and schedules a periodic scan of the response table.
 *
 * @throws RuntimeException if the calling thread is interrupted while waiting for the
 *         bind to complete; the thread's interrupt flag is restored before throwing
 */
public void start() {
    // Dedicated executor group: every handler added in initChannel below is bound to it,
    // so handler callbacks do not run on the selector (worker) event loop threads.
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        nettyServerConfig.getServerWorkerThreads(),
        new ThreadFactory() {
            private final AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
            }
        });

    ServerBootstrap childHandler =
        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
            .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
            // Options applied to the listening (server) channel.
            .option(ChannelOption.SO_BACKLOG, 1024)
            .option(ChannelOption.SO_REUSEADDR, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            // Options applied to each accepted (child) channel.
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
            .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
            .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) {
                    ch.pipeline()
                        .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                            new HandshakeHandler(TlsSystemConfig.tlsMode))
                        .addLast(defaultEventExecutorGroup,
                            new NettyEncoder(),
                            new NettyDecoder(),
                            new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                            new NettyConnectManageHandler(),
                            new NettyServerHandler()
                        );
                }
            });

    if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
        childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    }

    try {
        // Block until the bind completes, then read back the port that was really bound.
        ChannelFuture sync = this.serverBootstrap.bind().sync();
        InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
        this.port = addr.getPort();
    } catch (InterruptedException e1) {
        // Restore the interrupt status so callers further up can still observe it.
        Thread.currentThread().interrupt();
        throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
    }

    if (this.channelEventListener != null) {
        this.nettyEventExecutor.start();
    }

    // First scan after 3 seconds, then once every second; failures are logged and
    // swallowed so that one bad scan does not cancel the timer task.
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                NettyRemotingServer.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
}
1. ServerBootstrap
1.1 AbstractBootstrap.bind()
/**
 * Creates a new channel and binds it to the local address configured on this bootstrap.
 *
 * @return the future of the bind operation, as produced by {@code doBind}
 * @throws IllegalStateException if no local address has been configured
 */
public ChannelFuture bind() {
    // validate() is expected to verify that the group and the channel factory are set
    // (this is what the log line below announces).
    logger.info("校验group != null 和 ChannelFactory != null");
    validate();

    final SocketAddress address = this.localAddress;
    if (address != null) {
        return doBind(address);
    }
    throw new IllegalStateException("localAddress not set");
}
- 初始化NioServerSocketChannel
- 设置NIO socket参数
- ChannelPipeline中添加ServerBootstrapAcceptor Handler
- boss线程执行JDK channel register方法
- 如果boss线程执行结束, main线程直接执行JDK channel bind方法
否则添加ChannelFuture listener, 等register方法执行结束后, 再执行JDK channel bind方法
/**
 * Initializes and registers the server channel, then triggers the bind.
 *
 * <p>{@code initAndRegister()} creates the channel, applies its options and adds the
 * initializer that later installs the ServerBootstrapAcceptor. If registration has
 * already finished when it returns, the bind is submitted from the calling thread;
 * otherwise a listener submits it once registration completes.
 *
 * @param localAddress the address to bind to
 * @return the failed registration future, or the promise of the bind operation
 */
private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    // The channel being registered (NioServerSocketChannel in this walkthrough).
    final Channel channel = regFuture.channel();

    // Registration already failed: hand the failed future straight back.
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (!regFuture.isDone()) {
        // Registration is still pending on the boss event loop: defer the bind to a listener.
        System.err.println("run thread: " + Thread.currentThread().getName() + " boss线程没有执行channel register结束, 添加执行结束listener");
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise pending = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            // Name of the thread that installed this listener (debug output only).
            private String creator = Thread.currentThread().getName();

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                final Throwable failure = future.cause();
                if (failure == null) {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    pending.registered();
                    System.out.println("创建线程: " + creator + ", 执行线程: " + Thread.currentThread().getName() + ", 执行doBind0");
                    doBind0(regFuture, channel, localAddress, pending);
                } else {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not
                    // cause an IllegalStateException once we try to access the EventLoop of the Channel.
                    pending.setFailure(failure);
                }
            }
        });
        return pending;
    }

    // At this point we know that the registration was complete and successful,
    // so the calling thread submits the bind itself.
    final ChannelPromise promise = channel.newPromise();
    System.err.println("run thread: " + Thread.currentThread().getName() + " boss线程执行channel register结束, 执行doBind0");
    doBind0(regFuture, channel, localAddress, promise);
    return promise;
}
/**
 * Creates a channel through the channel factory, initializes it and registers it with
 * the configured event loop group.
 *
 * @return the registration future; if creation or initialization throws, an already
 *         failed promise bound to {@code GlobalEventExecutor.INSTANCE}
 */
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // The factory instantiates the channel (reflectively, via its default constructor,
        // in this walkthrough); init(...) then applies options/attributes and adds the
        // initializer that installs the ServerBootstrapAcceptor.
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        // As the Channel is not registered yet we need to force the usage of the
        // GlobalEventExecutor for the failed promise.
        if (channel == null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // Release the underlying channel before reporting the failure.
        channel.unsafe().closeForcibly();
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    // The (boss) event loop group picks an event loop and registers the channel on it.
    final ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() == null) {
        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.
        return regFuture;
    }

    if (channel.isRegistered()) {
        // Already registered: go through the regular close path.
        channel.close();
    } else {
        // Never registered: close the underlying channel directly.
        channel.unsafe().closeForcibly();
    }
    return regFuture;
}
1.1.1 NioServerSocketChannel
- 继承AbstractNioMessageChannel
- config = NioServerSocketChannelConfig
- unsafe = NioMessageUnsafe
- pipeline = DefaultChannelPipeline
- 其他属性: JDK nio server channel、nio事件SelectionKey.OP_ACCEPT
// Channel metadata shared by all instances, constructed with (false, 16).
// NOTE(review): the meaning of the two arguments is defined by ChannelMetadata, which is not shown here.
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
// Default SelectorProvider, looked up once and reused by the no-arg constructor.
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
// Logger for this channel class.
private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketChannel.class);
/**
 * Opens a new JDK {@link ServerSocketChannel} through the given provider.
 *
 * @param provider the selector provider used to open the channel
 * @return the newly opened server socket channel
 * @throws ChannelException if the provider fails with an {@link IOException}
 */
private static ServerSocketChannel newSocket(SelectorProvider provider) {
    final ServerSocketChannel serverChannel;
    try {
        // Use the SelectorProvider to open the channel and so remove the condition in
        // SelectorProvider#provider() which is called by each ServerSocketChannel.open() otherwise.
        // See https://github.com/netty/netty/issues/2308
        serverChannel = provider.openServerSocketChannel();
    } catch (IOException ioe) {
        throw new ChannelException("Failed to open a server socket.", ioe);
    }
    return serverChannel;
}
// Configuration of this server channel; assigned once in the constructor.
private final ServerSocketChannelConfig config;
/**
 * Creates a new instance backed by a server socket channel opened from
 * {@code DEFAULT_SELECTOR_PROVIDER}.
 */
public NioServerSocketChannel() {
// DEFAULT_SELECTOR_PROVIDER holds the result of SelectorProvider.provider().
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
/**
 * Creates a new instance wrapping the given JDK server socket channel.
 *
 * @param channel the JDK channel to wrap
 */
public NioServerSocketChannel(ServerSocketChannel channel) {
// No parent channel (null); SelectionKey.OP_ACCEPT is handed to the superclass as the op of interest.
super(null, channel, SelectionKey.OP_ACCEPT);
// The config is built on top of the JDK channel's socket.
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
/**
 * Creates a new instance and its per-channel collaborators.
 *
 * @param parent the parent of this channel; the server channel constructor shown in this file passes null
 */
protected AbstractChannel(Channel parent) {
this.parent = parent;
// Each channel gets its own id, unsafe and pipeline, created via the overridable factory methods.
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
/**
 * Creates the id for this channel.
 *
 * @return a fresh id obtained from {@code DefaultChannelId.newInstance()}
 */
protected ChannelId newId() {
    final ChannelId channelId = DefaultChannelId.newInstance();
    return channelId;
}
/**
 * Creates the unsafe used by this channel.
 *
 * @return a new {@code NioMessageUnsafe}
 */
protected AbstractNioUnsafe newUnsafe() {
    final AbstractNioUnsafe messageUnsafe = new NioMessageUnsafe();
    return messageUnsafe;
}
/**
 * Creates the pipeline for this channel.
 *
 * @return a new {@code DefaultChannelPipeline} bound to this channel
 */
protected DefaultChannelPipeline newChannelPipeline() {
    final DefaultChannelPipeline channelPipeline = new DefaultChannelPipeline(this);
    return channelPipeline;
}
1.1.2 EventLoopGroup.register(Channel)
/**
 * Registers the given channel with this event loop.
 *
 * @param channel the channel to register
 * @return the registration future
 */
@Override
public ChannelFuture register(Channel channel) {
    // Wrap the channel in a promise that is notified on this event loop, then delegate.
    final DefaultChannelPromise registration = new DefaultChannelPromise(channel, this);
    return register(registration);
}
/**
 * Registers the channel carried by the promise with this event loop.
 *
 * @param promise the promise to notify; must not be null
 * @return the same promise
 */
@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");

    // The channel's unsafe performs the actual registration
    // (NioMessageUnsafe for the server channel in this walkthrough).
    final Channel.Unsafe channelUnsafe = promise.channel().unsafe();
    final String unsafeType = channelUnsafe.getClass().getName();
    final String promiseType = promise.getClass().getName();
    System.out.println("event loop register exe "
        + unsafeType + ".register(this, " + promiseType + ")");

    channelUnsafe.register(this, promise);
    return promise;
}
1.1.2.1 Unsafe.register(EventLoop eventLoop, final ChannelPromise promise)
/**
 * Binds the enclosing channel to the given event loop and runs {@code register0} on it.
 *
 * @param eventLoop the event loop to register with; must not be null
 * @param promise   notified with the outcome of the registration
 * @throws NullPointerException if {@code eventLoop} is null
 */
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    // A channel may only be registered once at a time.
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    // The channel implementation decides which event loop types it accepts
    // (an NIO channel expects a NioEventLoop in this walkthrough).
    if (!isCompatible(eventLoop)) {
        final String loopType = eventLoop.getClass().getName();
        promise.setFailure(new IllegalStateException("incompatible event loop type: " + loopType));
        return;
    }

    AbstractChannel.this.eventLoop = eventLoop;

    // Already on the event loop thread: register inline.
    if (eventLoop.inEventLoop()) {
        register0(promise);
        return;
    }

    // Otherwise submit the registration as a task to the event loop.
    try {
        eventLoop.execute(new NamedRunnable() {
            @Override
            public String name() {
                return "AbstractChannel[489] > register0(promise) ";
            }

            @Override
            public void run() {
                register0(promise);
            }
        });
    } catch (Throwable t) {
        logger.warn(
            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
            AbstractChannel.this, t);
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}
- javaChannel().register
- 执行channel handler的handlerAdded方法
ChannelInitializer类型的handler执行完handlerAdded方法会从handler链中删除自己
其他类型handler不会
- ChannelPromise success, 后续执行javaChannel().bind方法
- 执行channel handler的channelRegistered方法
- 此示例首次register时isActive()为false(bind任务在register0之后才提交到boss event loop的任务队列并执行); 该判断主要用于channel被deregister后重新register的场景
/**
 * Performs the actual registration: registers the underlying channel, runs the pending
 * handlerAdded callbacks, completes the promise and fires the pipeline events.
 * On any failure the channel is force-closed and the promise is failed.
 *
 * @param promise the registration promise to complete
 */
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
// Abort if the promise could not be marked uncancellable
// or if ensureOpen(promise) reports that the channel is no longer open.
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// Remember whether this is the first registration of this channel.
boolean firstRegistration = neverRegistered;
// javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
// Runs handlerAdded for the handlers queued as PendingHandlerCallback in the pipeline.
// A ChannelInitializer removes itself from the pipeline once its handlerAdded has run.
pipeline.invokeHandlerAddedIfNeeded();
// Mark the promise successful, which notifies the registered listeners.
safeSetSuccess(promise);
// Fire channelRegistered through the handler chain, starting at the head.
// In this walkthrough only HeadContext -> TailContext remain after handlerAdded has run:
// the task that adds ServerBootstrapAcceptor is still waiting in the task queue,
// and neither of the two remaining handlers does anything for this event.
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
// For the server channel: javaChannel.socket().isBound()
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
- 向boss event loop的JDK Selector注册, 但未设置监听事件
/**
 * Registers the JDK channel with the event loop's selector using interest ops 0 and
 * this channel as attachment, and stores the resulting key in {@code selectionKey}.
 *
 * @throws Exception if registration fails; a {@link CancelledKeyException} is rethrown
 *         only when it occurs again after one forced {@code selectNow()}
 */
@Override
protected void doRegister() throws Exception {
    boolean retriedAfterSelect = false;
    while (true) {
        try {
            // Interest ops are 0 here: nothing is listened for yet.
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (retriedAfterSelect) {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
            // Force the Selector to select now as the "canceled" SelectionKey may still be
            // cached and not removed because no Select.select(..) operation was called yet.
            eventLoop().selectNow();
            retriedAfterSelect = true;
        }
    }
}
1.1.2.2 DefaultChannelPipeline.invokeHandlerAddedIfNeeded()
- 在ServerBootstrap.init方法中, 添加了new ChannelInitializer channel handler
- 此时pipeline中已经存在3个handler (HeadContext、ChannelInitializer、TailContext)
/**
 * Runs the deferred handler callbacks the first time it is called; later calls do nothing.
 * Must be called from the channel's event loop (checked by assertion).
 */
final void invokeHandlerAddedIfNeeded() {
    assert channel.eventLoop().inEventLoop();
    if (!firstRegistration) {
        return;
    }
    firstRegistration = false;
    // We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
    // that were added before the registration was done.
    callHandlerAddedForAllHandlers();
}
/**
 * Marks the pipeline as registered and executes every callback that was queued while
 * the channel was not yet registered.
 */
private void callHandlerAddedForAllHandlers() {
    final PendingHandlerCallback head;
    synchronized (this) {
        assert !registered;

        // This Channel itself was registered.
        registered = true;

        // Detach the queued callbacks; null out the field so the list can be GC'ed.
        head = this.pendingHandlerCallbackHead;
        this.pendingHandlerCallbackHead = null;
    }

    // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
    // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
    // the EventLoop.
    // The callbacks form a singly linked list. PendingHandlerAddedTask entries invoke
    // handlerAdded; PendingHandlerRemovedTask entries invoke handlerRemoved.
    for (PendingHandlerCallback pending = head; pending != null; pending = pending.next) {
        pending.execute();
    }
}
- 此示例没有remove handler, 只有添加
/**
 * Deferred callback that invokes {@code handlerAdded} for a context that was added
 * before the channel was registered.
 */
private final class PendingHandlerAddedTask extends PendingHandlerCallback {

    PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
        super(ctx);
    }

    /** Invoked when this task was handed over to the handler's executor. */
    @Override
    public void run() {
        callHandlerAdded0(ctx);
    }

    /**
     * Invokes the callback inline when already on the handler's executor thread,
     * otherwise submits this task to that executor. If the executor rejects the task,
     * the handler is removed from the pipeline and marked as removed.
     */
    @Override
    void execute() {
        final EventExecutor handlerExecutor = ctx.executor();
        if (handlerExecutor.inEventLoop()) {
            callHandlerAdded0(ctx);
            return;
        }
        try {
            handlerExecutor.execute(this);
        } catch (RejectedExecutionException rejected) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                    "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
                    handlerExecutor, ctx.name(), rejected);
            }
            remove0(ctx);
            ctx.setRemoved();
        }
    }
}
- handler为ServerBootstrap.init方法添加的ChannelInitializer
/**
 * Marks the context as fully added and invokes its handler's {@code handlerAdded}.
 * If that throws, the handler is removed again (best effort) and a
 * {@code ChannelPipelineException} is fired through the pipeline.
 *
 * @param ctx the context whose handler has just been added
 */
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    try {
        // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
        // any pipeline events ctx.handler() will miss them because the state will not allow it.
        ctx.setAddComplete();
        System.out.println("执行" + ctx.handler() + " 添加成功handlerAdded 方法");
        ctx.handler().handlerAdded(ctx);
    } catch (Throwable t) {
        boolean removed = false;
        try {
            // Unlink the failing handler from the handler chain.
            remove0(ctx);
            try {
                // Give the handler its handlerRemoved callback ...
                ctx.handler().handlerRemoved(ctx);
            } finally {
                // ... and flag the context as removed even if that callback throws.
                ctx.setRemoved();
            }
            removed = true;
        } catch (Throwable t2) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to remove a handler: " + ctx.name(), t2);
            }
        }

        // Report the failure through exceptionCaught; when the removal succeeded the
        // failing handler is no longer part of the chain at this point.
        final String outcome = removed
            ? ".handlerAdded() has thrown an exception; removed."
            : ".handlerAdded() has thrown an exception; also failed to remove.";
        fireExceptionCaught(new ChannelPipelineException(
            ctx.handler().getClass().getName() + outcome, t));
    }
}
1.1.2.3 ChannelInitializer
- handlerAdded
/**
 * Runs the channel initialization once this handler has been added, provided the
 * channel is already registered.
 *
 * @param ctx the context of this initializer
 * @throws Exception propagated from {@code initChannel(ctx)}
 */
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (!ctx.channel().isRegistered()) {
        return;
    }
    // This should always be true with our current DefaultChannelPipeline implementation.
    // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
    // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
    // will be added in the expected order.
    initChannel(ctx);
}
- finally中会把此ChannelInitializer(handler)从pipeline中移除
/**
 * Runs the user's {@code initChannel(C)} at most once per context and then removes
 * this initializer via {@code remove(ctx)}.
 *
 * @param ctx the context of this initializer
 * @return {@code true} if initialization ran in this call, {@code false} if it had
 *         already been started for this context
 * @throws Exception propagated from {@code exceptionCaught} or {@code remove}
 */
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    // Guard against re-entrance: only the first caller for a given context proceeds.
    final boolean alreadyStarted = initMap.putIfAbsent(ctx, Boolean.TRUE) != null;
    if (alreadyStarted) {
        return false;
    }

    try {
        initChannel((C) ctx.channel());
    } catch (Throwable cause) {
        // Route the failure to exceptionCaught(...) explicitly instead of letting it escape.
        exceptionCaught(ctx, cause);
    } finally {
        // Whether or not initialization succeeded, this initializer is removed.
        remove(ctx);
    }
    return true;
}
initChannel((C) ctx.channel()) 调用的就是ServerBootstrap.init中ChannelInitializer的initChannel方法, 其中(通过event loop任务)会执行:
// Appends the acceptor handler, built from the child group, child handler, child options and child attributes.
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
1.2 init
- ChannelPipeline添加channel handler
- currentChildGroup 为 worker
- 添加的channel handler主要是继续添加ServerBootstrapAcceptor
/**
 * Prepares a freshly created server channel: applies the configured options and
 * attributes, snapshots the child settings, and adds a ChannelInitializer that
 * installs the optional user handler and (via an event loop task) the
 * ServerBootstrapAcceptor.
 *
 * @param channel the channel to initialize
 * @throws Exception if applying the configuration fails
 */
void init(Channel channel) throws Exception {
// Apply the socket options configured on the bootstrap, e.g.
// .option(ChannelOption.SO_BACKLOG, 1024)
// .option(ChannelOption.SO_REUSEADDR, true)
// .option(ChannelOption.SO_KEEPALIVE, false)
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
// Copy the configured attributes onto the channel (empty in this walkthrough).
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
// The channel's pipeline (a DefaultChannelPipeline).
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
// The child handler is the user-defined ChannelInitializer describing the handler
// chain of the worker (child) channels, i.e. in this walkthrough:
// .childHandler(new ChannelInitializer<SocketChannel>() {
// @Override
// public void initChannel(SocketChannel ch) throws Exception {
// ch.pipeline()
// .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
// new HandshakeHandler(TlsSystemConfig.tlsMode))
// .addLast(defaultEventExecutorGroup,
// new NettyEncoder(),
// new NettyDecoder(),
// new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
// new NettyConnectManageHandler(),
// new NettyServerHandler()
// );
// }
// });
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
// Snapshot the child options/attributes under their locks so the acceptor gets stable copies.
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
logger.info("DefaultChannelPipeline.addLast添加 ChannelInitializer");
// Add the initializer to the pipeline.
// Once javaChannel().register(...) has happened, ChannelInitializer.handlerAdded is called:
// if (ctx.channel().isRegistered()) {
// initChannel(ctx);
// }
// If the channel is not registered yet, the callback is queued as a PendingHandlerCallback
// and runs after the channel has been registered.
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
// Optional handler configured for the server channel itself.
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// The acceptor is added from a task on the channel's event loop rather than inline.
ch.eventLoop().execute(new NamedRunnable() {
@Override
public String name() {
return "ServerBootstrap[211] > pipeline.addLast(ServerBootstrapAcceptor)";
}
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
1.2.1 DefaultChannelPipeline.addLast(ChannelHandler... handlers)
/**
 * Appends the given handlers at the end of the pipeline without a dedicated executor group.
 *
 * @param handlers the handlers to append
 * @return this pipeline
 */
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
// null group: delegates to the (EventExecutorGroup, ChannelHandler...) overload.
return addLast(null, handlers);
}
/**
 * Appends the given handlers, in order, at the end of the pipeline.
 *
 * @param executor the executor group for the handlers, passed through to the
 *                 three-argument {@code addLast}
 * @param handlers the handlers to append; processing stops at the first null element
 * @return this pipeline
 * @throws NullPointerException if {@code handlers} is null
 */
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    if (handlers == null) {
        throw new NullPointerException("handlers");
    }

    for (int i = 0; i < handlers.length; i++) {
        final ChannelHandler current = handlers[i];
        // A null entry terminates the list; anything after it is ignored.
        if (current == null) {
            break;
        }
        // No explicit name is supplied for handlers added this way.
        addLast(executor, null, current);
    }
    return this;
}
/**
 * Appends a single handler at the end of the pipeline and arranges for its
 * handlerAdded callback to run: deferred until registration if the channel is not
 * registered yet, submitted to the handler's executor if called from another thread,
 * or invoked inline otherwise.
 *
 * @param group   the executor group for the handler, passed to newContext
 * @param name    the handler name, passed through filterName
 * @param handler the handler to append
 * @return this pipeline
 */
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// Validation for ChannelHandlerAdapter-type handlers, described by the original notes as:
// if (!h.isSharable() && h.added) { throw new ChannelPipelineException() }
checkMultiplicity(handler);
// Resolve the handler name: one is generated when none is given,
// and a name that already exists in the pipeline is rejected with an exception.
String handlerName = filterName(name, handler);
// Wrap the handler in a context (DefaultChannelHandlerContext).
newCtx = newContext(group, handlerName, handler);
// Link the new context into the handler list.
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
// Mark the context as ADD_PENDING.
newCtx.setAddPending();
// Queue a callback on pendingHandlerCallbackHead (a singly linked list) that runs after
// the channel is registered: a PendingHandlerAddedTask for additions
// (a PendingHandlerRemovedTask is used for removals). It invokes the handler's handlerAdded.
callHandlerCallbackLater(newCtx, true);
return this;
}
// Without an EventExecutorGroup the context falls back to the event loop of the pipeline's channel.
EventExecutor executor = newCtx.executor();
// The calling thread is not the executor's thread: hand the callback over to the executor.
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new NamedRunnable() {
@Override
public String name() {
return "DefaultChannelPipeline[245] > " + newCtx.handler() + ".callHandlerAdded0";
}
@Override
public void run() {
// Invoke the handler's handlerAdded on its executor.
callHandlerAdded0(newCtx);
}
});
return this;
}
}
// Already on the executor's thread: invoke handlerAdded inline, outside the lock.
callHandlerAdded0(newCtx);
return this;
}
/**
 * Marks the context as fully added and invokes its handler's {@code handlerAdded}.
 * If that throws, the handler is removed again (best effort) and a
 * {@code ChannelPipelineException} is fired through the pipeline.
 *
 * @param ctx the context whose handler has just been added
 */
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    try {
        // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
        // any pipeline events ctx.handler() will miss them because the state will not allow it.
        ctx.setAddComplete();
        System.out.println("执行" + ctx.handler() + " 添加成功handlerAdded 方法");
        ctx.handler().handlerAdded(ctx);
    } catch (Throwable t) {
        boolean removed = false;
        try {
            // Unlink the failing handler from the handler chain.
            remove0(ctx);
            try {
                // Give the handler its handlerRemoved callback ...
                ctx.handler().handlerRemoved(ctx);
            } finally {
                // ... and flag the context as removed even if that callback throws.
                ctx.setRemoved();
            }
            removed = true;
        } catch (Throwable t2) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to remove a handler: " + ctx.name(), t2);
            }
        }

        // Report the failure through exceptionCaught; when the removal succeeded the
        // failing handler is no longer part of the chain at this point.
        final String outcome = removed
            ? ".handlerAdded() has thrown an exception; removed."
            : ".handlerAdded() has thrown an exception; also failed to remove.";
        fireExceptionCaught(new ChannelPipelineException(
            ctx.handler().getClass().getName() + outcome, t));
    }
}
1.3 doBind0
/**
 * Submits the bind as a task to the channel's event loop.
 *
 * @param regFuture    the registration future, checked when the task runs
 * @param channel      the channel to bind
 * @param localAddress the address to bind to
 * @param promise      completed by the bind, or failed with the registration's cause
 */
private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new NamedRunnable() {
        @Override
        public String name() {
            return "AbstractBootstrap[412] > channel.bind(SocketAddress, ChannelPromise)";
        }

        @Override
        public void run() {
            if (!regFuture.isSuccess()) {
                // Registration did not succeed: fail the bind with the same cause.
                promise.setFailure(regFuture.cause());
                return;
            }
            // Close the channel again if the bind itself fails.
            channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        }
    });
}
1.4 Channel.bind(SocketAddress, ChannelPromise)
/**
 * Binds this channel by delegating to its pipeline.
 *
 * @param localAddress the address to bind to
 * @param promise      the promise to notify
 * @return the future returned by the pipeline
 */
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    final ChannelFuture bindFuture = pipeline.bind(localAddress, promise);
    return bindFuture;
}
1.4.1 DefaultChannelPipeline.bind(SocketAddress, ChannelPromise)
/**
 * Starts the outbound bind at the tail of the pipeline.
 *
 * @param localAddress the address to bind to
 * @param promise      the promise to notify
 * @return the future returned by the tail context
 */
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    final ChannelFuture bindFuture = tail.bind(localAddress, promise);
    return bindFuture;
}
1.4.2 TailContext.bind(SocketAddress, ChannelPromise )
/**
 * Passes the bind request to the next outbound context, on that context's executor.
 *
 * @param localAddress the address to bind to; must not be null
 * @param promise      the promise to notify
 * @return {@code promise}
 * @throws NullPointerException if {@code localAddress} is null
 */
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    // Promise sanity checks; an invalid (e.g. cancelled) promise is returned untouched.
    if (isNotValidPromise(promise, false)) {
        return promise;
    }

    // Next outbound context in the chain (the HeadContext in this walkthrough).
    final AbstractChannelHandlerContext next = findContextOutbound();
    // Executor of that context; a context without its own executor uses the channel's
    // event loop (the boss event loop here).
    final EventExecutor nextExecutor = next.executor();

    if (!nextExecutor.inEventLoop()) {
        // Called from a different thread: queue the invocation on the executor.
        safeExecute(nextExecutor, new Runnable() {
            @Override
            public void run() {
                next.invokeBind(localAddress, promise);
            }
        }, promise, null);
        return promise;
    }

    // Already on the executor's thread: invoke directly.
    next.invokeBind(localAddress, promise);
    return promise;
}
1.4.3 HeadContext.bind(SocketAddress, ChannelPromise )
/**
 * Outbound bind at the head of the pipeline: hands the request to the channel's unsafe.
 *
 * @param ctx          the head context (unused)
 * @param localAddress the address to bind to
 * @param promise      the promise to notify
 */
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}
1.4.4 Unsafe.bind(SocketAddress, ChannelPromise )
- 执行handler channelActive, server开始等待accept连接
- HeadContext、ServerBootstrapAcceptor、TailContext
/**
 * Binds the underlying channel to the given address, schedules channelActive if the
 * bind made the channel active, and completes the promise.
 *
 * @param localAddress the address to bind to
 * @param promise      the promise to complete with the outcome
 */
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
// Abort if the promise could not be marked uncancellable or ensureOpen(promise) fails.
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
// Broadcast-related warning only; it does not change the bind itself.
// NOTE(review): the original note says the author was unsure about this branch.
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
// Active state before the bind; for the server channel: javaChannel().socket().isBound()
boolean wasActive = isActive();
try {
// For the server channel: javaChannel().bind(localAddress, config.getBacklog());
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
// The bind made the channel active (not active before, active now):
// schedule channelActive for the handler chain via invokeLater.
if (!wasActive && isActive()) {
invokeLater(new NamedRunnable() {
@Override
public String name() {
return "[AbstractChannel.599]: pipeline.fireChannelActive()";
}
@Override
public void run() {
// TODO DEBUG LOG
// Debug aid: prints every context in the chain, starting at the head, before firing the event.
AbstractChannelHandlerContext head = pipeline.head;
while(head != null) {
System.err.println(">>>>>> handler: " + head);
head = head.next;
}
pipeline.fireChannelActive();
}
});
}
// Mark the bind successful, which notifies all listeners.
safeSetSuccess(promise);
}
1.4.4.1 HeadContext.channelActive
- ServerBootstrapAcceptor和TailContext的channelActive 没有什么操作
/**
 * Forwards channelActive to the next handler, then requests a read if auto-read is enabled.
 *
 * @param ctx the head context
 */
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// Propagate first, so the read is requested only after the event has been forwarded.
ctx.fireChannelActive();
readIfIsAutoRead();
}
/**
 * Requests a read on the channel when its configuration has auto-read enabled
 * (the original notes state that auto-read is on by default).
 */
private void readIfIsAutoRead() {
    if (!channel.config().isAutoRead()) {
        return;
    }
    channel.read();
}
1.4.4.2 Channel.read
/**
 * Requests a read by delegating to the pipeline.
 *
 * @return this channel
 */
public Channel read() {
pipeline.read();
return this;
}
/**
 * Starts the outbound read at the tail of the pipeline.
 *
 * @return this pipeline
 */
public final ChannelPipeline read() {
tail.read();
return this;
}
- 还是找到了HeadContext执行  ̄へ ̄
/**
 * Passes the read request to the next outbound context, on that context's executor.
 * The task used for cross-thread invocation is created lazily and cached on the
 * target context in {@code invokeReadTask}.
 *
 * @return this context
 */
public ChannelHandlerContext read() {
    final AbstractChannelHandlerContext next = findContextOutbound();
    final EventExecutor nextExecutor = next.executor();

    // Already on the target executor's thread: invoke directly.
    if (nextExecutor.inEventLoop()) {
        next.invokeRead();
        return this;
    }

    // Otherwise reuse (or create and cache) the read task and submit it.
    Runnable readTask = next.invokeReadTask;
    if (readTask == null) {
        readTask = new NamedRunnable() {
            @Override
            public String name() {
                return "AbstractChannelHandlerContext[726] > " + next.handler() + ".read(...)";
            }

            @Override
            public void run() {
                next.invokeRead();
            }
        };
        next.invokeReadTask = readTask;
    }
    nextExecutor.execute(readTask);
    return this;
}
- HeadContext.read
/**
 * Outbound read at the head of the pipeline: asks the channel's unsafe to begin reading.
 *
 * @param ctx the head context (unused)
 */
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
- Unsafe.beginRead()
/**
 * Begins reading on an active channel. Does nothing when the channel is not active.
 * If {@code doBeginRead()} throws an {@link Exception}, the exception is fired through
 * the pipeline later and the channel is closed.
 */
public final void beginRead() {
    assertEventLoop();

    if (isActive()) {
        try {
            doBeginRead();
        } catch (final Exception e) {
            // Deliver the failure to the pipeline via invokeLater rather than inline.
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    pipeline.fireExceptionCaught(e);
                }
            });
            close(voidPromise());
        }
    }
}
- 添加NioServerSocketChannel创建时候的readInterestOp
- readInterestOp = SelectionKey.OP_ACCEPT
/**
 * Adds {@code readInterestOp} to the selection key's interest set if it is not already
 * part of it, and flags a read as pending. Does nothing when the key is no longer valid.
 */
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey key = this.selectionKey;
    if (!key.isValid()) {
        return;
    }

    readPending = true;

    final int currentOps = key.interestOps();
    if ((currentOps & readInterestOp) != 0) {
        // Already interested in the read op: nothing to change.
        return;
    }
    key.interestOps(currentOps | readInterestOp);
}
一些日志