- Netty 应用实例:群聊系统。
- 要求:①实现多人群聊;②服务器端:可以监测用户上线,离线,并实现消息转发功能;③客户端:通过 channel 可以无阻塞发送消息给其它所有用户,同时可以接受其它用户发送的消息(有服务器转发得到)。
- GroupChatServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class GroupChatServer {
private int port; //监听端口
public GroupChatServer(int port) {
this.port = port;
}
//编写run方法,处理客户端的请求
public void run() throws Exception {
//创建两个线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLoop
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//获取到pipeline
ChannelPipeline pipeline = ch.pipeline();
//向pipeline加入解码器
pipeline.addLast("decoder", new StringDecoder());
//向pipeline加入编码器
pipeline.addLast("encoder", new StringEncoder());
//加入自己的业务处理handler
pipeline.addLast(new GroupChatServerHandler());
}
});
System.out.println("netty 服务器启动...");
ChannelFuture channelFuture = b.bind(port).sync();
//监听关闭
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new GroupChatServer(7000).run();
}
}
- GroupChatServerHandler.java
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.text.SimpleDateFormat;
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
//public static List<Channel> channels = new ArrayList<Channel>();
//使用一个hashMap 管理
//public static Map<String, Channel> channels = new HashMap<String,Channel>();
//定义一个channel 组,管理所有的channel
//GlobalEventExecutor.INSTANCE 是全局的事件执行器(单例)
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//handlerAdded 表示连接一旦建立,第一个被执行
//将当前channel 加入到 channelGroup
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
//将该客户加入聊天的信息推送给其它在线的客户端
//该方法会将 channelGroup 中所有的channel 都遍历一次,并发送消息,所以我们不需要自己遍历
channelGroup.writeAndFlush("[客户端]:" + channel.remoteAddress() + " 加入聊天:" + sdf.format(new java.util.Date()) + " \n");
channelGroup.add(channel);
}
//断开连接,将xx客户离开信息推送给当前在线的客户
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("[客户端]:" + channel.remoteAddress() + " 离开了\n");
System.out.println("channelGroup size:" + channelGroup.size());
}
//表示channel 处于活动状态,提示 xxx 上线
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " 上线了~");
}
//表示channel 处于不活动状态,提示 xxx离线了
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " 离线了~");
}
//读取数据
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
//获取到当前channel
Channel channel = ctx.channel();
//这时我们遍历channelGroup, 根据不同的情况,回送不同的消息
channelGroup.forEach(ch -> {
//若不是当前的channel,则转发消息
if (channel != ch) {
ch.writeAndFlush("[客户端]:" + channel.remoteAddress() + " 发送了消息:" + msg + "\n");
} else {//回显自己发送的消息给自己
ch.writeAndFlush("[自己]发送了消息:" + msg + "\n");
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//关闭通道
ctx.close();
}
}
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Scanner;
public class GroupChatClient {
//属性
private final String host;
private final int port;
public GroupChatClient(String host, int port) {
this.host = host;
this.port = port;
}
public void run() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//得到pipeline
ChannelPipeline pipeline = ch.pipeline();
//加入相关handler
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
//加入自定义的handler
pipeline.addLast(new GroupChatClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
Channel channel = channelFuture.channel();
System.out.println("-------" + channel.localAddress() + "--------");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String msg = scanner.nextLine();
//通过channel 发送到服务器端
channel.writeAndFlush(msg + "\r\n");
}
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new GroupChatClient("127.0.0.1", 7000).run();
}
}
- GroupChatClientHandler.java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(msg.trim());
}
}
- Netty 应用实例:心跳机制检测。
- 要求:当服务器超过 3 秒没有读时,就提示读空闲;当服务器超过 5 秒没有写操作时,就提示写空闲,实现当服务器超过 7 秒没有读或者写操作时,就提示读写空闲。
- MyServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
public class MyServer {
public static void main(String[] args) throws Exception {
//创建两个线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLoop
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO)); //增加一个日志处理器
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//加入一个netty 提供 IdleStateHandler
/*
1、IdleStateHandler 是netty 提供的处理空闲状态的处理器
2、long readerIdleTime:表示多长时间没有读,就会发送一个心跳检测包检测是否连接
3、long writerIdleTime:表示多长时间没有写,就会发送一个心跳检测包检测是否连接
4、long allIdleTime:表示多长时间没有读写,就会发送一个心跳检测包检测是否连接
5、文档说明:triggers an IdleStateEvent when a Channel has not performed read, write, or both operation for a while.
6、当 IdleStateEvent 触发后,就会传递给管道 的下一个handler去处理
通过调用(触发)下一个handler 的 userEventTriggered,在该方法中去处理 IdleStateEvent(读空闲,写空闲,读写空闲)
*/
pipeline.addLast(new IdleStateHandler(3, 5, 7, TimeUnit.SECONDS));
//加入一个对空闲检测进一步处理的handler(自定义)
pipeline.addLast(new MyServerHandler());
}
});
//启动服务器
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
public class MyServerHandler extends ChannelInboundHandlerAdapter {
/**
* @param ctx 上下文
* @param evt 事件
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
//将 evt 向下转型 IdleStateEvent
IdleStateEvent event = (IdleStateEvent) evt;
String eventType = null;
switch (event.state()) {
case READER_IDLE:
eventType = "读空闲";
break;
case WRITER_IDLE:
eventType = "写空闲";
break;
case ALL_IDLE:
eventType = "读写空闲";
break;
}
System.out.println(ctx.channel().remoteAddress() + "--超时时间--" + eventType);
System.out.println("服务器做相应处理...");
//若发生空闲,则关闭通道
//ctx.channel().close();
}
}
}
- Netty 应用实例:Netty 通过 WebSocket 编程实现服务器和客户端长连接。
- 要求:实现基于 WebSocket 的长连接的全双工地交互,通过改变 Http 协议多次请求的约束,实现了长连接,服务器可以发送消息给浏览器。客户端浏览器和服务器端会相互感知,比如服务器关闭了,浏览器会感知,同样浏览器关闭了,服务器会感知。
- hello.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<script>
var socket;
//判断当前浏览器是否支持websocket
if (window.WebSocket) {
//go on
socket = new WebSocket("ws://localhost:7000/hello");
//相当于channelReado,ev收到服务器端回送的消息
socket.onmessage = function (ev) {
var rt = document.getElementById("responseText");
rt.value = rt.value + "\n" + ev.data;
};
//相当于连接开启(感知到连接开启)
socket.onopen = function (ev) {
var rt = document.getElementById("responseText");
rt.value = "连接开启了.."
};
//相当于连接关闭(感知到连接关闭)
socket.onclose = function (ev) {
var rt = document.getElementById("responseText");
rt.value = rt.value + "\n" + "连接关闭了.."
}
} else {
alert("当前浏览器不支持websocket...")
}
//发送消息到服务器
function send(message) {
if (!window.socket) { //先判断socket是否创建好
return;
}
if (socket.readyState == WebSocket.OPEN) {
//通过socket 发送消息
socket.send(message)
} else {
alert("连接没有开启...");
}
}
</script>
<form onsubmit="return false">
<textarea name="message" style="height: 300px; width: 300px"></textarea>
<input type="button" value="发生消息" onclick="send(this.form.message.value)">
<textarea id="responseText" style="height: 300px; width: 300px"></textarea>
<input type="button" value="清空内容" onclick="document.getElementById('responseText').value=''">
</form>
</body>
</html>
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
public class MyServer {
public static void main(String[] args) throws Exception {
//创建两个线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLoop
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//因为基于http协议,所以要使用http的编码和解码器
pipeline.addLast(new HttpServerCodec());
//数据以块方式写,需要添加ChunkedWriteHandler处理器
pipeline.addLast(new ChunkedWriteHandler());
/*
1、http数据在传输过程中是分段,HttpObjectAggregator,就是可以将多个段聚合
2、这就是为什么当浏览器发送大量数据时,就会发出多次http请求
*/
pipeline.addLast(new HttpObjectAggregator(8192));
/*
1、对应websocket,它的数据是以帧(frame) 形式传递
2、WebSocketFrame 有六个实现子类
3、浏览器请求 ws://localhost:7000/xxx:表示请求的uri
4、WebSocketServerProtocolHandler 核心功能是将http协议升级为ws协议(通过一个状态码 101来切换),保持长连接状态
*/
pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
//自定义的handler,处理业务逻辑
pipeline.addLast(new MyTextWebSocketFrameHandler());
}
});
//启动服务器
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
- MyTextWebSocketFrameHandler.java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import java.time.LocalDateTime;
//这里 TextWebSocketFrame 类型,表示一个文本帧(frame)
public class MyTextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
System.out.println("服务器收到消息:" + msg.text());
//回复消息
ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器时间:" + LocalDateTime.now() + " " + msg.text()));
}
//当web客户端连接后,handlerAdded 方法被触发执行
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
//id 表示唯一的值,LongText 是唯一的,ShortText 不是唯一
System.out.println("handlerAdded 被调用:" + ctx.channel().id().asLongText());
System.out.println("handlerAdded 被调用:" + ctx.channel().id().asShortText());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerRemoved 被调用:" + ctx.channel().id().asLongText());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("异常发生 " + cause.getMessage());
ctx.close(); //关闭连接
}
}
- 编写网络应用程序时,因为数据在网络中传输的都是二进制字节码数据,所以在发送数据时就需要编码,接收数据时就需要解码。
- codec(编解码器)的组成部分有两个:
decoder
(解码器)和 encoder
(编码器)。encoder 负责把业务数据转换成字节码数据,decoder 负责把字节码数据转换成业务数据。
- Netty 提供的编码器:
StringEncoder
:对字符串数据进行编码;ObjectEncoder
:对Java对象进行编码。
- Netty 提供的解码器:
StringDecoder
:对字符串数据进行解码;ObjectDecoder
:对 Java 对象进行解码。
- Netty 本身自带的
ObjectDecoder
和ObjectEncoder
可以用来实现 POJO 对象或各种业务对象的编码和解码,底层使用的仍是Java序列化技术,而Java序列化技术本身效率就不高,存在如下问题:①无法跨语言;②序列化后的体积太大,是二进制编码的5倍多;③序列化性能太低。
-
Protobuf(Google Protocol Buffers)
是一种轻便高效的结构化数据存储格式,可以用于结构化数据串行化(序列化)。它很适合做数据存储或 RPC 数据交换格式(远程过程调用 remote procedure call)。目前很多公司使用的数据交换技术:http + json tcp + protobuf。
- Protobuf 是以
message
的方式来管理数据的,并且支持跨平台、跨语言。使用 protobuf 编译器能自动生成代码,Protobuf 是将类的定义使用.proto
文件进行描述。
- Protobuf 入门案例:要求:客户端可以随机发送
StudentPoJo / WorkerPoJo
对象到服务器,服务端能接StudentPoJo / WorkerPoJo
对象(需要判断是哪种类型),并显示信息(通过 Protobuf 解码)。
- 导入protobuf依赖:
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.13.0</version>
</dependency>
syntax = "proto3";
option optimize_for = SPEED; // 加快解析
option java_package = "com.zzw.netty.codec2"; //指定生成到哪个包下
option java_outer_classname = "MyDataInfo"; // 外部类名, 文件名
//protobuf 可以使用 message 管理其他的message
message MyMessage {
//定义一个枚举类型
enum DataType {
StudentType = 0; //在proto3中要求enum的编号从0开始
WorkerType = 1;
}
//用data_type 来标识传的是哪一个枚举类型
DataType data_type = 1;
//表示每次枚举类型最多只能出现其中的一个,节省空间
oneof dataBody {
Student student = 2;
Worker worker = 3;
}
}
message Student {
int32 id = 1;//Student类的属性
string name = 2;
}
message Worker {
string name = 1;
int32 age = 2;
}
- 下载好protoc编译器,配置好环境变量,然后编译
Student.proto
文件并在当前路径下存放生成的文件MyDataInfo.java
:protoc --java_out=. Student.proto
。
- NettyServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
public class NettyServer {
public static void main(String[] args) throws Exception {
//创建BossGroup 和 WorkerGroup
//说明
//1、创建两个线程组 bossGroup 和 workerGroup
//2、bossGroup 只是处理连接请求,真正的客户端业务处理是会交给 workerGroup 完成
//3、两个都是无限循环
//4、bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数:默认值为 cpu核数 * 2
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个
try {
//创建服务器端的启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
//使用链式编程来进行设置
bootstrap.group(bossGroup, workerGroup) //设置两个线程组
.channel(NioServerSocketChannel.class) //使用 NioSocketChannel 作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG, 128) //设置线程队列等待连接的个数
.childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态
//.handler(null) // 该 handler对应 bossGroup , childHandler 对应 workerGroup
.childHandler(new ChannelInitializer<SocketChannel>() { //创建一个通道初始化对象(匿名对象)
//给 pipeline 设置处理器
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//在pipeline中加入 ProtobufDecoder
//指定对哪种对象进行解码
pipeline.addLast("decoder", new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
pipeline.addLast(new NettyServerHandler()); //给 workerGroup 的 EventLoop 对应的管道设置处理器
}
});
System.out.println(".....服务器 is ready...");
//绑定一个端口并且同步处理,生成了一个 ChannelFuture 对象
//启动服务器(并绑定端口)
ChannelFuture cf = bootstrap.bind(6668).sync();
//给 cf 注册监听器,监控我们关心的事件
//绑定端口是异步操作,当绑定操作处理完,将会调用相应的监听器处理逻辑
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
System.out.println("监听端口 6668 成功");
} else {
System.out.println("监听端口 6668 失败");
}
}
});
//对关闭通道进行监听
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
/**
* 说明:自定义一个Handler 需要继承 netty 规定好的某个HandlerAdapter(规范)
*/
//public class NettyServerHandler extends ChannelInboundHandlerAdapter {
public class NettyServerHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> {
//读取数据实际(这里我们可以读取客户端发送的消息)
/**
* 1、ChannelHandlerContext ctx:上下文对象,含有管道 pipeline,通道channel,地址
* 2、Object msg:客户端发送的数据,默认是 Object
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, MyDataInfo.MyMessage msg) throws Exception {
//根据dataType来显示不同的信息
MyDataInfo.MyMessage.DataType dataType = msg.getDataType();
if (dataType == MyDataInfo.MyMessage.DataType.StudentType) {
MyDataInfo.Student student = msg.getStudent();
System.out.println("学生id:" + student.getId() + ",学生姓名:" + student.getName());
} else if (dataType == MyDataInfo.MyMessage.DataType.WorkerType) {
MyDataInfo.Worker worker = msg.getWorker();
System.out.println("工人的年龄:" + worker.getAge() + ",工人的姓名:" + worker.getName());
} else {
System.out.println("传输的类型不正确!");
}
}
//数据读取完毕
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//writeAndFlush:write + flush
//将数据写入到缓存,并刷新
//一般需要对发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~(>^ω^<)喵1", CharsetUtil.UTF_8));
}
//发生异常时,需要关闭通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
public class NettyClient {
public static void main(String[] args) throws Exception {
//客户端需要一个事件循环组
EventLoopGroup group = new NioEventLoopGroup();
try {
//创建客户端启动对象
//注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
Bootstrap bootstrap = new Bootstrap();
//设置相关参数
bootstrap.group(group) //设置线程组
.channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//在pipeline中加入编码器 ProtobufEncoder()
pipeline.addLast("encoder", new ProtobufEncoder());
//加入自定义的处理器
pipeline.addLast(new NettyClientHandler());
}
});
System.out.println("...客户端 is ok...");
//启动客户端去连接服务器端
//关于ChannelFuture 涉及到netty的异步模型
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
//给关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import java.util.Random;
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
//当通道就绪就会触发该方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//随机地发送 Student 或者 Worker 对象
int num = new Random().nextInt(3);
MyDataInfo.MyMessage myMessage = null;
if (0 == num) {
myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.StudentType)
.setStudent(MyDataInfo.Student.newBuilder().setId(5).setName("玉麒麟 卢俊义").build()).build();
} else {
//发送一个Worker 对象
myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.WorkerType)
.setWorker(MyDataInfo.Worker.newBuilder().setAge(20).setName("老李").build()).build();
}
ctx.writeAndFlush(myMessage);
}
//当通道有读取事件时,会触发
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器的地址:" + ctx.channel().remoteAddress());
}
//发生异常时,需要关闭通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
- Netty 的主要组件有 Channel、EventLoop、ChannelFuture、ChannelHandler、ChannelPipe 等。
- ChannelHandler 充当了处理入站和出站数据的应用程序逻辑的容器。例如,实现 ChannelInboundHandler 接口(或 ChannelInboundHandlerAdapter),就可以接收入站事件和数据,这些数据会被业务逻辑处理。当要给客户端发送响应时,也可以从 ChannelInboundHandler 冲刷数据。业务逻辑通常写在一个或者多个 ChannelInboundHandler 中。ChannelOutboundHandler 原理一样,只不过它是用来处理出站数据的。
- ChannelPipeline 提供了 ChannelHandler 链的容器。以客户端应用程序为例,若事件的运动方向是从客户端到服务端,则称这些事件为出站的,即客户端发送给服务端的数据会通过 pipeline 中的一系列 ChannelOutboundHandler,并被这些 Handler 处理,反之则称为入站的。
- 当 Netty 发送或者接受一个消息时,就将会发生一次数据转换。入站消息会被解码:从字节转换为另一种格式(比如 java 对象);若是出站消息,则会被编码成字节。
- Netty 提供一系列实用的编解码器,它们都实现了 ChannelInboundHadnler 或ChannelOutboundHandler 接口。在这些类中,channelRead 方法已经被重写了。以入站为例,对于每个从入站 Channel 读取的消息,这个方法会被调用。随后,它将调用由解码器所提供的 decode() 方法进行解码,并将已经解码的字节转发给 ChannelPipeline 中的下一个 ChannelInboundHandler。
- 由于不可能知道远程节点是否会一次性发送一个完整的信息,tcp 有可能出现粘包拆包的问题,这个类会对入站数据进行缓冲,直到它准备好被处理为止。
关于 ByteToMessageDecoder 实例分析
- Netty 应用实例:使用自定义的编码器和解码器来说明 Netty 的 handler 链调用机制,要求客户端发送 long类型的数据给服务器,同时服务端也发送 long 类型的数据给客户端。
- MyServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class MyServer {
public static void main(String[] args) throws Exception {
//创建两个线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLoop
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
//自定义初始化类
serverBootstrap.childHandler(new MyServerInitializer());
//启动服务器
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//注意编码解码操作必须在handler操作前
//入站的handler进行解码
pipeline.addLast(new MyByteToLongDecoder());
//出站的handler进行编码
pipeline.addLast(new MyLongToByteEncoder());
//自定义一个handler来处理业务逻辑
pipeline.addLast(new MyServerHandler());
}
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class MyServerHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println("从客户端:" + ctx.channel().remoteAddress() + " 读到的数据为:" + msg);
//给客户端发送一个long类型的数据
ctx.writeAndFlush(98765L);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
public class MyByteToLongDecoder extends ByteToMessageDecoder {
/**
* decode 会被调用多次,直到确定没有新的元素被添加到 list 集合为止 或者 ByteBuf 没有更多可读的字节为止
* 若 list out不为空,则会将 list 的内容传递给下一个 channelinboundhandler 处理,该处理器的方法也会被调用多次
* @param ctx 上下文对象
* @param in 入站的 ByteBuf
* @param out 将解码后的数据传给下一个 handler
* @throws Exception
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyByteToLongDecoder decode 被调用");
if (in.readableBytes() >= 8) {
out.add(in.readLong());
}
}
}
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
public class MyClient {
public static void main(String[] args) throws Exception {
//客户端需要一个事件循环组
EventLoopGroup group = new NioEventLoopGroup();
try {
//创建客户端启动对象
//注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
Bootstrap bootstrap = new Bootstrap();
//设置相关参数
bootstrap.group(group) //设置线程组
.channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射)
.handler(new MyClientInitializer());
System.out.println("...客户端 is ok...");
//启动客户端去连接服务器端
//关于ChannelFuture 涉及到netty的异步模型
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 7000).sync();
//给关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//注意:顺序不能相反
//加入一个出站的handler,对数据进行编码
pipeline.addLast(new MyLongToByteEncoder());
//加入一个入站的handler,对数据进行解码
pipeline.addLast(new MyByteToLongDecoder());
//加入一个自定义的handler来处理业务
pipeline.addLast(new MyClientHandler());
}
}
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {
@Override
protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
System.out.println("MyLongToByteEncoder encode 被调用");
System.out.println("msg=" + msg);
out.writeLong(msg);
}
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class MyClientHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println("服务器的ip:" + ctx.channel().remoteAddress());
System.out.println("收到服务器的消息:" + msg);
}
//发送数据
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("MyClientHandler 发送数据");
//1、"abcdabcdabcdabcd" 是 16个字节
//2、该处理器的前一个handler 是 MyLongToByteEncoder
//3、MyLongToByteEncoder 的父类:MessageToByteEncoder
//4、父类 MessageToByteEncoder
/*
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try { //判断当前msg 是否为处理的类型,若是则处理,否则跳过自定义的 encode 过程
if (this.acceptOutboundMessage(msg)) {
I cast = msg;
buf = this.allocateBuffer(ctx, msg, this.preferDirect);
try {
this.encode(ctx, cast, buf);
} finally {
ReferenceCountUtil.release(msg);
}
if (buf.isReadable()) {
ctx.write(buf, promise);
} else {
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {
ctx.write(msg, promise);
}
} catch (EncoderException var17) {
throw var17;
} catch (Throwable var18) {
throw new EncoderException(var18);
} finally {
if (buf != null) {
buf.release();
}
}
}
5、因此,我们编写 Encoder 是要注意传入的数据类型和处理的数据类型一致
*/
// ctx.writeAndFlush(Unpooled.copiedBuffer("abcdabcdabcdabcd",CharsetUtil.UTF_8));
ctx.writeAndFlush(1234567L);
}
}
- 总结:①不论解码器 handler 还是编码器 handler 即接收的消息类型必须与待处理的消息类型必须一致,否则该 handler 不会被执行;②在解码器进行数据解码时,需要判断缓存区(ByteBuf)的数据是否足够,否则接收到的结果和期望结果可能不一致。
-
ReplayingDecoder
解码器(public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
)扩展了ByteToMessageDecoder
类,使用了这个类,我们就不必调用readableBytes()
方法。其中,参数 S 指定了用户状态管理的类型,Void
值代表不需要状态管理。
- 应用实例:使用
ReplayingDecoder
编写解码器对上一个案例进行简化。
- MyByteToLongDecoder2.java
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.List;
public class MyByteToLongDecoder2 extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyByteToLongDecoder2 被调用");
//在 ReplayingDecoder 不需要判断数据是否足够读取,内部会进行处理判断
out.add(in.readLong());
}
}
- 虽然使用
ReplayingDecoder
很方便,但它也有一些局限性:
- 并不是所有的 ByteBuf 操作都被支持,若调用了一个不被支持的方法,则会抛出一个
UnsupportedOperationException
。
-
ReplayingDecoder
在某些情况下可能稍慢于ByteToMessageDecoder
,例如网络缓慢并且消息格式复杂时,消息会被拆成了多个碎片,速度变慢。
- 其它的编解码器:
-
LineBasedFrameDecoder
:这个类在 Netty 内部也有使用,它使用行尾控制字符(\n
或者\r\n
)作为分隔符来解析数据。
-
DelimiterBasedFrameDecoder
:使用自定义的特殊字符作为消息的分隔符。
-
HttpObjectDecoder
:一个 HTTP 数据的解码器。
-
LengthFieldBasedFrameDecoder
:通过指定长度来标识整包消息,这样就可以自动地处理黏包和半包消息。
- TCP 是面向连接的,面向流的,提供高可靠性的服务。收发两端(客户端和服务器端)都要有一一成对的 socket。因此,发送端为了将多个发给接收端的包更有效地发给对方,使用了优化方法(
Nagle
算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。这样做虽然提高了效率,但是接收端难于分辨出完整的数据包了,因为面向流的通信是无消息保护边界的。
- 由于 TCP 无消息保护边界,需要在接收端处理消息边界问题,也就是我们所说的粘包、拆包问题。
- 对上图的说明:假设客户端分别发送了两个数据包 D1 和 D2 给服务端,由于服务端一次读取到字节数是不确定的,故可能存在以下四种情况:
- 服务端分两次读取到了两个独立的数据包,分别是 D1 和 D2,没有粘包和拆包。
- 服务端一次接受到了两个数据包,D1 和 D2 粘合在一起,称之为
TCP 粘包
。
- 服务端分两次读取到了数据包,第一次读取到了完整的 D1 包和 D2 包的部分内容,第二次读取到了 D2 包的剩余内容,称之为
TCP 拆包
。
- 服务端分两次读取到了数据包,第一次读取到了 D1 包的部分内容 D1_1,第二次读取到了 D1 包的剩余部分内容 D1_2 和完整的 D2 包,也称之为
TCP 拆包
。
- Netty应用实例:TCP 粘包和拆包现象实例。
- MyServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class MyServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new MyServerInitializer()); //自定义一个初始化类
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MyServerHandler());
}
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
public class MyServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
private int count;
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//cause.printStackTrace();
ctx.close();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
byte[] buffer = new byte[msg.readableBytes()];
msg.readBytes(buffer);
//将buffer转成字符串
String message = new String(buffer, StandardCharsets.UTF_8);
System.out.println("服务器接收到数据:" + message);
System.out.println("服务器接收到消息量为:" + (++this.count));
//服务器回送数据给客户端,回送一个随机id
ByteBuf responseByteBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString() + " ", StandardCharsets.UTF_8);
ctx.writeAndFlush(responseByteBuf);
}
}
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
public class MyClient {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.handler(new MyClientInitializer()); //自定义一个初始化类
ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MyClientHandler());
}
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.nio.charset.StandardCharsets;
public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private int count;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//使用客户端发送10条数据 hello,server + 编号
for (int i = 0; i < 10; ++i) {
ByteBuf buffer = Unpooled.copiedBuffer("hello,server + " + i, StandardCharsets.UTF_8);
ctx.writeAndFlush(buffer);
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
byte[] buffer = new byte[msg.readableBytes()];
msg.readBytes(buffer);
String message = new String(buffer, StandardCharsets.UTF_8);
System.out.println("客户端接收到消息:" + message);
System.out.println("客户端接收消息数量为:" + (++this.count));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
- TCP 粘包和拆包解决方案:使用自定义协议包+编解码器来解决,关键就是要解决服务器端每次读取数据长度的问题,一旦解决了,就不会出现服务器多读或少读数据的问题,从而避免的 TCP 粘包、拆包。
- Netty 应用实例:解决TCP 粘包、拆包问题。
- 要求:客户端一共发送 5 个
Message
对象,每发送一个Message
对象,服务器端就接收一个 Message,将其解码并回复一个 Message 对象给客户端。
//协议包
public class MessageProtocol {
private int len; //关键
private byte[] content;
public int getLen() {
return len;
}
public void setLen(int len) {
this.len = len;
}
public byte[] getContent() {
return content;
}
public void setContent(byte[] content) {
this.content = content;
}
}
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {
@Override
protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
System.out.println("MyMessageEncoder encode 方法被调用");
out.writeInt(msg.getLen());
out.writeBytes(msg.getContent());
}
}
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.List;
public class MyMessageDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyMessageDecoder decode 被调用");
//需要将得到二进制字节码转成 MessageProtocol 数据包(对象)
int length = in.readInt();
byte[] content = new byte[length];
in.readBytes(content);
//封装成 MessageProtocol 对象,放入 out, 传递下一个handler业务处理
MessageProtocol messageProtocol = new MessageProtocol();
messageProtocol.setLen(length);
messageProtocol.setContent(content);
out.add(messageProtocol);
}
}
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class MyServer {
public static void main(String[] args) throws Exception{
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).childHandler(new MyServerInitializer()); //自定义一个初始化类
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MyMessageDecoder());//解码器
pipeline.addLast(new MyMessageEncoder());//编码器
pipeline.addLast(new MyServerHandler());
}
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
//处理业务的handler
public class MyServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {
private int count;
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//cause.printStackTrace();
ctx.close();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
//接收到数据,并处理
int len = msg.getLen();
byte[] content = msg.getContent();
System.out.println();
System.out.println("服务器接收到信息如下:");
System.out.println("长度=" + len);
System.out.println("内容=" + new String(content, StandardCharsets.UTF_8));
System.out.println("服务器接收到消息包数量=" + (++this.count));
//回复消息
String responseContent = UUID.randomUUID().toString();
int responseLen = responseContent.getBytes(StandardCharsets.UTF_8).length;
byte[] responseContent2 = responseContent.getBytes(StandardCharsets.UTF_8);
//构建一个协议包
MessageProtocol messageProtocol = new MessageProtocol();
messageProtocol.setLen(responseLen);
messageProtocol.setContent(responseContent2);
ctx.writeAndFlush(messageProtocol);
}
}
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
public class MyClient {
public static void main(String[] args) throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.handler(new MyClientInitializer()); //自定义一个初始化类
ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
channelFuture.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
}
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MyMessageEncoder()); //加入编码器
pipeline.addLast(new MyMessageDecoder()); //加入解码器
pipeline.addLast(new MyClientHandler());
}
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.nio.charset.StandardCharsets;
public class MyClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {
private int count;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//使用客户端发送10条数据 "今天天气冷,吃火锅" 编号
for (int i = 0; i < 5; i++) {
String mes = "今天天气冷,吃火锅!";
byte[] content = mes.getBytes(StandardCharsets.UTF_8);
int length = content.length;
//创建协议包对象
MessageProtocol messageProtocol = new MessageProtocol();
messageProtocol.setLen(length);
messageProtocol.setContent(content);
ctx.writeAndFlush(messageProtocol);
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
int len = msg.getLen();
byte[] content = msg.getContent();
System.out.println("客户端接收到消息如下:");
System.out.println("长度=" + len);
System.out.println("内容=" + new String(content, StandardCharsets.UTF_8));
System.out.println("客户端接收消息数量=" + (++this.count));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("异常消息:" + cause.getMessage());
ctx.close();
}
}