要想了解Netty,我们还是需要看Netty的基础使用
这里使用的netty
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.17.Final</version>
</dependency>
1.客户端
public class NettyClient {
/*IP地址*/
private static final String HOST = "127.0.0.1";
/*端口号*/
private static final int PORT1 = 9091;
public static void main(String[] args) throws Exception {
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
byte[] bab5BBS = BinaryTransferUtils.hex2Bytes("BAB5BB");
Bootstrap b = new Bootstrap();//客户端
ByteBuf buf = Unpooled.copiedBuffer(bab5BBS);
b.group(workGroup)
.channel(NioSocketChannel.class)//客户端 -->NioSocketChannel
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {//handler
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline()
.addLast("delimiter",new DelimiterBasedFrameDecoder( 100000000,false,buf)) //解码分隔符
.addLast("decoder", new ByteArrayDecoder()) //解码器
.addLast("encoder", new StringEncoder()) //编码器
.addLast(new ClientHandler()); //客户端处理器,具体处理细节
}
});
//创建异步连接 可添加多个端口
ChannelFuture cf1 = b.connect(HOST, PORT1).sync();
cf1.channel().closeFuture().sync();
} finally {
workGroup.shutdownGracefully();
}
}
}
//处理器
public class ClientHandler extends SimpleChannelInboundHandler<Object> {
private static final String id= "123";
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
String send = "{'type':'add','appid':'" + id+ "'}";
byte[] data = send.getBytes();
ByteBuf firstMessage = Unpooled.buffer();
firstMessage.writeBytes(data);
ctx.writeAndFlush(firstMessage);
System.out.println("客户端发送消息:" + send);
}
@Override
public void channelRead0(ChannelHandlerContext ctx,Object msg) throws Exception {
System.out.println("接收到客户端 发送消息:"+msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
2.服务端
public class NettyApplication {
private static final Logger logger = LoggerFactory.getLogger(WmledApplication.class);
private static final int PORT = 9091;
public static void main(String[] args) {
EventLoopGroup boosGroup = new NioEventLoopGroup(); //处理连接
EventLoopGroup workerGroup = new NioEventLoopGroup(100); //处理网络IO
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boosGroup, workerGroup)
.channel(NioServerSocketChannel.class)
//option对应nio中 ServerSocketChannel设置的参数
//childOption对应nio中SocketChannel设置的参数
.option(ChannelOption.SO_BACKLOG, 2048) //连接数
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ServerInitializer()) //服务端初始化信息
// 服务器绑定端口监听
ChannelFuture channelFuture = bootstrap.bind(PORT).sync();
logger.info("----netty服务已经启动,端口:" + PORT + "----------");
// 监听服务器关闭监听
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
logger.error("--- netty服务异常 ---", e);
} finally {
boosGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
ServerInitializer 服务器初始化类
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
public void initChannel(SocketChannel arg0){
//ChannelPipeline 可以理解为消息传送通道 通道一旦建立 持续存在
ChannelPipeline channelPipeline = arg0.pipeline();
//为通道添加功能
ByteBuf buf = Unpooled.copiedBuffer("}".getBytes());//自定义拆包字符,用“}”做拆包
channelPipeline.addLast("delimiter",
new DelimiterBasedFrameDecoder(1024,false,buf));
//字符串解码 编码
channelPipeline.addLast("decoder", new StringDecoder());
channelPipeline.addLast("encoder",new ByteArrayEncoder());
//添加自主逻辑,这里我是用spring管理的这个serverhandler
channelPipeline.addLast(SpringUtil.getBean(ServerHandler.class));
}
}
IO事件处理之ServerHandler处理器
public class ServerHandler extends SimpleChannelInboundHandler<String> {
private static final Logger logger = LoggerFactory.getLogger(ServerHandler.class);
@Autowired
private MessageHandler messageHandler;
@Override
protected void channelRead0(ChannelHandlerContext arg0, String json) {
messageHandler.handle(arg0,json); //通过自定义的handler来处理数据
}
/**
* channel被激活时调用
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
logger.info("检测到上线 待加入 【{}】", ctx.channel().remoteAddress());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
logger.info("【{}】机器下线 " +ctx.channel().remoteAddress());
ctx.close();
super.channelInactive(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
logger.info(cause.getMessage() + "---------" + ctx.toString());
}
上面客户端和服务端分别启动起来就可以相互通信了。
3.拆包粘包问题
熟悉tcp就知道我们的业务数据不一定会按照我们想象的作为某一个整体发送,一个完整的包可能会被拆分为几个包发送,接受的时候可能就会出现获取部分包的情况,所以就会出现半包,粘包的情况。
Netty提供多种处理拆包和粘包的的解码器:
- 消息固定长度,消息达到某一个长度就表示读取到了一个完整的长度
-- netty提供方案:FixedLengthFrameDecoder - 将回车换行符("\n","\t\n")作为消息结束符,比如ftp协议
-- netty提供方案:LineBasedFrameDecoder - 某种特殊的分隔符作为结束标志
-- netty提供方案:DelimiterBasedFrameDecoder - 通过在消息头定义长度字段表示消息的总长度
其他一些编码解码:
比如StringDecoder,StringEncoder,ByteArrayEncoder等,在通道处理过程中会自动做编码处理。
4.序列化
序列化:是把对象的状态信息转化为可存储或传输的形式过程,也就是把对象转化为字节序列的过程称为对象的序列化
反序列化:是序列化的逆向过程,把字节数组反序列化为对象,把字节序
列恢复为对象的过程成为对象的反序列化
目前存在多种序列化的方式,比如:
- Java自带序列化:netty提供ObjectEncoder和ObjectDecoder对java做序列化处理
- xml
- json
- hession序列化框架
- Protobuf序列化
netty提供:ProtobufDecoder,ProtobufEncoder,ProtobufVarint32LengthFieldPrepender(某种字节处理,便于分割每一条,后面这个处理这个半包),ProtobufVarint32FrameDecoder(半包处理),
-........
5.相关协议开发
Netty支持Http,Websocket,UDP,自定义协议开发,具体怎么使用可以从网络上获取
6.Netty行业应用
Netty基于高性能的异步通信,常常会用在 rpc通信,大数据,游戏端等场景中;我们常见的比如Dubbo,RocketMQ,大数据中Avro等都有使用Netty做
节点点的通信。