前言
终于到了这个令人激动的部分,也是我们平时利用netty开发时最关注的地方。前面讲到了NioEventLoopGroup、NioEventLoop。在上一篇中讲到了NioServerSocketChannel和NioSocketChannel的创建及注册,我们最终分析到在initAndRegister()方法中,完成了netty中通道的创建、初始化(向其添加必要的处理器)以及向NioEventLoopGroup的注册,最终是把NioSocketChannel或者NioServerSocketChannel中封装的NIO中的Channel向某一个NioEventLoop关联的selector注册(最终的这个过程发生在AbstractChannel的doRegister()方法)。但是在doRegister()方法之后还调用了pipeline.fireChannelRegistered(),以及initAndRegister()执行后还涉及到一些Pipeline所做的工作,所以需要首先搞明白pipeline是什么以及各种handler和pipeline的关系。
Handler
netty中的处理器是利用netty进行开发时开发人员关注的核心,分为两类,一类是入站处理器(例如接收到IO事件后进行处理),一类是出站处理器(向外发送数据时进行的处理)。在Echo Server这个demo中,就出现了几个Handler,例如EchoClientHandler
、EchoServerHandler
,以及一个特殊的入站处理器ChannelInitializer
。
入站处理器
入站处理器一般可以通过直接继承ChannelInboundHandlerAdapter
,必要时可重写某些回调方法,可重写的方法主要有以下几个:
handlerAdded: 当一个处理器添加到pipeline后,会调用此回调方法
handlerRemoved: 当处理器被移除出pipeline后....
channelRegistered:当一个通道注册到NioEventLoop的selector时,回调pipeline上所有channelRegistered方法
channelUnregistered:通道取消注册,接触绑定.....
channelActive: 通道激活成功....
channelInactive:
channelRead: 有数据可读的时候
channelReadComplete: 可读数据完成入站流水线操作后,会回调此方法。
handler相关的回调方法(handlerAdded和handlerRemoved)只会回调对应的方法,而通道相关的会在流水线handler上逐个进行回调。
出站处理器
出站处理器一般可以通过直接继承ChannelOutboundHandlerAdapter
,必要时可重写某些回调方法,可重写的方法主要有以下几个:bind(用于服务端)、connect(用于客户端)、close、read、write等。
Pipeline与责任链模式
netty的通道都封装着一个Pipeline实例,也就是每个通道都有一个pipeline(默认就是DefaultChannelPipeline
),每个pipeline中都有一个元素为AbstractChannelHandlerContext
的双向链表,头尾节点分别由head
和tail
指向。pipeline的链表中存放着一个个Handler,只不过这些handler都被封装为了DefaultChannelHandlerContext
而存在于链表中,而链表的首位节点分别为HeadContext
和TailContext
类型。HeadContext
和TailContext
相对比较简单,不做赘述,主要通过DefaultChannelHandlerContext
以及父类AbstractChannelHandlerContext
的构造器看一下这个所谓的ChannelHandlerContext处理器上下文都有哪些东西。
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
boolean inbound, boolean outbound) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.inbound = inbound;
this.outbound = outbound;
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
每一个处理器上下文都封装了一个Handler,记录下它所属的Pipeline以及handler是入站还是出站处理器。
Pipeline与ChannelHandlerContext是责任链模式的两个重要组件,pipeline为执行链的管理角色,而ChannelHandlerContext为执行链节点。对于入站事件,只会从前向后执行pipeline中的入站处理器的相关回调方法,出站事件则从后向前执行出站处理器相关回调方法。对于入站来说,调用pipeline.fireXXXX方法(例如pipeline.fireChannelRegistered方法)会从执行链头部开始,在流水线的入站处理器上进行传导,调用ChannelHandlerContext.fireXXXX则会从当前节点找到下一个入站处理器,并回调它的XXXX方法。出站事件也是类似,只不过调用方法为例如pipeline.connect和ChannelHandlerContext.connect等。
ChannelInitializer及Pipeline中处理器的添加
还有印象的话,在最初讲Echo Server这个demo的启动代码时提到过:
bootStrap.handler(xx) : 将xx处理器赋值给bootStrap的handler字段
bootStrap.childHandler(yy) : 将yy处理器赋值给BootStrap的childHandler字段
这时Handler并没有和pipeline关联起来。但是在前文讲述通道的创建注册initAndRegister方法中的init(channel)方法时会负责将handler添加到pipeline。
// client端(BootStrap)在connect的时候
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(handler());
/*省略代码*/
}
// server端在执行bind的时候
void init(Channel channel) throws Exception {
/*省略代码*/
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
/*省略代码*/
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
// handler()方法获取到父类AbstractBootStrap中管理的Handler,也就是父循环组对应的handler
ChannelHandler handler = handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
client端是直接将配置的handler添加到pipeline(在EchoServer这个例子中handler就是HandlerInitializer),而在server端,首先向pipeline添加了一个HandlerInitializer随后通过这个初始化器添加handler以及childHandler(封装在ServerBootstrapAcceptor中)。这里如有不明白,可继续向下看,后文会再次提到。
HandlerInitializer
是一个特殊的入站处理器,它往往是作为pipeline中第一个处理器。initChannel
方法一般主要负责将其他的handler添加到pipeline中,然后自己从pipeline中删除自己。在init(channel)方法中,我们向pipeline中添加了(addLast)handler。addLast的主要逻辑如下:
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
// 将handler封装为AbstractChannelHandlerContext 添加到pipeline双向列表的尾部
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
//方法中会回调所添加的handler的handlerAdded回调方法
callHandlerAdded0(newCtx);
return this;
}
当某个handler通过addLast(不只是addLast,还有addFirst等方法)添加到Pipeline后,会将handler封装为DefaultChannelHandlerContext,添加到链表对应位置(对于addLast当然是添加到末尾),(如果通道已经注册过了)则回调该handler的handlerAdded回调方法。如果向pipeline中添加的是HandlerInitializer,那么会将其添加到pipeline中,但是如果pipeline对应的通道还没有注册,则会等到注册完成后,再调用HandlerInitializer的handlerAdded方法(见下方代码)。
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister(); // 完成向selector的注册
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
/*其他代码*/
}
pipeline.invokeHandlerAddedIfNeeded()会不断地回调handlerAdded方法,此时会调用HandlerInitializer的handlerAdded,进而进行下面的处理:调用HandlerInitializer中被重写的initChannel方法(添加其他handler到pipeline),并将自己从pipeline中移除。
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
// We do so to prevent multiple calls to initChannel(...).
exceptionCaught(ctx, cause);
} finally {
//将HandlerInitializer自己移除
remove(ctx);
}
return true;
}
return false;
}
在client端,handler向通道的pipeline中添加,大致就是上面所述:在init(channel)方法调用时,通过pipeline的addLast方法将ChannelInitializer这个入站处理器加入到通道的pipeline中,在addLast方法中,得知当前通道还未注册的话是不会执行该处理器的handlerAdded方法的,等到通道完成注册(体现在doRegister()方法中)后,会调用通道的pipeline中处理器的handlerAdded回调方法,此时就会调用ChannelInitializer的handlerAdded方法,进而完成重写的initChannel方法将后续的处理器添加进入通道后,ChannelInitializer将自己从流水线移除。Handler添加的过程(以Echo Server这个demo的client端为例)大致如图所示:
server端稍微复杂一点点,server端的init方法首先addLast了一个ChannelInitializer,在通道注册完成后,调用到其initChannel方法将父处理器(即AbstracBootStrap管理的handler,也即ServerBootStrap.handler(xxx)配置的处理器)添加到通道,并向对应的NioEventLoop添加一个任务,此任务执行后会将封装了childHandler的ServerBootstrapAcceptor添加到流水线并回调它的handlerAdded,但由它并没有实现handlerAdded,所以就没有后续的操作。以Echo Server这个demo的server端为例,server端handler的添加过程大致如下图。(由于现在还没有接收新的连接,所以这里server端的handler是指的NioServerSocketChannel的流水线中的handler)
pipeline流水线的工作流程
我们在上文提到了pipeline中有两类处理器,入站和出站(还可以说是三类,即既是入站又是出站的处理器),入站处理器有handlerAdded
、channelRegistered
、channelRead
等回调方法,出站处理器有connect
、close
、read
、write
等回调方法。可以通过两种方式使得处理逻辑在流水线上工作,(以入站为例)调用pipeline的fireXXX方法从头开始进行流水线上入站消息的XXX方法的执行,而调用ChannelHandlerContext的fireXXX方法会从当前节点之后的下一个入站节点开始执行XXX方法。
之前我们讲到了通道的注册,而且本文开头还提到了,注册完成后还会发生一些事情,那么我们就看一看注册后究竟又做了什么以及pipeline在这里面发挥的作用。
通道的注册发生在doRegister()方法中,看下方代码,doRegister()完成后,调用了
private void register0(ChannelPromise promise) {
try {
/*忽略*/
boolean firstRegistration = neverRegistered;
// 完成了通道的注册
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
当注册完成后,调用了pipeline.invokeHandlerAddedIfNeeded()触发了handlerAdded的回调,这个上文我们提到过了。随后调用了safeSetSuccess(promise)设置异步执行结果的执行状态,随后调用了pipeline.fireChannelRegistered(),从流水线头部开始回调节点的channelRegistered()
方法。
//DefaultChannelPipeline类中的方法
@Override
public final ChannelPipeline fireChannelRegistered() {
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}
// AbstractChannelHandlerContext类中的方法
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
可以看到fireChannelRegistered
方法从头结点开始,执行了invokeChannelRegistered
方法(pipeline中的头、尾结点是特殊的节点,头尾节点之间就是我们所添加的那些handler了)。之后正常情况下就回调入站handler的channelRegister方法(一开始就是进入到流水线头部节点),进入到执行链或者说流水线的某个节点上执行,而如果想要让处理逻辑继续沿着流水线向下执行,那么需要在节点处理器的channelRegistered方法适当位置再去调用ChannelHandlerContext.fireChannelRegistered方法得以让流水线继续。
private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
invokeHandlerAddedIfNeeded();
ctx.fireChannelRegistered();
}
当调用了ChannelHandlerContext.fireChannelRegistered后,我们会发现又出现了下面的似曾相识的代码,findContextInbound方法目的是找到下一个入站节点ChannelHandlerContext。自此流水线便可以运行。
@Override
public ChannelHandlerContext fireChannelRegistered() {
invokeChannelRegistered(findContextInbound());
return this;
}
总结
1. 每个netty通道都会有一个流水线pipeline,pipeline底层就是一个双向列表,具有两个特殊的节点,头节点和尾节点。头尾之间是我们为流水线添加的handler,handler的添加往往借助于ChannelInitializer这个特殊的入站处理器。
2 流水线的运转是责任链模式的体现,本文以通道注册后的执行逻辑为例,分析了pipeline的入站是怎么工作的,不只是channelRegistered回调方法,其实与通道相关的每一个处理器的回调方法的执行方式都是类似的,只是执行的时机不同罢了。
*链接
1. Netty解析:第一个demo——Echo Server
2. Netty解析:NioEventLoopGroup事件循环组
3. Netty解析:NioSocketChannel、NioServerSocketChannel的创建及注册
4. Netty解析:Handler、Pipeline大动脉及其在注册过程中体现
5. Netty解析:connect/bind方法背后
6. Netty解析:服务端如何接受连接并后续处理读写事件