介绍
protobuf是google的一款跨平台、高效、可扩展的序列化框架。
netty是由JBOSS开源的一款java网络通信框架。netty是异步的、基于事件驱动架构网络通讯框架。目前很多的开源框架中都是用netty来进行通信,如:
- hadoop用netty来实现rpc通信,以传输数据、心跳检测等。
- spark用netty来进行网络通讯(在1.3中,使用的是akka,1.6中使用的是akka与netty并存,2.0以后只存在netty)。
基本上只要是涉及到java网络通讯的,基本上都用到了netty,可见其netty的用处之广。
编解码器
netty中存在很多编解码器,Protobuf编解码器就是其中之一,我们只需要定义自己的处理逻辑即可。
netty使用protobuf
在netty中使用protobuf需要安装以下步骤进行:
- 下载protobuf:windows版本、linux版本,这里只下载windows版本的protobuf。
- windows需要将protobuf src下的protoc.exe添加到环境变量中,将protobuf的sec目录设置到path目录即可,不需要protoc.exe文件名。
- 定义*.protoc文件,如下:
package tutorial;
option java_package = "com.example.tutorial";// java包名
option java_outer_classname = "AddressBookProtos";// java的外部类名
message Person{// 类
required string name = 1;// 类的属性,required为必须
required int32 id = 2;
optional string email = 3;// optional为可选
enum PhoneType{// 枚举
MOBILE = 0;
HOME = 1;
WORK = 2;
}
message PhoneNumber{
required string number = 1;
optional PhoneType type = 2[default = HOME];// 默认值
}
optional PhoneNumber phone = 4;
}
- 使用protoc.exe命令生成java类文件:protoc --java_out=../java person.protoc
--java_out: 生成的java类的输出目录,最后定义的protoc文件的目录。
- 创建序列化对象,使用Proptbuf的newBuilder().build()进行构建待序列化的对象,如:
Person person = Person.newBuilder()
.setId(100)// 上面定义的属性
.setName("zhagnsan")// 上面定义的属性
// 定义的属性为另一个对象,也是使用build进行构建
.setPhone(
PhoneNumber.newBuilder()
.setNumber("555-4321")
.setType(PhoneType.HOME)// 上面定义的枚举
.build()
)
.build();
- 创建服务端,并添加protobuf编解码器
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
try {
bootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
// 日志级别
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<Channel>(){
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// protobuf的解码器,需要添加带系列化的类的实例
pipeline.addLast(new ProtobufDecoder(AddressBookProtos.Person.getDefaultInstance()));
// protobuf编码器
pipeline.addLast(new ProtobufEncoder());
// 自己的逻辑,处理序列化前后的Handler
pipeline.addLast(new ProtoServerHandler());
}
});
ChannelFuture future = bootstrap.bind(9090).sync();
future.channel().closeFuture().sync();
} finally{
bossGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
Server Handler
netty新版的输入和输出都是继承自ChannelHandlerAdapter,而旧版的输入和输出是分开的,输入ChannelInboundHandler,输出ChannelOutboundHandler,下面使用新版:
public class ProtoServerHandler extends ChannelHandlerAdapter {
// 有数据可以读取时调用的方法
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
AddressBookProtos.Person person = (AddressBookProtos.Person) msg;
System.out.println("client request: " + person);
}
// 数据读取完成后调用的方法
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
AddressBookProtos.Person person = AddressBookProtos.Person.newBuilder()
.setId(10).setName("lisi")
.setEmail("lisi@qq.com")
.setPhone(AddressBookProtos.Person.PhoneNumber.newBuilder()
.setNumber("8888888")
.setType(AddressBookProtos.Person.PhoneType.WORK)
.build())
.build();
ctx.writeAndFlush(person);
}
}
- 创建netty客户端(其实并不一定需要用netty来创建客户端),和服务器端一样,netty也内置了编解码器,也只需要传入序列化的类的示例
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
try{
bootstrap.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addFirst(new LoggingHandler(LogLevel.INFO));
// 解码,添加需要解码的类
pipeline.addLast(new ProtobufDecoder(AddressBookProtos.Person.getDefaultInstance()));
pipeline.addLast(new ProtobufEncoder());
// 自定义的逻辑,自己写的Handler
pipeline.addLast(new ProtoClientHandler());
}
});
ChannelFuture future = bootstrap.connect("localhost", 9090);
future.channel().closeFuture().sync();
}finally{
group.shutdownGracefully();
}
Client Handler
public class ProtoClientHandler extends ChannelHandlerAdapter {
// 链接成功后调用(三次握手完成)
public void channelActive(ChannelHandlerContext ctx) throws Exception {
AddressBookProtos.Person person = AddressBookProtos.Person.newBuilder()
.setId(100).setName("zhangsan")
.setEmail("zhangsan@qq.com")
.setPhone(AddressBookProtos.Person.PhoneNumber.newBuilder()
.setNumber("555-4321")
.setType(AddressBookProtos.Person.PhoneType.HOME)
.build())
.build();
ctx.writeAndFlush(person);
}
// 有数据可读时调用
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
AddressBookProtos.Person person = (AddressBookProtos.Person) msg;
System.out.println("server response:" + person);
}
// 数据读取完成后调用
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("read complete...");
super.channelReadComplete(ctx);
}
}
- 最后测试时,必须先启动服务器,在启动客户端,即可看到效果。