Netty学习之内置处理器以及编解码器

Netty学习之内置处理器以及编解码器

前言

SSL/TLS

SSL/TLS是目前广泛使用的加密,位于TCP之上,其他的应用层协议之下,当应用层将数据交给SSL/TLS之后,数据会被进行加密,关于SSL/TLS更多的内容,可以参考:SSL/TLS协议运行机制的概述OpenSSL 与 SSL 数字证书概念贴

javax.net.ssl中提供了原生的SSL/TLS支持,通过SSLContextSSLEngine可以方便地进行数据的加密及解密。

在Netty中,为了方便开发者使用SSL/TLS,Netty提供了SSlHandler(本质是一个ChannelHandler),只要为其配置一个SSLEngine即可进行加密数据传输。

class SslEngineInitializer extends ChannelInitializer<Channel> {

    private final SslContext context;
    private final boolean startTls;

    public SslEngineInitializer(SslContext context, boolean startTls) {
        this.context = context;
        this.startTls = startTls;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        SSLEngine engine = context.newEngine(ch.alloc());
        ch.pipeline().addFirst("ssl", new SslHandler(engine, startTls));
    }
}

HTTP/HTTPS

一个HTTP请求或者相应可能由多个部分组成,一个完整的Http请求由以下内容组成

  • 一个HttpRequest,表示请求头部
  • 一个或者多个HttpContent表示Http的内容
  • 一个LastHttpContent标志Http内容的结束

由于一个Http请求包含请求部分以及相应部分,而对于客户端及服务端来说,这两者是不相同的,客户端发送请求,服务端接收请求,服务端发送响应,客户端接收响应,所以,需要有不同的处理器来处理不同的内容

常用编解码器

HttpRequestEncoder
HttpResponseEncoder
HttpRequestDecoder
HttpResponseDecoder
class HttpPipelineInitializer extends ChannelInitializer<Channel> {

    private final boolean client;

    public HttpPipelineInitializer(boolean client) {
        this.client = client;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        if (client) {
            // 解码响应
            pipeline.addLast("decoder", new HttpResponseEncoder());
            // 编码请求
            pipeline.addLast("encoder", new HttpRequestEncoder());
        }else {
            // 解码请求
            pipeline.addLast("decoder", new HttpRequestDecoder());
            // 编码响应
            pipeline.addLast("encoder", new HttpResponseEncoder());
        }
    }
}

当一个字节流被解码成Http内容之后,就可以操作具体的HttpObject消息了,但是由于一个完整的请求/响应可能会被拆分成几个部分,所以,直接使用其实不是很合适,更好地方式是使用聚合器。

class HttpAggregatorInitializer extends ChannelInitializer<Channel> {

    private final boolean client;

    public HttpAggregatorInitializer(boolean client) {
        this.client = client;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        if (client) {
            // 客户端编解码器
            // 等同于上面的两者
            pipeline.addLast("codec", new HttpClientCodec());
        }else {
            // 服务端编解码器
            pipeline.addLast("codec", new HttpServerCodec());
        }
        // 聚合器,允许最大大小为 512 * 1024
        pipeline.addLast("aggregator", new HttpObjectAggregator(512 * 1024));
    }
}

聚合之后我们可以直接使用FullHttpRequestFullHttpResponse消息来处理,这两个对象表示的是完整的请求/响应了。

当使用HTTP的时候,如果内容大部分是文本数据,我们一般会使用压缩技术,虽然会增加CPU开销,但是可以有效地节省网络带宽,Netty同样提供了对应的handler并且提供gzip和deflate技术。

class HttpCompressionInitializer extends ChannelInitializer<Channel> {

    private final boolean client;

    public HttpCompressionInitializer(boolean client) {
        this.client = client;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        if (client) {
            pipeline.addLast("codec", new HttpClientCodec());
            // 客户端解压
            pipeline.addLast("decompressor", new HttpContentDecompressor());
        }else {
            pipeline.addLast("codec", new HttpServerCodec());
            // 服务端加压
            pipeline.addLast("compressor", new HttpContentCompressor());
        }
    }
}

同时需要注意,如果是JDK6及以前的版本,需要引入JZlib依赖。

<dependency>
    <groupId>com.jcraft</groupId>
    <artifactId>jzlib</artifactId>
    <version>1.1.3</version>
</dependency>

如果我们需要使用HTTPS,只需要将SslHanlder配置在所有handler的最前面即可。

空闲检测及超时

空闲检测及超时,也可以称为心跳检测,目的就是确保连接的另一端依旧在线,如果不在线,则断开连接,节省资源。

Netty中提供了几个常用的handler

  • IdleStateHandler,如果连接空闲时间过长,则触发一个IdleStateEvent,可以通过在ChannelInboundHandler覆盖userEventTriggered()来处理该事件
  • ReadTimeoutHandler,当channel中一段时间没有inbound数据的时候,抛出readTimeoutException并且关闭channel,可以通过exceptionCaught()捕获该异常。
  • WriteTimeoutHandler,当channel中有一段时间没有outbound数据时,抛出writeTimeoutException并且关闭channel,可以通过exceptionCaught()捕获异常。

需要注意的是,IdleStateHandler的作用是用于检测channel在指定时间内是否有数据流通,如果没有的话,则触发一个IdleStateEvent,该Event是用于通知本channel的,而不是用于通知对方,所以,我们可以根据收到的Event来决定处理逻辑,比如

  • 对于服务端:收到超过3个对应的事件,表示超过3 * time时间内没有交互,因此决定断开连接。
  • 对于客户端:收到事件后,发送一个心跳包(内容其实是随意的,主要是由数据流动),表明自己还活着(注意该事件同样是给自己的,不是给对方的,所以都需要增加对应的逻辑)

下面举一个具体的例子

服务端

public class Server {
    public static void main(String[] args) {
        ServerBootstrap bootstrap = new ServerBootstrap();
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        bootstrap.group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new IdleStateHandlerInitializer());
        try {
            ChannelFuture future = bootstrap.bind(8080).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            worker.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }
}

/**
*  服务端空闲检测
*/
class IdleStateHandlerInitializer extends ChannelInitializer<Channel>{

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new IdleStateHandler(0, 0, 60));
        pipeline.addLast(new HeartbeatHandler());
    }

    /**
    *  服务端的空闲处理逻辑
    */
    private class HeartbeatHandler extends ChannelInboundHandlerAdapter {

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;
                // 如果超过,则断开连接
                if (event.state() == IdleState.ALL_IDLE) {
                    ctx.writeAndFlush(Unpooled.copiedBuffer("bybe".getBytes()));
                    ctx.close();
                }
            }else {
                super.userEventTriggered(ctx, evt);
            }
        }
    }
}

客户端

class Client {
    public static void main(String[] args) {
        Bootstrap bootstrap = new Bootstrap();
        EventLoopGroup group = new NioEventLoopGroup();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        // 如果这里改成70,则会断开
                        pipeline.addLast(new IdleStateHandler(0, 0, 50, TimeUnit.SECONDS));
                        pipeline.addLast(new Heartbeat());
                    }
                });
        try {
            ChannelFuture fu = bootstrap.connect(new InetSocketAddress("localhost", 8080)).sync();
            fu.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            group.shutdownGracefully();
        }
    }

    /**
    *  客户端空闲检测
    */
    private static class Heartbeat extends ChannelInboundHandlerAdapter {
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                // 发送心跳包
                ctx.writeAndFlush(Unpooled.copiedBuffer("heartbeat".getBytes()));
            }
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf data = (ByteBuf) msg;
            System.out.println(data.toString(CharsetUtil.UTF_8));
        }
    }
}

基于分隔符及长度的协议处理

在某些协议中,是根据换行符或者指定长度来划分的,Netty中提供了基于这两者的处理器

基于分隔符

Netty中主要的基于分隔符的处理器有以下两个

  • DelimeterBasedFrameDecoder,基于指定分隔符
  • LineBasedFrameDecoder,基于换行符(DelimeterBasedFrameDecoder的特例)

基于长度

Netty中基于长度的处理器有以下两个

  • FixedLenghtFrameDecoder,固定长度
  • LengthFieldBasedFrameDecoder,通过构造器指定长度字段的偏移及所占字节数

发送大数据

为了高效地发送大量数据,Netty中提供了FileRegion接口(默认实现DefaultFileRegion),作为支持zero-copy的传输器,用于在channel中发送文件

如果需要将数据从文件系统拷贝到用户空间,可以使用ChunkedWriteHandler,它提供了低消耗内存异步将大数据流写出。

序列化

Netty提供的JDK序列化相关的处理器

  • CompatibleObjectDecoder,适用于非Netty的并且使用JDK的序列化器
  • CompatibleObjectEncoder,同上
  • ObjectDecoder,适用于在JDK序列化器之上使用自定义序列化
  • ObjectEncoder,同上

同时,Netty还提供了基于ProtoBuf的处理器,具体的可以参考文件即可,使用上基本差不多

总结

本小节我们主要学习了Netty所提供的几个常用的handler,包括了SSL/TLS相关的handler、HTTP相关的handler、空闲处理器(心跳)、协议分割处理器以及序列化处理器等,有了这些常用的处理器,可以不用处理具体协议的相关内容,从而可以更专注于逻辑方面的处理。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,875评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,569评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,475评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,459评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,537评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,563评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,580评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,326评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,773评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,086评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,252评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,921评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,566评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,190评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,435评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,129评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,125评论 2 352

推荐阅读更多精彩内容

  • Netty的简单介绍 Netty 是一个 NIO client-server(客户端服务器)框架,使用 Netty...
    AI乔治阅读 8,401评论 1 101
  • Netty是一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架...
    认真期待阅读 2,780评论 1 27
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,647评论 18 139
  • 前奏 https://tech.meituan.com/2016/11/04/nio.html 综述 netty通...
    jiangmo阅读 5,855评论 0 13
  • 一晃,年后几个月了,许久没有回家,有点想念爸妈。前不久翻看爸妈的照片,更加想念。 几年前,妈妈的头发已有银色,经常...
    刀笔伐心阅读 154评论 0 5