提升能力从学习Netty开始

netty 介绍

一、 Netty 是什么

Netty 是一个广泛使用的 Java 网络编程框架
而Netty就是基于Java NIO技术封装的一套框架
一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持

二、Netty 组成部分

1 、Channel

NIO 基本结构,它代表了一个用于连接到实体如硬件设备、文件、网络套接字活程序组件,能够执行一个活多个不同的I/O 操作(例如读和写)的开放连接
可以把 Channe 想象成一个可以 、打开 、关闭 、连接、断开 、传入、传出 数据的运输工具

2、 Callback

callback(回调)是一个简单的方法,提供给另一种方法作为引用,这个后者就可以在某个合适的时间调用前者
Netty 内部使用回调处理事件时,一旦这样的回调被触发,事件可以有接口ChannelHandler的实现来处理

3、Futre

Futre 提供了另一种在操作完成时通知应用程序的方式 。这个对象可以看作是一个异步操作的结果的占位符;它将在未来的某个时刻完成,并提供对其结果的访问

4、事件和 ChannelHandler

Netty 使用不同的事件来通知我们状态的改变或者是操作的状态。这使得我们能够基于已经
发生的事件来触发适当的动作。这些动作可能是:
记录日志;
数据转换;
流控制;
应用程序逻辑
传统的I/o是阻塞的
Java 用 Selector 实现非阻塞IO
使用了事件通知API以确定在一组非阻塞套接字中有那些已经能够进行IO相关的操作

netty 解决TCP粘包和拆包问题

三 、 什么时粘包和拆包

首先TCP是一个"流"协议,犹如河中水一样连成一片,没有严格的分界线。当我们在发送数据的时候就会出现多发送与少发送问题,也就是TCP粘包与拆包。得不到我们想要的效果。
所谓粘包:当你把A,B两个数据从甲发送到乙,本想A与B单独发送,但是你却把AB一起发送了,此时AB粘在一起,就是粘包了
所谓拆包: 如果发送数据的时候,你把A、B拆成了几份发,就是拆包了。当然数据不是你主动拆的,是TCP流自动拆的

解决办法

1、消息定长,比如把报文消息固定为500字节,不够用空格补位
2、在包尾增加回车换行符进行分割,例如FTP协议
3、将消息分为消息头和消息体,消息头中包含表示消息总长度的字段
4、更复杂的应用层协议

新建maven项目 pom.xml 引入netty相关依赖

<dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>5.0.0.Alpha1</version>
        </dependency>

编写服务端

package com.example.netty;
 
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
 
/**
 * netty服务端
 * NioEventLoopGroup是一个处理I/O操作的多线程事件循环。
 * Netty为不同类型的传输提供了各种EventLoopGroup实现。
 * 我们在这个例子中实现了一个服务器端应用程序,
 * 因此将使用两个NioEventLoopGroup。第一个通常被称为“boss”,接受传入的连接。第二个通常称为“worker”,
 * 在boss接受连接并将接受的连接注册到worker之后,处理接受连接的流量。使用多少线程以及如何将它们映射到创建的通道取决于EventLoopGroup实现,
 * 甚至可以通过构造函数进行配置。
 *
 * ServerBootstrap是一个设置服务器的助手类。您可以直接使用通道设置服务器。但是,请注意这是一个冗长的过程,在大多数情况下您不需要这样做。
 * 在这里,我们指定使用NioServerSocketChannel类,该类用于实例化一个新通道以接受传入连接。
 *
 * 这里指定的处理程序总是由新接受的通道计算。ChannelInitializer是用于帮助用户配置新通道的特殊处理程序。
 * 您很可能希望通过添加一些处理程序(如DiscardServerHandler)来实现您的网络应用程序,来配置新通道的ChannelPipeline。
 * 随着应用程序变得复杂,您可能会向管道中添加更多的处理程序,并最终将这个匿名类提取到顶级类中。
 *
 * 您还可以设置特定于通道实现的参数。我们正在编写TCP/IP服务器,因此我们可以设置套接字选项,如tcpNoDelay和keepAlive。
 * 请参阅ChannelOption的apidocs和特定的ChannelConfig实现,以获得受支持的ChannelOptions的概述。
 *
 * 你注意到option()和childOption()了吗?option()用于接收传入连接的NioServerSocketChannel。
 * childOption()用于父服务器通道接受的通道,在本例中是NioServerSocketChannel。
 * 我们现在准备好出发了。剩下的就是绑定到端口并启动服务器。在这里,我们绑定到机器中所有NICs(网络接口卡)的端口8080。
 * 现在,您可以任意次数地调用bind()方法(使用不同的绑定地址)。
 *
 * telnet 可以测试服务器是否工作
 * telnet localhost 8080
 */
public class NettyServer {
    public static void main(String[] args) throws Exception {
        new NettyServer().run(8080);
    }
    //新建一个netty服务器
    public void run(int port) throws Exception{
        //NioEventLoopGroup是一个处理I/O操作的多线程循环
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        System.out.println("准备运行端口:" + port);
        try{
            //服务器的设置助手类
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap = serverBootstrap.group(bossGroup,workGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    //这里指定的处理程序总是由新接受的通道计算。
                    .childHandler (new ChildChannelHandler());
            //绑定端口,同步等待成功
            ChannelFuture future = serverBootstrap.bind(port).sync();
            //等待服务监听端口关闭
            future.channel().closeFuture().sync();
    } finally {
                //退出,释放线程资源
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}
//ChannelInitializer是用于帮助用户配置新通道的特殊处理程序
class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
 
    //请求到达后调用
    protected void initChannel(SocketChannel socketChannel) throws Exception {
//        ByteBuf byteBuf= Unpooled.copiedBuffer("$".getBytes());
//        socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,byteBuf));
        socketChannel.pipeline().addLast(new StringDecoder());//进行字符串的编解码设置
        socketChannel.pipeline().addLast(new StringEncoder());
        socketChannel.pipeline().addLast(new ReadTimeoutHandler(60));//设置超时时间
        socketChannel.pipeline().addLast(new DiscardServerHandler());
    }
}
 
/**服务器类型设置
 *
 * **/
class DiscardServerHandler extends ChannelHandlerAdapter {
    @Override
    //只要接收到数据,就会调用channelRead()方法
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
 
        try {
            //ByteBuf是一个引用计数的对象,必须通过release()方法显式地释放它
            ByteBuf in = (ByteBuf) msg;
            System.out.println("传输内容是");
            System.out.println(in.toString(CharsetUtil.UTF_8));
            //返回信息
            ByteBuf resp= Unpooled.copiedBuffer("收到信息$".getBytes());
            ctx.writeAndFlush(resp);
        }  finally {
            System.out.println(msg);
            ReferenceCountUtil.release(msg);
        }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 出现异常就关闭
        cause.printStackTrace();
        ctx.close();
    }
 
}

编写客户端

package com.example.netty;
 
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
 
/**
 * netty 编写的客户端
 */
public class TimeClient {
    public static void main(String[] args) throws Exception {
        new TimeClient().connect(8080,"localhost");
    }
    public  void connect(int port, String host)throws Exception{
        //配置客户端
        System.out.println(port+"--"+host);
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ByteBuf byteBuf= Unpooled.copiedBuffer("$".getBytes());
                            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,byteBuf));
                            socketChannel.pipeline().addLast(new TimeClientHandler());
                        }
                    });
            //绑定端口,同步等待成功
            ChannelFuture future = bootstrap.connect(host,port).sync();
            //等待服务监听端口关闭
            future.channel().closeFuture().sync();
        }finally {
            //优雅退出,释放线程资源
            eventLoopGroup.shutdownGracefully();
        }
    }
}
class TimeClientHandler extends ChannelHandlerAdapter {
    private byte[] req;
    public TimeClientHandler(){
        req="$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$".getBytes();
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message=null;
        for(int i=0;i<100;i++){
            message= Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            ByteBuf in = (ByteBuf) msg;
            System.out.println(in.toString(CharsetUtil.UTF_8));
        }  finally {
            ReferenceCountUtil.release(msg);
        }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 出现异常就关闭
        cause.printStackTrace();
        ctx.close();
    }
 
}

感谢你的阅读如果感觉本文对你有所帮助可以点一下喜欢和关注一下,让更多的人可以看到这篇文章谢谢。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,406评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,732评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,711评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,380评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,432评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,301评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,145评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,008评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,443评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,649评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,795评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,501评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,119评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,731评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,865评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,899评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,724评论 2 354

推荐阅读更多精彩内容