Netty实战六:Netty处理同一个端口上来的多条不同协议的数据—优化版

在实战三中,我们处理了同一个端口上来的2种不同协议的数据,项目上线后,运行良好,之后项目又需要添加一种数据协议,按照同样的方法处理再上线后,发现在网络很差的情况下,会有数据丢包现象。
为了更加通用,针对项目进行了重构,对于netty处理也增加了不少优化。

优化点:

  • 使用工厂模式,这样的话,就不需要好几个decoder和hander;
  • 通过创建多个nettyserver实例(监听不同端口),达到隔离不同数据协议;
  • 在decoder中存储socketChannel和协议的对应关系(HashMap),这样在handler中就可以通过channel来获取到协议类型,然后通过协议类型来创建工厂,通过工厂来处理具体数据;
  • 粘包/拆包处理,之前的处理方式因为对Netty byteBuf认识不足,所以在处理粘包时可能会对数据;

重构之后,过两天就会上线,现在我们总共支持4种不同的数据协议(四种不同厂家的设备),就算还要继续增加,项目结构上也可以很快处理完成。

1、Demo

1、NettyServer.class

package org.xxx.android.netty.server;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * Created by zhangkai on 2018/6/11.
 * NioEventLoopGroup → EpollEventLoopGroup
   NioEventLoop → EpollEventLoop
   NioServerSocketChannel → EpollServerSocketChannel
   NioSocketChannel → EpollSocketChannel
   @Component
 */
public class NettyServer{
    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    EventLoopGroup boss =null;
    EventLoopGroup worker =null;
    ChannelFuture future = null;
    //厂商编码
    Integer factoryCode=null;

    boolean epoll=true;
    int port;
    public NettyServer(Integer fc,int port){
        this.factoryCode=fc;
        this.port=port;
    }

    @PreDestroy
    public void stop(){
        if(future!=null){
            future.channel().close().addListener(ChannelFutureListener.CLOSE);
            future.awaitUninterruptibly();
            boss.shutdownGracefully();
            worker.shutdownGracefully();
            future=null;
            logger.info(" 服务关闭 ");
        }
    }
    public void start(){
        logger.info(" nettyServer 正在启动");
        
        if(epoll){
            logger.info(" nettyServer 使用epoll模式");
            boss = new EpollEventLoopGroup();
            worker = new EpollEventLoopGroup();
        }
        else{
            logger.info(" nettyServer 使用nio模式");
            boss = new NioEventLoopGroup();
            worker = new NioEventLoopGroup();
        }
        
        logger.info("netty服务器在["+this.port+"]端口启动监听");
        
        serverBootstrap.group(boss,worker)
            .option(ChannelOption.SO_BACKLOG,1024)
            .option(EpollChannelOption.SO_REUSEPORT, true)
            .handler(new LoggingHandler(LogLevel.INFO))
            .option(ChannelOption.TCP_NODELAY,true)
            .childOption(ChannelOption.SO_KEEPALIVE,true)
            .childHandler(new NettyServerInitializer(this.factoryCode));
        
        if(epoll){
            serverBootstrap.channel(EpollServerSocketChannel.class);
        }else{
            serverBootstrap.channel(NioServerSocketChannel.class);
        }
        
        
        try{
            future = serverBootstrap.bind(this.port).sync();
            if(future.isSuccess()){
                logger.info("nettyServer 完成启动 ");
            }
            // 等待服务端监听端口关闭
            future.channel().closeFuture().sync();
        }catch (Exception e){
            //boss.shutdownGracefully();
            //worker.shutdownGracefully();
            logger.info("nettyServer 启动时发生异常---------------{}",e);
            logger.info(e.getMessage());
        }finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

2、NettyServerInitializer.class

package org.xxx.android.netty.server;
import java.util.concurrent.TimeUnit;
import org.xxx.android.netty.NettyConstants;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;

/**
 * Created by zhangkai on 2018/6/11.
 */
public class NettyServerInitializer extends ChannelInitializer<SocketChannel>{
    Integer factoryCode=null;
    public NettyServerInitializer(Integer fc){
        this.factoryCode=fc;
    }
    
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        pipeline.addLast(new IdleStateHandler(
                NettyConstants.SERVER_READ_IDEL_TIME_OUT,
                NettyConstants.SERVER_WRITE_IDEL_TIME_OUT,
                NettyConstants.SERVER_ALL_IDEL_TIME_OUT,
                TimeUnit.SECONDS));
        pipeline.addLast(new AcceptorIdleStateTrigger());

        pipeline.addLast(new StringEncoder());
        pipeline.addLast(new ByteArrayEncoder());

        pipeline.addLast(new NettyServerDecoder(this.factoryCode));
        pipeline.addLast(new NettyServerHandler());
    }
}

3、NettyServerDecoder.class

package org.xxx.android.netty.server;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import io.netty.channel.socket.SocketChannel;
import org.xxx.android.factory.util.FactoryMap;
import org.xxx.android.factory.util.FactoryUtil;
import org.xxx.android.factory.util.MessageUtil;
import org.xxx.android.factory.vo.FactoryEnum;
import org.xxx.android.netty.delegate.DecoderDelegate;
import org.xxx.android.netty.server.decoder.IDecoder;
import org.xxx.android.util.DataUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

public class NettyServerDecoder extends ByteToMessageDecoder {
    protected final Logger log = LoggerFactory.getLogger(getClass());
    /*
     * 记录设备登录次数
     */
    static volatile Map<Integer,Integer> timesMap=new ConcurrentHashMap<Integer,Integer>();
    /*
     * 解码器委托模式
     */
    DecoderDelegate decoderDelegate=null;
    
    Integer factoryCode=null;
    public NettyServerDecoder(Integer fc){
        this.factoryCode=fc;
        this.decoderDelegate=new DecoderDelegate();
    }

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> list) throws Exception {
        try {
            in.retain();
            Channel channel=channelHandlerContext.channel();
            int hashCode=channel.hashCode();
            
            ByteBufToBytes reader = new ByteBufToBytes();
            byte[] byteData = reader.read(in);
            log.info("服务端接收到的原始消息为{}={}",hashCode,DataUtil.ByteArrToHexString(byteData));
            //根据通道获取厂商
            FactoryEnum channelFactory=null;
            if(this.factoryCode==null){
                //AA、BB、CC未指名工厂,从消息中获取工厂
                channelFactory=this.indentifyFromMsg(channel, byteData, in, list);
            }
            else{
                channelFactory=FactoryEnum.codeOf(this.factoryCode);
                FactoryMap.putChannelDecoder(hashCode, channelFactory.getCode());
            }
            if(channelFactory==null){
                log.info("设备{}消息未识别",hashCode);
                return;
            }
            //获取解码器
            IDecoder decoder=decoderDelegate.getDelegate(channelFactory);
            if(decoder==null){
                log.info("设备{}厂商{}解码器未配置",hashCode,channelFactory.toString());
                return;
            }
            boolean complete = decoder.decoder(hashCode, byteData, in, list);
            if (!complete) {
                log.info("未识别出完整消息,继续接收{}", DataUtil.ByteArrToHexString(byteData));
                return;
            }
            
        }catch (Throwable e){
            log.error("解析出错{}",e);
        }
    }
    /*
     * 从消息中获取工厂
     */
    private FactoryEnum indentifyFromMsg(Channel channel, byte[] byteData, ByteBuf in,
            List<Object> list) {
        int hashCode=channel.hashCode();
        FactoryEnum channelFactory = FactoryUtil.indentifyByChannel(channel);
        if (channelFactory==null) {
            //根据数据识别出厂商
            channelFactory= MessageUtil.getMsgType(byteData);
            if (channelFactory == null) {
                int times=1;
                if(timesMap.containsKey(hashCode)){
                    times=timesMap.get(hashCode)+1;
                }
                if(times==5){
                    log.info("设备{}已登录5次,服务器关闭连接",hashCode);
                    timesMap.remove(hashCode);
                    //关闭通道
                    channel.close();
                    return null;
                }
                else{
                    timesMap.put(hashCode, times);
                }
                //厂商未能识别,继续接收
                in.resetReaderIndex();
                log.info("设备{}厂商未能识别,继续接收{}", hashCode,
                        DataUtil.ByteArrToHexString(byteData));
            }
            else{
                //在decoder中存储socketChannel和协议的对应关系
                FactoryMap.putChannelDecoder(hashCode, channelFactory.getCode());
            }
        } else {
            timesMap.remove(hashCode);
            log.info("从通道获取厂商成功:{}={}",
                    hashCode,
                    channelFactory.toString());
        }
        return channelFactory;
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        System.err.println("--------数据读异常----------: ");
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
        System.err.println("--------数据读取完毕----------");
    }
    
}

4、NettyServerHandler.class

package org.xxx.android.netty.server;

import org.apache.commons.lang3.StringUtils;
import org.xxx.android.factory.IFactory;
import org.xxx.android.factory.util.FactoryMap;
import org.xxx.android.factory.util.FactoryUtil;
import org.xxx.android.factory.vo.FactoryEnum;
import org.xxx.android.netty.delegate.FactoryDelegate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;
/**
 * 多线程共享
 */
@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    public final Logger log = LoggerFactory.getLogger(getClass());
    /*
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("----客户端设备连接:{}", ctx);
        ctx.fireChannelActive();
    }
    */
    @Override
    public void channelInactive(ChannelHandlerContext chc) throws Exception {
        SocketChannel socketChannel = (SocketChannel) chc.channel();

        String clientId = FactoryMap.getDevNoByChannel(socketChannel);
        log.info("----客户端设备连接断开:{}", clientId);
        if (!StringUtils.isEmpty(clientId)) {
            FactoryMap.removeChannelByDevNo(clientId);
            FactoryMap.removeChannelDecoder(chc.channel().hashCode());
            FactoryMap.removeChannelFactory(chc.channel().hashCode());
            //客户端断开
            FactoryUtil.getFactoryService().syncNetworkStatus(clientId, 0);
        }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
        //System.err.println("--------数据读取完毕----------");
    }
    @Override
    public void channelRead(ChannelHandlerContext chc, Object message) throws Exception {
        try {
            Channel channel=chc.channel();
            //获取协议类型
            Integer channelFactory=FactoryMap.getDecoderByChannel(channel.hashCode());
            if(channelFactory==null){
                log.info("解码器未能维护通道和工厂关系");
                return;
            }
            FactoryEnum factoryEnum=FactoryEnum.codeOf(channelFactory);
            if (factoryEnum == null) {
                log.info("解析消息失败,未识别消息所属厂家");
                return;
            }
            this.factoryMessage(channel,message,factoryEnum);
            
        }catch (Exception e){
            log.error("处理业务消息失败,{}",e);
        }
    }
    void factoryMessage(Channel channel, Object msg,FactoryEnum factoryEnum) {
        //处理消息
        /*
        byte[] data = (byte[])message;
        log.info("{}{}接收到通道{}的原始消息=={}",
                factoryEnum.getTitle(),
                NettyMap.getDevNoByChannel(socketChannel),
                socketChannel.hashCode(),
                DataUtil.bytesToHexString(data));
        */

        IFactory factory=FactoryMap.getFactoryByChannel(channel.hashCode());
        SocketChannel socketChannel = (SocketChannel) channel;
        if(factory==null){
            //委托模式创建工厂
            factory = FactoryDelegate.createFactory(factoryEnum);
            //对接收到的消息进行处理
            factory.processMessage(socketChannel,msg);
            FactoryMap.putChannelFactory(socketChannel.hashCode(), factory);
        }
        else{
            //对接收到的消息进行处理
            factory.processMessage(socketChannel,msg);
        }
        log.info("{}={}",socketChannel.hashCode(),factory.getFactoryDevNo());
    }
}

5、SpringbootApplication.class

package org.xxx.android;

import org.xxx.android.netty.server.NettyServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.embedded.ConfigurableEmbeddedServletContainer;
import org.springframework.boot.context.embedded.EmbeddedServletContainerCustomizer;
import org.springframework.data.jpa.repository.config.EnableJpaAuditing;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
* 该注解指定项目为springboot,由此类当作程序入口
* 自动装配 web 依赖的环境
**/
//@Slf4j
@EnableJpaAuditing
@EnableScheduling
@SpringBootApplication
public class SpringbootApplication implements CommandLineRunner,EmbeddedServletContainerCustomizer{
   @Value("${server.port}")
   int serverPort;
   

   @Value("${netty.startup}")
   int startupStartup;
   // 注入NettyServer
   @Autowired NettyServer nettyServer;
   @Autowired NettyServer yyNettyServer;
   
   public static void main(String[] args) {
       SpringApplication.run(SpringbootApplication.class, args);
   }
   @Override
   public void customize(ConfigurableEmbeddedServletContainer container) {
       container.setPort(serverPort);
   }
   @Override
   public void run(String... strings) {
       this.startNettyServer();
    }
    void startNettyServer() {
        if(startupStartup==1){
            this.nettyThreadStart(nettyServer);
            this.nettyThreadStart(yyNettyServer);
            Runtime.getRuntime().addShutdownHook(new Thread(){
                 @Override
                 public void run(){
                     stopNettyServer();
                 }
            });
       }
    }
    void stopNettyServer() {
        nettyServer.stop();
        yyNettyServer.stop();
    }
    void nettyThreadStart(final NettyServer ns) {
        Thread thread = new Thread(new Runnable(){
            @Override
            public void run() {
                ns.start();
            }
        });
        thread.start();
    }
}

2、粘包/拆包解决思路

基本思路就是不断从TCP缓冲区中读取数据,每次读取完都需要判断是否是一个完整的数据包;
若当前读取的数据不足以拼接成一个完整的业务数据包,那就保留该数据,继续从tcp缓冲区中读取,直到得到一个完整的数据包

  • 定长——FixedLengthFrameDecoder
  • 分隔符——DelimiterBasedFrameDecoder
  • 基于长度的变长包——LengthFieldBasedFrameDecoder

若当前读到的数据加上已经读取的数据足够拼接成一个数据包,那就将已经读取的数据拼接上本次读取的数据,够成一个完整的业务数据包传递到业务逻辑,多余的数据仍然保留,以便和下次读到的数据尝试拼接

这里最重要的是就是,使用markReaderIndex标记读索引,使的多余的数据保留,继续等待后面的数据

        //BB数据
        //判断心跳
        if(isXyHeart(byteData)){
            list.add(byteData);
            return true;
        }

        //判断是否是开头
        if(isXYMsgHeader(byteData)){
            headIndexMap.remove(hashCode);
            int length = DataUtil.byteToInt(byteData[XyConstant.BUSINESS_RSP_MSG_FIELD.LEN.INDEX]);
            //整包
            if(length == byteData.length){
                //判断校验和
                if(!isCheckNum(byteData,length)){
                    log.error("兴元数据包校验和不通过{}!={}==={}",byteData[length-1]& FactoryConstant.BYTE_MASK, XyBusinessReqMsgUtil.getCheckSum(byteData)&FactoryConstant.BYTE_MASK,DataUtil.ByteArrayToString(byteData));
                }else {
                    list.add(byteData);
                    return true;
                }
            }
            if(length > byteData.length){
                //半包,继续接收
                in.resetReaderIndex();
                return false;
            }
            if(length < byteData.length){
                log.info("粘包=====接收数据大于帧长度{}>{}",byteData.toString(),length);
                return dealStickyPackage(in, list, length);
            }
    public boolean dealStickyPackage(ByteBuf in, List<Object> list, int length) {
        //粘包,重置读索引
        in.resetReaderIndex();
        byte[] bytes = new byte[length];
        in.readBytes(bytes);
        //已接收到的完整包数据传给handler去处理
        list.add(bytes);
        //标记读索引,相当于清除当前读索引readIndex之前的数据
        //剩下的数据就是下一条数据的开头,继续等待接收
        in.markReaderIndex();
        return false;
    }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,036评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,046评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,411评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,622评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,661评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,521评论 1 304
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,288评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,200评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,644评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,837评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,953评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,673评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,281评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,889评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,011评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,119评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,901评论 2 355

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,656评论 18 139
  • 为什么要粘包拆包 为什么要粘包 首先你得了解一下TCP/IP协议,在用户数据量非常小的情况下,极端情况下,一个字节...
    简书闪电侠阅读 20,644评论 23 77
  • 引用:https://blog.csdn.net/yuanpeng1014/article/details/708...
    白色天空729阅读 156评论 0 0
  • 今天的月亮不算很圆,但是很亮,我独自坐在阳台上,看着满园的绿植,闻着青草沾着露水的香气,听到了不知从哪里传来的辩论...
    路旁土土阅读 302评论 0 0