Netty入门

一、概念

    Netty是Jboss提供的一个Java开源框架,它是基于NIO的网络框架,封装了NIO底层复杂的实现细节,给我们提供了简单好用的概念来实现编程。有了Netty,我们可以实现自己的HTTP服务器,FTP服务器,UDP服务器,RPC服务器,WebSocket服务器等等。HTTP服务器之所以称为HTTP服务器,是因为编码解码协议是HTTP协议,如果协议是Redis协议,那它就成了Redis服务器,如果协议是WebSocket,那它就成了WebSocket服务器,等等。 使用Netty我们就可以定制编解码协议,实现自己的特定协议的服务器。

二、实现

    本文不过多介绍关于Netty的概念和和细节,我提供了一个客户端,服务端的例子,每段代码后面有解释。

    首先去下载好Netty所需要的jar包,建立一个项目,放好jar包。

1. 服务端实现

package com.server;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelOption;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;

public class TimeServer {

public static void main(String[] args) {

int port = 9898;

new TimeServer().bind(port);

}

public void bind(int port) {

/**

* interface EventLoopGroup extends EventExecutorGroup extends

* ScheduledExecutorService extends ExecutorService 配置服务端的 NIO

* 线程池,用于网络事件处理,实质上他们就是 Reactor 线程组 bossGroup 用于服务端接受客户端连接,workerGroup 用于进行

* SocketChannel 网络读写

*/

EventLoopGroup bossGroup = new NioEventLoopGroup();

EventLoopGroup workerGroup = new NioEventLoopGroup();

try {

/**

* ServerBootstrap 是 Netty 用于启动 NIO 服务端的辅助启动类,用于降低开发难度

*/

ServerBootstrap b = new ServerBootstrap();

b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)

.childHandler(new ChildChannelHandler());

/** 服务器启动辅助类配置完成后,调用 bind 方法绑定监听端口,调用 sync 方法同步等待绑定操作完成 */

ChannelFuture f = b.bind(port).sync();

System.out.println(Thread.currentThread().getName() + ",服务器开始监听端口,等待客户端连接.........");

/**

* 下面会进行阻塞,等待服务器连接关闭之后 main 方法退出,程序结束

*

*/

f.channel().closeFuture().sync();

} catch (InterruptedException e) {

e.printStackTrace();

} finally {

/** 优雅退出,释放线程池资源 */

bossGroup.shutdownGracefully();

workerGroup.shutdownGracefully();

}

}

private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

@Override

protected void initChannel(SocketChannel arg0) throws Exception {

arg0.pipeline().addLast(new TimeServerHandler());

}

}

}

    在bind方法中创建了两个NioEventLoopGroup实例。NioEventLoopGroup是个线程组,它包含一组NIO线程,专门用于网络事件的处理,实际上它们就是Reactor线程组。这里创建两个的原因是一个用于服务端接收客户端的连接,另一个用于SocketChannel的网络读写。接下来我们再创建了ServerBootstrap对象,它是Netty用于启动NIO的辅助启动类,目的是降低服务端的开发复杂度。下一行调用了ServerBootstrap的group方法,将两个NIO线程当作入参传递到ServerBootstrap中。接着设置Channel为NioServerSocketChannel,它的功能对应于JDK NIO类库中的ServerSocketChannel类。然后配置NioServerSocketChannel的TCP参数,此处将它的backlog设置为1024,最后绑定I/O事件的处理类ChildChannelHandler,它的作用类似于Reactor模式中的Handler类,主要用于处理网络I/O事件,例如记录日志,对消息进行编解码等。

    服务端启动辅助类配置完成后,调用它的bind方法绑定监听端口,随后,调用它的同步阻塞方法sync等待绑定操作完成。完成之后Netty会返回一个ChannelFuture,它的功能类似于JDK的java.util.concurrent.Future,主要用于异步操作的通知回调。

    还用了f.channel().closeFuture().sync()方法进行阻塞,等待服务器端链路关闭之后main函数才退出。最后用了shutdownGracefully()方法进行优雅地退出,它会释放跟shutdownGracefully相关联的资源。

package com.server;

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.ChannelInboundHandlerAdapter;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

/**

* 收到客户端消息,自动触发

*/

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

/**

* 将 msg 转为 Netty 的 ByteBuf 对象,类似 JDK 中的 java.nio.ByteBuffer,不过 ButeBuf 功能更强,更灵活

*/

ByteBuf buf = (ByteBuf) msg;

/**

* readableBytes:获取缓冲区可读字节数,然后创建字节数组 从而避免了像 java.nio.ByteBuffer

* 时,只能盲目的创建特定大小的字节数组,比如 1024

*/

byte[] reg = new byte[buf.readableBytes()];

/**

* readBytes:将缓冲区字节数组复制到新建的 byte 数组中 然后将字节数组转为字符串

*/

buf.readBytes(reg);

String body = new String(reg, "UTF-8");

System.out.println(Thread.currentThread().getName() + ",The server receive  order : " + body);

/**

* 回复消息 copiedBuffer:创建一个新的缓冲区,内容为里面的参数 通过 ChannelHandlerContext 的 write

* 方法将消息异步发送给客户端

*/

String respMsg = "I am Server,消息接收 success!";

ByteBuf respByteBuf = Unpooled.copiedBuffer(respMsg.getBytes());

ctx.write(respByteBuf);

}

@Override

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

/**

* flush:将消息发送队列中的消息写入到 SocketChannel 中发送给对方,为了频繁的唤醒 Selector 进行消息发送 Netty 的

* write 方法并不直接将消息写如 SocketChannel 中,调用 write 只是把待发送的消息放到发送缓存数组中,再通过调用 flush

* 方法,将发送缓冲区的消息全部写入到 SocketChannel 中

*/

ctx.flush();

}

@Override

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

        /**当发生异常时,关闭 ChannelHandlerContext,释放和它相关联的句柄等资源 */

        ctx.close();

    }

}

    TimeServerHandler类继承自ChannelInboundHandlerAdapter,它用于对网络事件进行读写操作,通常我们只需关注channelRead和exceptionCaught方法。下面对这两个方法进行说明。

    ByteBuf buf = (ByteBuf) msg做了类型转换,将msg转换为Netty的ByteBuf对象。ByteBuf类似于JDK中java.nio.ByteBuffer对象,不过它提供了更加强大和灵活的功能。通过ByteBuf的readableBytes方法可以获取缓冲区可读的字节数,根据可读字节数创建byte数组,通过ByteBuf的readBytes方法将缓冲区中的字节数组复制到新建的byte数组中,最后通过new String构造函数获取请求消息。这是对消息进行判断,如果是"QUERY TIME ORDER"则创建应答消息,通过ChanelHandlerContext的write方法异步发送应答消息给客户端。

    在channelReadComplete()方法中调用了ChanelHandlerContext的flush方法,它的作用是将消息发送队列中的消息写入到SocketChannel中发送给对方。从性能角度考虑,为了防止频繁地唤醒Selector进行消息发送,Netty的write方法并不直接将消息写入SocketChannel中,调用write方法只是把待发送的消息放到缓冲数组中,再通过调用flush方法,将发送缓冲区中的消息全部写入到SocketChannel中。

    在exceptionCaught方法中调用了ctx.close()方法,它的作用是当发生异常时关闭ChannelHandlerContext,释放ChannelHandlerContext相关联的句柄等资源。

2. 客户端实现

package com.client;

import io.netty.bootstrap.Bootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelOption;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioSocketChannel;

public class TimeClient {

/**

* 使用 3 个线程模拟三个客户端

*

* @param args

*/

public static void main(String[] args) {

for (int i = 0; i < 3; i++) {

new Thread(new MyThread()).start();

}

}

static class MyThread implements Runnable {

@Override

public void run() {

connect("localhost", 9898);

}

public void connect(String host, int port) {

/** 配置客户端 NIO 线程组/池 */

EventLoopGroup group = new NioEventLoopGroup();

try {

/**

* Bootstrap 与 ServerBootstrap 都继承(extends)于 AbstractBootstrap

* 创建客户端辅助启动类,并对其配置,与服务器稍微不同,这里的 Channel 设置为 NioSocketChannel 然后为其添加

* Handler,这里直接使用匿名内部类,实现 initChannel 方法 作用是当创建 NioSocketChannel

* 成功后,在进行初始化时,将它的ChannelHandler设置到ChannelPipeline中,用于处理网络I/O事件

*/

Bootstrap b = new Bootstrap();

b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)

.handler(new ChannelInitializer<SocketChannel>() {

@Override

public void initChannel(SocketChannel ch) throws Exception {

ch.pipeline().addLast(new TimeClientHandler());

}

});

/** connect:发起异步连接操作,调用同步方法 sync 等待连接成功 */

ChannelFuture channelFuture = b.connect(host, port).sync();

System.out.println(Thread.currentThread().getName() + ",客户端发起异步连接..........");

/** 等待客户端链路关闭 */

channelFuture.channel().closeFuture().sync();

} catch (InterruptedException e) {

e.printStackTrace();

} finally {

/** 优雅退出,释放NIO线程组 */

group.shutdownGracefully();

}

}

}

}

    connect()方法中代码EventLoopGroup group = new NioEventLoopGroup();首先创建客户端处理I/O读写的NioEventLoopGroup线程组,然后继续创建客户端辅助类Bootstrap,随后需要对其进行配置。与服务端不同的是,它的Channel需要设置为NioSocketChannel,然后为其添加handler,此处为了简单直接创建匿名内部类,实现initChannel方法,其作用是当创建NioSocketChannel成功之后,在初始化它的时候将它的ChannelHandler设置到ChannelPipeline中,用于处理网络I/O事件。

    客户端启动辅助类设置完成之后,调用connect方法发起异步连接,然后调用同步方法等待连接成功。最后,当客户端连接关闭之后,客户端主函数退出,在退出之前,释放NIO线程组的资源。

package com.client;

import java.util.logging.Logger;

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.ChannelInboundHandlerAdapter;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {

/**

* 用于对网络事件进行读写操作

*/

private static final Logger logger = Logger.getLogger(TimeClientHandler.class.getName());

/**

* 当客户端和服务端 TCP 链路建立成功之后,Netty 的 NIO 线程会调用 channelActive 方法

*/

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

String reqMsg = "我是客户端 " + Thread.currentThread().getName();

byte[] reqMsgByte = reqMsg.getBytes("UTF-8");

ByteBuf reqByteBuf = Unpooled.buffer(reqMsgByte.length);

/**

* writeBytes:将指定的源数组的数据传输到缓冲区 调用 ChannelHandlerContext 的 writeAndFlush

* 方法将消息发送给服务器

*/

reqByteBuf.writeBytes(reqMsgByte);

ctx.writeAndFlush(reqByteBuf);

}

/**

* 当服务端返回应答消息时,channelRead 方法被调用,从 Netty 的 ByteBuf 中读取并打印应答消息

*/

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

ByteBuf buf = (ByteBuf) msg;

byte[] req = new byte[buf.readableBytes()];

buf.readBytes(req);

String body = new String(req, "UTF-8");

System.out.println(Thread.currentThread().getName() + ",Server return Message:" + body);

ctx.close();

}

/**

    * 当发生异常时,打印异常 日志,释放客户端资源

    */

    @Override

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

        /**释放资源*/

        logger.warning("Unexpected exception from downstream : " + cause.getMessage());

        ctx.close();

    }

}

    这里重点关注三个方法:channelActive, channelRead, 和channelCaught。当客户端和服务端TCP链路建立成功之后,Netty的NIO线程会调用channelActive方法,发送查询指令给服务端,调用channelHandlerContext的writeAndFlush方法将请求消息发送给服务端。

    当服务端返回应答消息时,channelRead方法被调用,会读取并打印消息。当发生异常时,会在exceptionCaught方法中打印出错信息并关系相关资源。

    下图是客户端、服务端交互信息。


学习自:《Netty权威指南》

                                                                                    2019-09-08

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,794评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,050评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,587评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,861评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,901评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,898评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,832评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,617评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,077评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,349评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,483评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,199评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,824评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,442评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,632评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,474评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,393评论 2 352

推荐阅读更多精彩内容