An essay is written with words just as a house is built with bricks, but merely a collection of words cannot be called an essay any more than a pile of bricks can be called a house.
Netty客户端与服务端通信
本章来了解下如何通过netty让客户端与服务端进行通信。
服务器端的代码构建与客户端代码构建过程基本类似,可以参见Netty构建服务的基本步骤文章来了解构建过程以及具体说明。
服务器端代码实现:
- Main方法启动服务类
public class MyServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workderGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap()
.group(bossGroup, workderGroup).channel(NioServerSocketChannel.class).childHandler(new MyServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workderGroup.shutdownGracefully();
}
}
}
- 服务端的MyServerInitializer,初始化处理器链
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline channelPipeline = ch.pipeline();
// 往pipeline上添加解码器
channelPipeline.addLast("lengthFieldBasedFrameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
// 添加个编码器
channelPipeline.addLast("lengthFieldPrepender", new LengthFieldPrepender(4));
channelPipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8));
channelPipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));
// 添加自己实现的处理器
channelPipeline.addLast(new MyServerHandler());
}
}
- 服务器端的自定义处理器MyServerHandler
public class MyServerHandler extends SimpleChannelInboundHandler<String> {
@Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("remote address: " + ctx.channel().remoteAddress() + ", from client msg-->" + msg);
ctx.channel().writeAndFlush(msg); // 如果内容比较多,可以多次write,然后最后调用一次flush.
}
// 当出现异常时的方法重写
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
ctx.close(); // 将Channel处理上下文关闭
}
}
以上服务端代码已完成。
客户端代码实现:
- 客户端服务启动类
public class MyClient {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new MyClientInitializer());
ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
channelFuture.channel().closeFuture().sync();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
- 客户端服务MyClientInitializer,初始化客户端处理器链
基本跟服务端所添加的处理器是一样的。
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline channelPipeline = ch.pipeline();
// 往pipeline上添加解码器
channelPipeline.addLast("lengthFieldBasedFrameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
// 添加个编码器
channelPipeline.addLast("lengthFieldPrepender", new LengthFieldPrepender(4));
channelPipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8));
channelPipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));
// 添加自己实现的处理器
channelPipeline.addLast(new MyClientHandler());
}
}
- 客户端自定义处理器实现,将来自客户端请求加上时间发送给服务端。
public class MyClientHandler extends SimpleChannelInboundHandler<String>{
@Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("server remote address: " + ctx.channel().remoteAddress());
System.out.println("server msg-->" + msg);
ctx.writeAndFlush("from client: " + LocalDateTime.now()); // jdk8时间API
}
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush("xyz");
ctx.channel().writeAndFlush("xyz");
super.channelActive(ctx);
}
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
ctx.close();
}
}
最后,我们来启动服务端和客户端,将会看到如下效果:
当有消息连接上服务端后,我们也可以通过lsof -i:8899查看这个端口被占用的进程