4. Netty解析:Handler、Pipeline大动脉及其在注册过程中体现

前言

   终于到了这个令人激动的部分,也是我们平时利用netty开发时最关注的地方。前面讲到了NioEventLoopGroup、NioEventLoop。在上一篇中讲到了NioServerSocketChannel和NioSocketChannel的创建及注册,我们最终分析到在initAndRegister()方法中,完成了netty中通道的创建、初始化(向其添加必要的处理器)以及向NioEventLoopGroup的注册,最终是把NioSocketChannel或者NioServerSocketChannel中封装的NIO中的Channel向某一个NioEventLoop关联的selector注册(最终的这个过程发生在AbstractChannel的doRegister()方法)。但是在doRegister()方法之后还调用了pipeline.fireChannelRegistered(),以及initAndRegister()执行后还涉及到一些Pipeline所做的工作,所以需要首先搞明白pipeline是什么以及各种handler和pipeline的关系。

Handler

   netty中的处理器是利用netty进行开发时开发人员关注的核心,分为两类,一类是入站处理器(例如接收到IO事件后进行处理),一类是出站处理器(向外发送数据时进行的处理)。在Echo Server这个demo中,就出现了几个Handler,例如EchoClientHandlerEchoServerHandler,以及一个特殊的入站处理器ChannelInitializer
入站处理器
   入站处理器一般可以通过直接继承ChannelInboundHandlerAdapter,必要时可重写某些回调方法,可重写的方法主要有以下几个:

handlerAdded: 当一个处理器添加到pipeline后,会调用此回调方法
handlerRemoved: 当处理器被移除出pipeline后....
channelRegistered:当一个通道注册到NioEventLoop的selector时,回调pipeline上所有channelRegistered方法
channelUnregistered:通道取消注册,接触绑定.....
channelActive: 通道激活成功....
channelInactive:
channelRead: 有数据可读的时候
channelReadComplete: 可读数据完成入站流水线操作后,会回调此方法。

handler相关的回调方法(handlerAdded和handlerRemoved)只会回调对应的方法,而通道相关的会在流水线handler上逐个进行回调。

出站处理器
   出站处理器一般可以通过直接继承ChannelOutboundHandlerAdapter,必要时可重写某些回调方法,可重写的方法主要有以下几个:bind(用于服务端)、connect(用于客户端)、close、read、write等。

Pipeline与责任链模式

   netty的通道都封装着一个Pipeline实例,也就是每个通道都有一个pipeline(默认就是DefaultChannelPipeline),每个pipeline中都有一个元素为AbstractChannelHandlerContext的双向链表,头尾节点分别由headtail指向。pipeline的链表中存放着一个个Handler,只不过这些handler都被封装为了DefaultChannelHandlerContext而存在于链表中,而链表的首位节点分别为HeadContextTailContext类型。HeadContextTailContext相对比较简单,不做赘述,主要通过DefaultChannelHandlerContext以及父类AbstractChannelHandlerContext的构造器看一下这个所谓的ChannelHandlerContext处理器上下文都有哪些东西。

    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }

    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                                  boolean inbound, boolean outbound) {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        this.inbound = inbound;
        this.outbound = outbound;
        // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }

  每一个处理器上下文都封装了一个Handler,记录下它所属的Pipeline以及handler是入站还是出站处理器。

  Pipeline与ChannelHandlerContext是责任链模式的两个重要组件,pipeline为执行链的管理角色,而ChannelHandlerContext为执行链节点。对于入站事件,只会从前向后执行pipeline中的入站处理器的相关回调方法,出站事件则从后向前执行出站处理器相关回调方法。对于入站来说,调用pipeline.fireXXXX方法(例如pipeline.fireChannelRegistered方法)会从执行链头部开始,在流水线的入站处理器上进行传导,调用ChannelHandlerContext.fireXXXX则会从当前节点找到下一个入站处理器,并回调它的XXXX方法。出站事件也是类似,只不过调用方法为例如pipeline.connect和ChannelHandlerContext.connect等。

ChannelInitializer及Pipeline中处理器的添加

   还有印象的话,在最初讲Echo Server这个demo的启动代码时提到过:

bootStrap.handler(xx) : 将xx处理器赋值给bootStrap的handler字段
bootStrap.childHandler(yy) : 将yy处理器赋值给BootStrap的childHandler字段

这时Handler并没有和pipeline关联起来。但是在前文讲述通道的创建注册initAndRegister方法中的init(channel)方法时会负责将handler添加到pipeline。

    // client端(BootStrap)在connect的时候
    void init(Channel channel) throws Exception {
        ChannelPipeline p = channel.pipeline();
        p.addLast(handler());

       /*省略代码*/
    }

    // server端在执行bind的时候
    void init(Channel channel) throws Exception {
        /*省略代码*/

        ChannelPipeline p = channel.pipeline();
        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        /*省略代码*/

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                // handler()方法获取到父类AbstractBootStrap中管理的Handler,也就是父循环组对应的handler
                ChannelHandler handler = handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

   client端是直接将配置的handler添加到pipeline(在EchoServer这个例子中handler就是HandlerInitializer),而在server端,首先向pipeline添加了一个HandlerInitializer随后通过这个初始化器添加handler以及childHandler(封装在ServerBootstrapAcceptor中)。这里如有不明白,可继续向下看,后文会再次提到。
  HandlerInitializer是一个特殊的入站处理器,它往往是作为pipeline中第一个处理器。initChannel方法一般主要负责将其他的handler添加到pipeline中,然后自己从pipeline中删除自己。在init(channel)方法中,我们向pipeline中添加了(addLast)handler。addLast的主要逻辑如下:

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);

            newCtx = newContext(group, filterName(name, handler), handler);
            // 将handler封装为AbstractChannelHandlerContext 添加到pipeline双向列表的尾部
            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        //方法中会回调所添加的handler的handlerAdded回调方法
        callHandlerAdded0(newCtx);
        return this;
    }

当某个handler通过addLast(不只是addLast,还有addFirst等方法)添加到Pipeline后,会将handler封装为DefaultChannelHandlerContext,添加到链表对应位置(对于addLast当然是添加到末尾),(如果通道已经注册过了)则回调该handler的handlerAdded回调方法。如果向pipeline中添加的是HandlerInitializer,那么会将其添加到pipeline中,但是如果pipeline对应的通道还没有注册,则会等到注册完成后,再调用HandlerInitializer的handlerAdded方法(见下方代码)。

    private void register0(ChannelPromise promise) {
        try {
            // check if the channel is still open as it could be closed in the mean time when the register
            // call was outside of the eventLoop
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
            boolean firstRegistration = neverRegistered;
            doRegister();  // 完成向selector的注册
            neverRegistered = false;
            registered = true;
    
            // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
            // user may already fire events through the pipeline in the ChannelFutureListener.
            pipeline.invokeHandlerAddedIfNeeded();
            /*其他代码*/
    }

pipeline.invokeHandlerAddedIfNeeded()会不断地回调handlerAdded方法,此时会调用HandlerInitializer的handlerAdded,进而进行下面的处理:调用HandlerInitializer中被重写的initChannel方法(添加其他handler到pipeline),并将自己从pipeline中移除

    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
            try {
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
                // We do so to prevent multiple calls to initChannel(...).
                exceptionCaught(ctx, cause);
            } finally {
                //将HandlerInitializer自己移除
                remove(ctx);
            }
            return true;
        }
        return false;
    }

   在client端,handler向通道的pipeline中添加,大致就是上面所述:在init(channel)方法调用时,通过pipeline的addLast方法将ChannelInitializer这个入站处理器加入到通道的pipeline中,在addLast方法中,得知当前通道还未注册的话是不会执行该处理器的handlerAdded方法的,等到通道完成注册(体现在doRegister()方法中)后,会调用通道的pipeline中处理器的handlerAdded回调方法,此时就会调用ChannelInitializer的handlerAdded方法,进而完成重写的initChannel方法将后续的处理器添加进入通道后,ChannelInitializer将自己从流水线移除。Handler添加的过程(以Echo Server这个demo的client端为例)大致如图所示:


client端handler的添加

   server端稍微复杂一点点,server端的init方法首先addLast了一个ChannelInitializer,在通道注册完成后,调用到其initChannel方法将父处理器(即AbstracBootStrap管理的handler,也即ServerBootStrap.handler(xxx)配置的处理器)添加到通道,并向对应的NioEventLoop添加一个任务,此任务执行后会将封装了childHandler的ServerBootstrapAcceptor添加到流水线并回调它的handlerAdded,但由它并没有实现handlerAdded,所以就没有后续的操作。以Echo Server这个demo的server端为例,server端handler的添加过程大致如下图。(由于现在还没有接收新的连接,所以这里server端的handler是指的NioServerSocketChannel的流水线中的handler)


server端handler的添加

pipeline流水线的工作流程

   我们在上文提到了pipeline中有两类处理器,入站和出站(还可以说是三类,即既是入站又是出站的处理器),入站处理器有handlerAddedchannelRegisteredchannelRead等回调方法,出站处理器有connectclosereadwrite等回调方法。可以通过两种方式使得处理逻辑在流水线上工作,(以入站为例)调用pipeline的fireXXX方法从头开始进行流水线上入站消息的XXX方法的执行,而调用ChannelHandlerContext的fireXXX方法会从当前节点之后的下一个入站节点开始执行XXX方法。
   之前我们讲到了通道的注册,而且本文开头还提到了,注册完成后还会发生一些事情,那么我们就看一看注册后究竟又做了什么以及pipeline在这里面发挥的作用。
   通道的注册发生在doRegister()方法中,看下方代码,doRegister()完成后,调用了

    private void register0(ChannelPromise promise) {
        try {
           /*忽略*/
            boolean firstRegistration = neverRegistered;
            // 完成了通道的注册
            doRegister();
            neverRegistered = false;
            registered = true;
    
            // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
            // user may already fire events through the pipeline in the ChannelFutureListener.
            pipeline.invokeHandlerAddedIfNeeded();
    
            safeSetSuccess(promise);
            pipeline.fireChannelRegistered();
            // Only fire a channelActive if the channel has never been registered. This prevents firing
            // multiple channel actives if the channel is deregistered and re-registered.
            if (isActive()) {
                if (firstRegistration) {
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    // This channel was registered before and autoRead() is set. This means we need to begin read
                    // again so that we process inbound data.
                    //
                    // See https://github.com/netty/netty/issues/4805
                    beginRead();
                }
            }
        } catch (Throwable t) {
            // Close the channel directly to avoid FD leak.
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }

   当注册完成后,调用了pipeline.invokeHandlerAddedIfNeeded()触发了handlerAdded的回调,这个上文我们提到过了。随后调用了safeSetSuccess(promise)设置异步执行结果的执行状态,随后调用了pipeline.fireChannelRegistered(),从流水线头部开始回调节点的channelRegistered()方法。

    //DefaultChannelPipeline类中的方法
    @Override
    public final ChannelPipeline fireChannelRegistered() {
        AbstractChannelHandlerContext.invokeChannelRegistered(head);
        return this;
    }


    // AbstractChannelHandlerContext类中的方法
    static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRegistered();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRegistered();
                }
            });
        }
    }

   可以看到fireChannelRegistered方法从头结点开始,执行了invokeChannelRegistered方法(pipeline中的头、尾结点是特殊的节点,头尾节点之间就是我们所添加的那些handler了)。之后正常情况下就回调入站handler的channelRegister方法(一开始就是进入到流水线头部节点),进入到执行链或者说流水线的某个节点上执行,而如果想要让处理逻辑继续沿着流水线向下执行,那么需要在节点处理器的channelRegistered方法适当位置再去调用ChannelHandlerContext.fireChannelRegistered方法得以让流水线继续。

    private void invokeChannelRegistered() {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRegistered(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRegistered();
        }
    }


    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        invokeHandlerAddedIfNeeded();
        ctx.fireChannelRegistered();
    }

   当调用了ChannelHandlerContext.fireChannelRegistered后,我们会发现又出现了下面的似曾相识的代码,findContextInbound方法目的是找到下一个入站节点ChannelHandlerContext。自此流水线便可以运行。

    @Override
    public ChannelHandlerContext fireChannelRegistered() {
        invokeChannelRegistered(findContextInbound());
        return this;
    }

总结

1. 每个netty通道都会有一个流水线pipeline,pipeline底层就是一个双向列表,具有两个特殊的节点,头节点和尾节点。头尾之间是我们为流水线添加的handler,handler的添加往往借助于ChannelInitializer这个特殊的入站处理器。
2 流水线的运转是责任链模式的体现,本文以通道注册后的执行逻辑为例,分析了pipeline的入站是怎么工作的,不只是channelRegistered回调方法,其实与通道相关的每一个处理器的回调方法的执行方式都是类似的,只是执行的时机不同罢了。
  

*链接

1. Netty解析:第一个demo——Echo Server
2. Netty解析:NioEventLoopGroup事件循环组
3. Netty解析:NioSocketChannel、NioServerSocketChannel的创建及注册
4. Netty解析:Handler、Pipeline大动脉及其在注册过程中体现
5. Netty解析:connect/bind方法背后
6. Netty解析:服务端如何接受连接并后续处理读写事件

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,029评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,395评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,570评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,535评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,650评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,850评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,006评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,747评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,207评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,536评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,683评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,342评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,964评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,772评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,004评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,401评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,566评论 2 349

推荐阅读更多精彩内容