编解码器指的是转换两种不同格式的数据,在网络编程中几乎是必不可少的。比如将String转成ByteBuf,将Student转成String等等。
netty中,编码器处理的是出站消息,因而会继承ChannelOutboundHandler,解码器处理的是入站消息,因而会继承CHannelnboundHandler。
编码器的抽象类MessageToMessageDecoder<I>,其中I是要对什么类型的消息编码。解码器的抽象类MessageToMessageDecoder<I>,其中I是要对什么类型的消息进行解码。还有一个抽象类MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>,表示既可以对消息进行编码,也可以进行解码。
在应用程序中,自定义的编码器或者解码器一般直接继承上述三者即可。比如netty内置的StringEncoder是将String类型转换为ByteBuf;StringDecoder是将ByteBuf转换为String。
下面用两个例子来演示下编解码器的处理方式
-
编码器的例子
应用程序将往channel写入一个Student对象,我们先定义一个编码器,将Student转换成String,再用netty内置的StringEncoder,将String转换成ByteBuf,再写入channel。
EmbeddedChannel channel = new EmbeddedChannel(
//记录日志
new LoggingHandler(LogLevel.DEBUG),
//字符串编码器,将String编码成ByteBuf
new StringEncoder(StandardCharsets.UTF_8),
//Student编码器,将Student转成String
new MessageToMessageEncoder<Student>() {
@Override
protected void encode(ChannelHandlerContext ctx, Student msg, List<Object> out) throws Exception {
out.add(new Gson().toJson(msg));
}
}
);
Student student = new Student(10, "zhangsan");
channel.writeOutbound(student);
-
解码器的例子
应用程序从channel接收数据后,先用netty内置的StringDecoder,将ByteBuf转换为String,再用自定义的解码器,将String转换为Student,这样应用程序的其他handler就可以直接对Student对象处理了
EmbeddedChannel channel = new EmbeddedChannel(
new LoggingHandler(LogLevel.DEBUG),
new StringDecoder(StandardCharsets.UTF_8),
new MessageToMessageDecoder<String>() {
@Override
protected void decode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception {
out.add(new Gson().fromJson(msg, Student.class));
}
},
new SimpleChannelInboundHandler<Student>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Student msg) throws Exception {
System.out.println("receive new msg " + msg);
}
}
);
Student student = new Student(10, "zhangsan");
String studentStr = new Gson().toJson(student);
channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes(studentStr.getBytes(StandardCharsets.UTF_8)));
接下来,我们来看下编码器的源码
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
CodecOutputList out = null;
try {
//判断msg的类型编码器是否支持
if (acceptOutboundMessage(msg)) {
out = CodecOutputList.newInstance();
@SuppressWarnings("unchecked")
//类型强转,因为在声明中已经明确指定I类型了
I cast = (I) msg;
try {
//调用具体实现,对消息进行编码,并放在out中
encode(ctx, cast, out);
} finally {
//cast如果是ReferenceType的需要将引用计数-1,因为传递给下一个handler已经不是cast对象了
ReferenceCountUtil.release(cast);
}
if (out.isEmpty()) {
throw new EncoderException(
StringUtil.simpleClassName(this) + " must produce at least one message.");
}
//如果当前msg的类型,编码器不支持对其进行编码,那么直接传递给下一个handler来处理
} else {
ctx.write(msg, promise);
}
} catch (EncoderException e) {
throw e;
} catch (Throwable t) {
throw new EncoderException(t);
} finally {
if (out != null) {
try {
final int sizeMinusOne = out.size() - 1;
if (sizeMinusOne == 0) {
//这里将编码后的对象传递给下一个handler处理
ctx.write(out.getUnsafe(0), promise);
} else if (sizeMinusOne > 0) {
// Check if we can use a voidPromise for our extra writes to reduce GC-Pressure
// See https://github.com/netty/netty/issues/2525
if (promise == ctx.voidPromise()) {
writeVoidPromise(ctx, out);
} else {
writePromiseCombiner(ctx, out, promise);
}
}
} finally {
out.recycle();
}
}
}
}
编码器对out的处理,有个地方不太理解,就是当out元素个数大于1的时候。
下面是解码器的源码
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
CodecOutputList out = CodecOutputList.newInstance();
try {
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I cast = (I) msg;
try {
decode(ctx, cast, out);
} finally {
ReferenceCountUtil.release(cast);
}
} else {
out.add(msg);
}
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
try {
int size = out.size();
for (int i = 0; i < size; i++) {
ctx.fireChannelRead(out.getUnsafe(i));
}
} finally {
out.recycle();
}
}
}
解码器与编码器的逻辑大致相同,先判断要解码的类型当前的解码器是否支持,若支持的话,进行类型的转换。若不支持,直接将msg添加到out结果列表中,最后再对out列表循环,将列表元素传递给下一个handler处理。