定时断线重连

客户端断线重连机制。
客户端数量多,且需要传递的数据量级较大。可以周期性的发送数据的时候,使用。要求对数据的即时性不高的时候,才可使用。
优点: 可以使用数据缓存。不是每条数据进行一次数据交互。可以定时回收资源,对资源利用率高。相对来说,即时性可以通过其他方式保证。如: 120秒自动断线。数据变化1000次请求服务器一次。300秒中自动发送不足1000次的变化数据。


image.png
/**
 * 1. 双线程组
 * 2. Bootstrap配置启动信息
 * 3. 注册业务处理Handler
 * 4. 绑定服务监听端口并启动服务
 */
package com.bjsxt.socket.netty.timer;



import com.bjsxt.socket.utils.SerializableFactory4Marshalling;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;

public class Server4Timer {
    // 监听线程组,监听客户端请求
    private EventLoopGroup acceptorGroup = null;
    // 处理客户端相关操作线程组,负责处理与客户端的数据通讯
    private EventLoopGroup clientGroup = null;
    // 服务启动相关配置信息
    private ServerBootstrap bootstrap = null;
    public Server4Timer(){
        init();
    }
    private void init(){
        acceptorGroup = new NioEventLoopGroup();
        clientGroup = new NioEventLoopGroup();
        bootstrap = new ServerBootstrap();
        // 绑定线程组
        bootstrap.group(acceptorGroup, clientGroup);
        // 设定通讯模式为NIO
        bootstrap.channel(NioServerSocketChannel.class);
        // 设定缓冲区大小
        bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
        // SO_SNDBUF发送缓冲区,SO_RCVBUF接收缓冲区,SO_KEEPALIVE开启心跳监测(保证连接有效)
        bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024)
            .option(ChannelOption.SO_RCVBUF, 16*1024)
            .option(ChannelOption.SO_KEEPALIVE, true);
        // 增加日志Handler,日志级别为info
        // bootstrap.handler(new LoggingHandler(LogLevel.INFO));
    }
    public ChannelFuture doAccept(int port) throws InterruptedException{
        
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
                ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
                // 定义一个定时断线处理器,当多长时间内,没有任何的可读取数据,自动断开连接。
                // 构造参数,就是间隔时长。 默认的单位是秒。
                // 自定义间隔时长单位。 new ReadTimeoutHandler(long times, TimeUnit unit);
                ch.pipeline().addLast(new ReadTimeoutHandler(3));
                ch.pipeline().addLast(new Server4TimerHandler());
            }
        });
        ChannelFuture future = bootstrap.bind(port).sync();
        return future;
    }
    public void release(){
        this.acceptorGroup.shutdownGracefully();
        this.clientGroup.shutdownGracefully();
    }
    
    public static void main(String[] args){
        ChannelFuture future = null;
        Server4Timer server = null;
        try{
            server = new Server4Timer();
            future = server.doAccept(9999);
            System.out.println("server started.");
            
            future.channel().closeFuture().sync();
        }catch(InterruptedException e){
            e.printStackTrace();
        }finally{
            if(null != future){
                try {
                    future.channel().closeFuture().sync();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            
            if(null != server){
                server.release();
            }
        }
    }
    
}

/**
 * @Sharable注解 - 
 *  代表当前Handler是一个可以分享的处理器。也就意味着,服务器注册此Handler后,可以分享给多个客户端同时使用。
 *  如果不使用注解描述类型,则每次客户端请求时,必须为客户端重新创建一个新的Handler对象。
 *  
 */
package com.bjsxt.socket.netty.timer;

import com.bjsxt.socket.utils.ResponseMessage;
import io.netty.channel.ChannelHandler.Sharable;


import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

@Sharable
public class Server4TimerHandler extends ChannelHandlerAdapter {
    
    // 业务处理逻辑
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("from client : ClassName - " + msg.getClass().getName()
                + " ; message : " + msg.toString());
        ResponseMessage response = new ResponseMessage(0L, "test response");
        ctx.writeAndFlush(response);
    }

    // 异常处理逻辑
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("server exceptionCaught method run...");
        // cause.printStackTrace();
        ctx.close();
    }

}

/**
 * 1. 单线程组
 * 2. Bootstrap配置启动信息
 * 3. 注册业务处理Handler
 * 4. connect连接服务,并发起请求
 */
package com.bjsxt.socket.netty.timer;

import java.util.Random;
import java.util.concurrent.TimeUnit;


import com.bjsxt.socket.utils.RequestMessage;
import com.bjsxt.socket.utils.SerializableFactory4Marshalling;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.WriteTimeoutHandler;

public class Client4Timer {
    
    // 处理请求和处理服务端响应的线程组
    private EventLoopGroup group = null;
    // 服务启动相关配置信息
    private Bootstrap bootstrap = null;
    private ChannelFuture future = null;
    
    public Client4Timer(){
        init();
    }
    
    private void init(){
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        // 绑定线程组
        bootstrap.group(group);
        // 设定通讯模式为NIO
        bootstrap.channel(NioSocketChannel.class);
        // bootstrap.handler(new LoggingHandler(LogLevel.INFO));
    }
    
    public void setHandlers() throws InterruptedException{
        this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
                ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
                // 写操作自定断线。 在指定时间内,没有写操作,自动断线。
                ch.pipeline().addLast(new WriteTimeoutHandler(3));
                ch.pipeline().addLast(new Client4TimerHandler());
            }
        });
    }
    
    public ChannelFuture getChannelFuture(String host, int port) throws InterruptedException{
        if(future == null){
            future = this.bootstrap.connect(host, port).sync();
        }
        if(!future.channel().isActive()){
            future = this.bootstrap.connect(host, port).sync();
        }
        return future;
    }
    
    public void release(){
        this.group.shutdownGracefully();
    }
    
    public static void main(String[] args) {
        Client4Timer client = null;
        ChannelFuture future = null;
        try{
            client = new Client4Timer();
            client.setHandlers();
            
            future = client.getChannelFuture("localhost", 9999);
            for(int i = 0; i < 3; i++){
                RequestMessage msg = new RequestMessage(new Random().nextLong(),
                        "test"+i, new byte[0]);
                future.channel().writeAndFlush(msg);
                TimeUnit.SECONDS.sleep(2);
            }
            TimeUnit.SECONDS.sleep(5);
            
            future = client.getChannelFuture("localhost", 9999);
            RequestMessage msg = new RequestMessage(new Random().nextLong(), 
                    "test", new byte[0]);
            future.channel().writeAndFlush(msg);
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            if(null != future){
                try {
                    future.channel().closeFuture().sync();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            if(null != client){
                client.release();
            }
        }
    }
    
}

package com.bjsxt.socket.netty.timer;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class Client4TimerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("from server : ClassName - " + msg.getClass().getName()
                + " ; message : " + msg.toString());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("client exceptionCaught method run...");
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * 当连接建立成功后,出发的代码逻辑。
     * 在一次连接中只运行唯一一次。
     * 通常用于实现连接确认和资源初始化的。
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client channel active");
    }

}

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 226,828评论 6 526
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 97,669评论 3 412
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 174,467评论 0 373
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 62,238评论 1 306
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 71,039评论 6 405
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 54,561评论 1 319
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 42,658评论 3 433
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 41,806评论 0 285
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 48,316评论 1 329
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 40,293评论 3 353
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 42,443评论 1 364
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 37,998评论 5 355
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 43,698评论 3 342
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 34,097评论 0 25
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 35,333评论 1 281
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 51,012评论 3 385
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 47,434评论 2 370

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,764评论 18 139
  • 关于Mongodb的全面总结 MongoDB的内部构造《MongoDB The Definitive Guide》...
    中v中阅读 31,980评论 2 89
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 172,553评论 25 707
  • 我默默地走来 就像我静静的走开 回忆不全是美好的 就好比人生 没有人可以一帆风顺 但总有高潮和起伏 人生需学会忘却...
    昕爸阅读 174评论 1 3
  • 一个账户可视为在线的访问凭证。nodeos管理着在区块链上发布账户以及与账户相关联的行为。我们通过cleos与no...
    编程狂魔阅读 639评论 0 1