Netty源码分析-ByteToMessageDecoder

ByteToMessageDecoder

ByteToMessageDecoder是一种ChannelInboundHandler,可以称为解码器,负责将byte字节流住(ByteBuf)转换成一种Message,Message是应用可以自己定义的一种Java对象。

例如应用中使用protobuf协议,则可以将byte转换为Protobuf对象。然后交给后面的Handler来处理。

使用示例, 下面这段代码先将收到的数据按照换行符分割成一段一段的,然后将byte转换成String, 再将String转换成int, 然后把int加一后写回。

image.png

ServerBootstrap bootstrap = new ServerBootstrap();
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
    bootstrap.channel(NioServerSocketChannel.class)
            .handler(new LoggingHandler(LogLevel.DEBUG))
            .group(bossGroup, workerGroup)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new LineBasedFrameDecoder(1024))
                            .addLast(new ByteToStringDecoder())
                            .addLast(new StringToIntegerDecoder())
                            .addLast(new IntegerToByteEncoder())
                            .addLast(new IntegerIncHandler());
                }
            });
    ChannelFuture bind = bootstrap.bind(8092);
    bind.sync();
    bind.channel().closeFuture().sync();
} finally {
    bossGroup.shutdownGracefully().sync();
    workerGroup.shutdownGracefully().sync();
}

这里的ChannelPipeline的组织结构是
1. ByteToStringDecoder - 将byte转换成String的Decoder
2. StringToIntegerDecoder - String转换成Integer对象的Decoder
3. IntegerToByteEncoder - Integer转换成byte的Encoder
4. IntegerIncHandler - 将接受到的int加一后返回

ByteToStringDecoder

ByteToStringMessageDecoder继承于ByteToMessageDecoder,并实现了ByteToMessageDecoder的
decode(ChannelHandlerContext ctx, ByteBuf in, java.util.List<java.lang.object style="box-sizing: border-box;"> out)方法。</java.lang.object>
decode方法实现中要求将ByteBuf中的数据进行解码然后将解码后的对象增加到list中

public class ByteToStringDecoder extends ByteToMessageDecoder {
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        byte[] data = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(data);
        list.add(new String(data, StandardCharsets.UTF_8));
    }
}

ByteToStringMessageDecoder继承于ByteToMessageDecoder,并实现了ByteToMessageDecoder的
decode(ChannelHandlerContext ctx, ByteBuf in, java.util.List<java.lang.Object> out)方法。
decode方法实现中要求将ByteBuf中的数据进行解码然后将解码后的对象增加到list中

ByteToMessageDecoder

ByteToMessageDecoder继承了ChannelInboundHandlerAdapter所以是一个处理Inbound事件的Handler。
其内部保存一个Cumulator用于保存待解码的ByteBuf,然后不断调用子类需要实现的抽象方法decode去取出byte数据转换处理。

/**
     * Cumulate {@link ByteBuf}s.
     */
    public interface Cumulator {
        /**
         * Cumulate the given {@link ByteBuf}s and return the {@link ByteBuf} that holds the cumulated bytes.
         * The implementation is responsible to correctly handle the life-cycle of the given {@link ByteBuf}s and so
         * call {@link ByteBuf#release()} if a {@link ByteBuf} is fully consumed.
         */
        ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
    }

Cumulator有两种实现,MERGE_CUMULATOR和COMPOSITE_CMUMULATOR。MERGE_CUMULATOR通过memory copy的方法将in中的数据复制写入到cumulation中。COMPOSITE_CUMULATOR采取的是类似链表的方式,没有进行memory copy, 通过一种CompositeByteBuf来实现,在某些场景下会更适合。默认采用的是MERGE_CUMULATOR。

public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
        @Override
        public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
            final ByteBuf buffer;
            if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
                    || cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
                // Expand cumulation (by replace it) when either there is not more room in the buffer
                // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
                // duplicate().retain() or if its read-only.
                // 如果cumulation是只读的、或者要超过capacity了,或者还有其他地方在引用, 则都通过创建一个新的byteBuf的方式来扩容ByteBuf
                buffer = expandCumulation(alloc, cumulation, in.readableBytes());
            } else {
                buffer = cumulation;
            }
            buffer.writeBytes(in);
            in.release();
            return buffer;
        }
    };

ByteToMessageDecoder中最主要的部分在channelRead处理上

  1. 收到一个msg后先判断是否是ByteBuf类型,是的情况创建一个CodecOutputList(也是一种list)保存转码后的对象列表
  2. 如果cumulation为null则把msg设置为cumulation,否则合并到cumulation里
  3. 调用callDecode方法,尝试解码
  4. finally中如果cumulation已经读完了,就release并置为null等待gc
  5. 调用fireChannelRead将解码后的out传递给后面的Handler
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) {
                    cumulation = data;
                } else {
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Exception e) {
                throw new DecoderException(e);
            } finally {
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++ numReads >= discardAfterReads) {
                    // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                    // See https://github.com/netty/netty/issues/4275
                    numReads = 0;
                    discardSomeReadBytes();
                }

                int size = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                fireChannelRead(ctx, out, size);
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

callDecode中不断执行抽象decode(ctx, in, out)方法直到in可读数据没有减少或当前handler被remove。

 protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            while (in.isReadable()) {
                int outSize = out.size();
                if (outSize > 0) {
                    fireChannelRead(ctx, out, outSize);
                    out.clear();
                   // 检查当前handler是否被remove了
                    if (ctx.isRemoved()) {
                        break;
                    }
                    outSize = 0;
                }

                int oldInputLength = in.readableBytes();
                decodeRemovalReentryProtection(ctx, in, out);
                // 检查当前handler是否被remove了
                if (ctx.isRemoved()) {
                    break;
                }

                if (outSize == out.size()) {
                    if (oldInputLength == in.readableBytes()) {
                        break;
                    } else {
                        continue;
                    }
                }

                if (oldInputLength == in.readableBytes()) { // 这种情况是解码出了对象但是并没有移动in的readIndex
                    throw new DecoderException(
                            StringUtil.simpleClassName(getClass()) +
                                    ".decode() did not read anything but decoded a message.");
                }

                if (isSingleDecode()) {
                    break;
                }
            }
        } ...
    }

fireChannelRead(ctx, msgs, numElements)的处理方式是对每个解码后的消息进行fireChannelRead,交给下一个Handler处理

/**
    static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {
        if (msgs instanceof CodecOutputList) {
            fireChannelRead(ctx, (CodecOutputList) msgs, numElements);
        } else {
            for (int i = 0; i < numElements; i++) {
                ctx.fireChannelRead(msgs.get(i));
            }
        }
    }
    static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
        for (int i = 0; i < numElements; i ++) {
            ctx.fireChannelRead(msgs.getUnsafe(i));
        }
    }

以上就是ByteToMessageDecoder的主要处理部分。关于Netty,面试中会喜欢问道“粘包/拆包”问题,指的是一个消息在网络中是二进制byte流的形式传过去的,接收方如何判断一个消息是否读完、哪里是分割点等,这些可以通过Netty中提供的一些Decoder来实现,例如DelimiterBasedFrameDecoder,FixedLengthFrameDecoder, LengthFieldBasedFrameDecoder。其中最常见的应该是LengthFieldBasedFrameDecoder了,因为自定义的协议中通常会有一个协议头,里面有一个字段描述一个消息的大小长度,然后接收方就能知道消息读到什么时候是读完一个Frame了。这些解码器会在后续的文章中介绍。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,290评论 6 491
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,107评论 2 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,872评论 0 347
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,415评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,453评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,784评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,927评论 3 406
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,691评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,137评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,472评论 2 326
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,622评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,289评论 4 329
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,887评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,741评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,977评论 1 265
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,316评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,490评论 2 348

推荐阅读更多精彩内容