Netty心跳检测代码实例及源码分析

背景:今天在研读项目netty相关代码时,发现有设备有心跳机制(尽管在本项目中没啥左右),本着要不试一下的方式,调用下Netty提供的IdleStatHandler这个handler来实现一下心跳检测功能。

  • 尝试:
  1. 在网上搜索了一下netty的心跳检测api,光看到IdleStatHandler就直接下手写代码了,想着也就一套调用链的方式,写完测一下没问题就ok了,便写下了如下代码:

    Netty服务端代码:

    public class MyServer {
        public static void main(String[] args) {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try {
    
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap
                        .group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .handler(new LoggingHandler(LogLevel.INFO)) 
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline channelPipeline = ch.pipeline();
                                channelPipeline.addLast(new HeartBeatHandler(3, 0, 0));
                                channelPipeline.addLast(new MyServerHandler());
                     
                            }
                        });
    
    
                ChannelFuture channelFuture = serverBootstrap.bind(10005).sync();
                channelFuture.channel().closeFuture().sync();
    
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    ------------------------------------------------------------------------
    public class HeartBeatHandler  extends IdleStateHandler {
        public HeartBeatHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
            super(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds);
        }
        @Override
        public void read(ChannelHandlerContext ctx) throws Exception {
            System.out.println("HeartBeatHandler----->"+ctx);
            super.read(ctx);
        }
    
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            System.out.println("HeartBeatHandler 中的userEventTriggered被触发");
            //空闲状态转换
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
                String evenType = null;
    
                switch (idleStateEvent.state()) {
                    case READER_IDLE:
                        evenType = "读空闲";
                        break;
                    case WRITER_IDLE:
                        evenType = "写空闲";
                        break;
                    case ALL_IDLE:
                        evenType = "读写空闲";
                        break;
                }
                System.out.println(ctx.channel().remoteAddress() + "超时事件:" + evenType);
            }
        }
    }
    ---------------------------
    public class MyServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println(msg);
            super.channelRead(ctx, msg);
        }
    }
    

    Socket测试代码:

    public class Test {
        public static void socketTest() throws IOException, InterruptedException {
            Socket socket=new Socket("127.0.0.1",10005);
            PrintWriter pw = new PrintWriter(socket.getOutputStream());
            for (int i=0;i<100;i++){
                pw.println("HelloWorld");
                pw.flush();
                TimeUnit.SECONDS.sleep(5);
            }
            pw.close();
            socket.close();
        }
        public static void main(String[] args) throws IOException, InterruptedException {
            socketTest();
        }
    }
    
    1. 开始自信的运行代码,结果发现光顾着输出helloworld相关的内容了(为什么不直接是helloWorld,因为这里没有做编解码操作,这不是本文讨论重点)

    2. 尝试百度,stackoverflow,也没能查到原由,也没能看到示例代码,基本给的解决方案都是指将IdlestatHandler调用链放置在第一位置(我本来就这样放的ORZ),顺便吐槽一下csdn : )

  • 一探究竟

    心有不死,虽不是项目必须实现功能,但是勾起了好奇心,这不得探个究竟怎么睡得着。

    1. Debug 调试:

      跟着断点一步一步进入调用方法链(这里只列出核心代码):

             //当数据链上的handler中的channelRead方法被调用时,reading 标志位-->true
          @Override
          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
              if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
                  reading = true;
                  firstReaderIdleEvent = firstAllIdleEvent = true;
              }
              ctx.fireChannelRead(msg);
          }
             //根据构造函数中的三个参数设定时间 readerIdleTimeSeconds writerIdleTimeSeconds allIdleTimeSeconds 
             //IdleStatHandler会启动对应的检测线程,这里以读超时距离,线程通过判断是否已读,以及是否超时组合判断是否需要调用userEventTriggered()函数,这里只提供超时检测线程代码,其他生成相关检测池代码可自行断点调试.
              protected void run(ChannelHandlerContext ctx) {
                  long nextDelay = allIdleTimeNanos;
                  if (!reading) {
                      nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
                  }
                  if (nextDelay <= 0) {
                      allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
                      boolean first = firstAllIdleEvent;
                      firstAllIdleEvent = false;
                      try {
                          //进入触发调用流程
                          channelIdle(ctx, newIdleStateEvent(IdleState.ALL_IDLE, first));
                      } catch (Throwable t) {
                          ctx.fireExceptionCaught(t);
                      }
                  } 
              }
         
             //以下三段代码为触发调用流程中的代码:
          protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
              ctx.fireUserEventTriggered(evt);
          }
      
          @Override
          public ChannelHandlerContext fireUserEventTriggered(final Object event) {
              invokeUserEventTriggered(findContextInbound(), event);
              return this;
          }
             //把这里读懂就明白了为什么上面的示例代码无法正常触发userEventTriggered()函数了
          private AbstractChannelHandlerContext findContextInbound() {
              AbstractChannelHandlerContext ctx = this;
             //此处的do方法无论条件如何都会先进行一次向后传递,变成next值
             //因此示例代码中的HeartBeatHandler()虽然存在userEventTriggered(),但是在这个函数中,找的是下一个Handler的ChannelHandlerContext,那可以猜想一下,如果此时MyServerHandler()复写了userEventTriggered(),会被触发吗?
              do {
                  ctx = ctx.next;
              } while (!ctx.inbound);
              return ctx;
          }
      
      
             //注意在上面的fireUserEventTriggered()函数中,最外层函数是本函数,稍微读一下,也能看出来,这是一个递归函数不断的进行链式递归,直到满足上面的 ctx.inbound=true 即handler处理链中的Inbound已经被调用完毕(如果存在userEventTriggered()的话),文章最后会提供channelPipeline的Handler调用图。
          static void invokeUserEventTriggered(final AbstractChannelHandlerContext next, final Object event) {
              ObjectUtil.checkNotNull(event, "event");
              EventExecutor executor = next.executor();
              if (executor.inEventLoop()) {
                  next.invokeUserEventTriggered(event);
              } else {
                  executor.execute(new Runnable() {
                      @Override
                      public void run() {
                          next.invokeUserEventTriggered(event);
                      }
                  });
              }
          }
      
      
    2. debug结束,思路大概理清了,Idlestathandler通过新开线程来进行耗时检测,通过耗时配合表示位,来决定是否调用userEventTriggered()函数,并且在findContextInbound由于使用的是do while循环,所以是不会出现调用自己本身的情况,采用这样的编写方式我想不仅仅是不调用自身的触发函数,而是在Inbound找寻到最深处时,可以将ctx自动转换为outbound相关的handlerContext.最后使用递归函数不断递归inbound链,进行链式调用,所有该链上的handler的userEventTriggered() 都将被调用(当然,除了第一个,因为 do while的原因 : )

    3. 分析完毕,编写新的调用代码示例。

      //避免代码重复,只提供调用链代码,其他代码不变
      ChannelPipeline channelPipeline = ch.pipeline();
      channelPipeline.addLast(new HeartBeatHandler(5, 0, 0));
      channelPipeline.addLast(new MyServerHandler());
      //偷个懒加个匿名内部类
      channelPipeline.addLast(new ChannelInboundHandlerAdapter(){
         @Override
         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws    Exception {
            System.out.println("last Trigger触发");
            super.userEventTriggered(ctx, evt);
        }
      ---------------
        //注意这里的MyServerHandler复写了触发函数,用来观察是否被触发
      public class MyServerHandler extends ChannelInboundHandlerAdapter {
          @Override
          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
              System.out.println(msg);
              super.channelRead(ctx, msg);
          }
          @Override
          public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
              System.out.println("MyServerHandler userEventTriggered 触发");
              super.userEventTriggered(ctx,evt);
          }
      }
        
       //最后,对于后两个调用链的代码,读者可以自行替换位置,尝试运行,观察输出效果,相信一定会对IdlestatHandler触发流程有更深刻的了解。
       
      

ChannelPipeline handler调用图:

*  +---------------------------------------------------+---------------+
*  |                           ChannelPipeline         |               |
*  |                                                  \|/              |
*  |    +---------------------+            +-----------+----------+    |
*  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
*  |    +----------+----------+            +-----------+----------+    |
*  |              /|\                                  |               |
*  |               |                                  \|/              |
*  |    +----------+----------+            +-----------+----------+    |
*  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
*  |    +----------+----------+            +-----------+----------+    |
*  |              /|\                                  .               |
*  |               .                                   .               |
*  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
*  |        [ method call]                       [method call]         |
*  |               .                                   .               |
*  |               .                                  \|/              |
*  |    +----------+----------+            +-----------+----------+    |
*  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
*  |    +----------+----------+            +-----------+----------+    |
*  |              /|\                                  |               |
*  |               |                                  \|/              |
*  |    +----------+----------+            +-----------+----------+    |
*  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
*  |    +----------+----------+            +-----------+----------+    |
*  |              /|\                                  |               |
*  +---------------+-----------------------------------+---------------+
*                  |                                  \|/
*  +---------------+-----------------------------------+---------------+
*  |               |                                   |               |
*  |       [ Socket.read() ]                    [ Socket.write() ]     |
*  |                                                                   |
*  |  Netty Internal I/O Threads (Transport Implementation)            |
*  +-------------------------------------------------------------------+
  • 总结:保持好奇心,别怕失败,不要气馁。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,402评论 6 499
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,377评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,483评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,165评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,176评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,146评论 1 297
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,032评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,896评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,311评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,536评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,696评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,413评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,008评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,659评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,815评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,698评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,592评论 2 353