Netty-TCP拆包/粘包

Netty-TCP拆包/粘包

TCP拆包/粘包

TCP 是一个面向字节流的协议,它是性质是流式的,所以它并没有分段。就像水流一样,你没法知道什么时候开始,什么时候结束。所以它会根据当前的套接字缓冲区的情况进行拆包或是粘包

粘包问题图示:

图1.png

客户端发送两个数据包D1&D2给服务端,因为服务端一次读取的字节数是不确定的,所以可能出现:

  • 正常情况,服务端分两次读取到了两个独立的数据包
  • 服务端一次收到两个数据包,两个粘合在了一起,出现粘包现象

  • 服务端分两次读取到了两个数据包,第一次读取到完整的D1包&部分D2包的内容,第二次读取到了D2剩余内容,出现拆包现象

  • 服务端分两次读取到了两个数据包,第一次读取D1的部分,第二次读取了D1剩余内容以及完整D2

TCP拆包/粘包发送原因

图示:

图2.png

三个原因:

  • 应用程序write写入的字节大小大于套接口发送缓冲区的大小

  • 进行MSS大小的TCP分段

  • 以太网帧的payload大于MTU进行IP分片

例子

未考虑TCP粘包的情况


public class Client {

    public void connect(String host, int port) throws InterruptedException {

        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
//                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {

                            ChannelPipeline p = ch.pipeline();
//                            p.addLast(new LineBasedFrameDecoder(1024));
//                            p.addLast(new StringDecoder());
//                            p.addLast(new StringEncoder());

                            p.addLast(new ClientHandler());
                        }
                    });

            ChannelFuture future = b.connect(host, port).sync();

            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {

        new Client().connect("localhost",9988);
    }


}


public class ClientHandler extends ChannelInboundHandlerAdapter {

//    private Logger logger = LoggerFactory.getLogger(getClass());

    private int count =0;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Send the message to Server
        ByteBuf buf = null;
        for(int i=0; i<100; i++){

            String msg = "hello from client "+i + "\n";
            byte[] r = msg.getBytes();
            buf = Unpooled.buffer(r.length);
            buf.writeBytes(r);
           ctx.writeAndFlush(buf);
//            System.out.println("client send message:{}   " + msg);


//            ctx.writeAndFlush(msg+System.getProperty("line.separator"));
        }

        System.out.println("out");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        String body = (String) msg;
        count++;
        System.out.println("client read msg:{}, count:{}   " + body + "    " + count);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}


public class Server {

//    private Logger logger = LoggerFactory.getLogger(getClass());

    public void bind(int port) throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
//                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {

                            ChannelPipeline p = ch.pipeline();
//                            p.addLast(new LineBasedFrameDecoder(1024));
//                            p.addLast(new StringDecoder());
//                            p.addLast(new StringEncoder());

                            p.addLast(new ServerHandler());
                        }
                    });

            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)

            System.out.println("server bind port:{}    "+ + port);

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();

        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {

        new Server().bind(9988);
    }
}


public class ServerHandler extends ChannelInboundHandlerAdapter {
//    private Logger logger = LoggerFactory.getLogger(getClass());

    private int count = 0;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
//        System.out.println("len  " + req.length);
        buf.readBytes(req);
        String body = new String(req, "UTF-8");

        System.out.println("============package=====================");
        System.out.println(body);
        System.out.println("============package=====================");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}






运行输出:


============package=====================
hello from client 0
hello from client 1
hello from client 2
hello from client 3
hello from client 4
hello from client 5
hello from client 6
hello from client 7
hello from client 8
hello from client 9
hello from client 10
hello from client 11
hello from client 12
hello from client 13
hello from client 14
hello from client 15
hello from client 16
hello from client 17
hello from client 18
hello from client 19
hello from client 20
hello from client 21
hello from client 22
hello from client 23
hello from client 24
hello from client 25
hello from client 26
hello from client 27
hello from client 28
hello from client 29
hello from client 30
hello from client 31
hello from client 32
hello from client 33
hello from client 34
hello from client 35
hello from client 36
hello from client 37
hello from client 38
hello from client 39
hello from client 40
hello from client 41
hello from client 42
hello from client 43
hello from client 44
hello from client 45
hello from client 46
hello from client 47
hello from client 48
hello
============package=====================
============package=====================
 from client 49
hello from client 50
hello from client 51
hello from client 52
hello from client 53
hello from client 54
hello from client 55
hello from client 56
hello from client 57
hello from client 58
hello from client 59
hello from client 60
hello from client 61
hello from client 62
hello from client 63
hello from client 64
hello from client 65
hello from client 66
hello from client 67
hello from client 68
hello from client 69
hello from client 70
hello from client 71
hello from client 72
hello from client 73
hello from client 74
hello from client 75
hello from client 76
hello from client 77
hello from client 78
hello from client 79
hello from client 80
hello from client 81
hello from client 82
hello from client 83
hello from client 84
hello from client 85
hello from client 86
hello from client 87
hello from client 88
hello from client 89
hello from client 90
hello from client 91
hello from client 92
hello from client 93
hello from client 94
hello from client 95
hello from client 96
hello from client 97
hello from client 98
hello from client 99

============package=====================

客户端发送的100条消息被当成了两个数据包进行处理,说明发送了粘包现象

使用LineBasedFrameDecoder + StringDecoder 解决问题

LineBasedFrameDecoder

文档:

public class LineBasedFrameDecoder
extends ByteToMessageDecoder

在行尾拆分接收到的ByteBuf的解码器,“ \ n”和“ \ r \ n”都被处理,字节流应采用UTF-8字符编码或ASCII。 当前实现使用直接字节进行字符转换,然后将该字符与一些低范围的ASCII字符(例如'\ n'或'\ r')进行比较。 UTF-8没有将低范围[0..0x7F]字节值用于多字节代码点表示,因此此实现完全支持。

LineBasedFrameDecoder 的工作原理是它依次遍历 ByteBuf 中的可读字节,判断看是否有 "\n” 或者 "\r\n”,如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。它是以回车换行符为结束标记的解码器,支持配置单行的最大长度,如果连续读取到最大长度后仍然没有发现换行符,会抛出异常,同时忽略掉之前读取到的异常码流。

StringDecoder

public class StringDecoder
extends MessageToMessageDecoder<ByteBuf>

将收到的ByteBuf解码为字符串。 请注意,如果使用的是基于流的传输方式(例如TCP / IP),则此解码器必须与适当的ByteToMessageDecoder(例如DelimiterBasedFrameDecoder或LineBasedFrameDecoder)一起使用。 TCP / IP套接字中基于文本的线路协议的典型设置为:

 ChannelPipeline pipeline = ...;

 // Decoders
 pipeline.addLast("frameDecoder", new LineBasedFrameDecoder(80));
 pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));

 // Encoder
 pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8));
 
and then you can use a String instead of a ByteBuf as a message:
 void channelRead(ChannelHandlerContext ctx, String msg) {
     ch.write("Did you say '" + msg + "'?\n");
 }

StringEncoder

public class StringEncoder
extends MessageToMessageEncoder<java.lang.CharSequence>

将请求的字符串编码为ByteBuf。 TCP / IP套接字中基于文本的线路协议的典型设置为:

 ChannelPipeline pipeline = ...;

 // Decoders
 pipeline.addLast("frameDecoder", new LineBasedFrameDecoder(80));
 pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));

 // Encoder
 pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8));
 
and then you can use a String instead of a ByteBuf as a message:
 void channelRead(ChannelHandlerContext ctx, String msg) {
     ch.write("Did you say '" + msg + "'?\n");
 }

应用:

public class Server {

//    private Logger logger = LoggerFactory.getLogger(getClass());

    public void bind(int port) throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
//                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {

                            ChannelPipeline p = ch.pipeline();
                            p.addLast(new LineBasedFrameDecoder(1024));
                            p.addLast(new StringDecoder());
//                            p.addLast(new StringEncoder());

                            p.addLast(new ServerHandler());
                        }
                    });

            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)

            System.out.println("server bind port:{}    "+ + port);

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();

        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {

        new Server().bind(9988);
    }
}

public class ServerHandler extends ChannelInboundHandlerAdapter {
//    private Logger logger = LoggerFactory.getLogger(getClass());

    private int count = 0;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        String body = (String) msg;
        System.out.println("");
        System.out.println("============package=====================");
        System.out.println(body);
        System.out.println("============package=====================");
        System.out.println("");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}


public class ClientHandler extends ChannelInboundHandlerAdapter {

    private int count =0;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Send the message to Server
        ByteBuf buf = null;
        for(int i=0; i<100; i++){

            String msg = "hello from client "+i + "\n";
            byte[] r = msg.getBytes();
            buf = Unpooled.buffer(r.length);
            buf.writeBytes(r);
           ctx.writeAndFlush(buf);
        }

    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        String body = (String) msg;
        count++;
        System.out.println("client read msg:{}, count:{}   " + body + "    " + count);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}



public class Client {

    public void connect(String host, int port) throws InterruptedException {

        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
//                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {

                            ChannelPipeline p = ch.pipeline();
                            p.addLast(new LineBasedFrameDecoder(1024));
                            p.addLast(new StringDecoder());
//                            p.addLast(new StringEncoder());

                            p.addLast(new ClientHandler());
                        }
                    });

            ChannelFuture future = b.connect(host, port).sync();

            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {

        new Client().connect("localhost",9988);
    }


}



©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,651评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,468评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,931评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,218评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,234评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,198评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,084评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,926评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,341评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,563评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,731评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,430评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,036评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,676评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,829评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,743评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,629评论 2 354

推荐阅读更多精彩内容