基于Rxjava2与OkHttp中WebSocket长连接封装(断连重试机制)

WebSocket 在开发中遇到的情况很少,导致在使用的时候可能遇到很多的问题,比如它的重连机制、发送数据的统一、结合Service使用;下面的文章将使用OkHttp中的WebSocket以及Rxjava2结合Service实现断开重连,发送和接收服务器的数据。

WebSocket

我们使用WebSocket的库有很多,类似 AndroidAsync,作者对它的介绍是一个更加底层的异步网络库,使用这个库也非常简单,传一个地址和协议再加一个回调接口,就能简单的使用:

AsyncHttpClient.getDefaultInstance()
    .websocket(get, "my-protocol", new WebSocketConnectCallback() {
    @Override
    public void onCompleted(Exception ex, WebSocket webSocket) {
        if (ex != null) {
            ex.printStackTrace();
            return;
        }
        webSocket.send("a string");
        webSocket.send(new byte[10]);
        webSocket.setStringCallback(new StringCallback() {
            public void onStringAvailable(String s) {
                System.out.println("I got a string: " + s);
            }
        });
        webSocket.setDataCallback(new DataCallback() {
            public void onDataAvailable(DataEmitter emitter, 
                              ByteBufferList byteBufferList) {
                System.out.println("I got some bytes!");
                // note that this data has been read
                byteBufferList.recycle();
            }
        });
    }
});

但是,我们看到这个库却是有非常多的类,通常的,我们的网络请求库都只有一个,现在大部分Android开发都会使用类似OkHttp或者Volley来请求网络,如果再加一个库的话,无疑增加了应用的代码量,这里我们使用OkHttp中内置的WebSocket,基于它的一个封装,我们也能写一个轻量级的WebSocket应用; 而OkHttp中的WebSocket使用也是封装方便:

// 创建一个Request
Request request = new Request.Builder()
        .url(socketUrl)
        .build();
OkHttpClient client = new OkHttpClient();
// 使用OkHttpClient 来创建一个WebSocket
client.newWebSocket(request, new WebSocketListener() {
    @Override
    public void onOpen(WebSocket webSocket, Response response) {
        super.onOpen(webSocket, response);
    }
    @Override
    public void onMessage(WebSocket webSocket, String text) {
        super.onMessage(webSocket, text);
    }
    @Override
    public void onMessage(WebSocket webSocket, ByteString bytes) {
        super.onMessage(webSocket, bytes);
    }
    @Override
    public void onClosing(WebSocket webSocket, int code, String reason) {
        super.onClosing(webSocket, code, reason);
    }
    @Override
    public void onClosed(WebSocket webSocket, int code, String reason) {
        super.onClosed(webSocket, code, reason);
    }
    @Override
    public void onFailure(WebSocket webSocket, Throwable t,
                @javax.annotation.Nullable Response response) {
        super.onFailure(webSocket, t, response);
    }
});
client.dispatcher().executorService().shutdown();

可以看到,先创建一个Request,我们在调用接口的时候,也是创建一个Request对象,然后使用OkHttpClient来创建一个WebSocket,回调到WebSocketListener监听接口之后,再做逻辑业务处理。(接口方法意思都很简单,使用的时候再说明)

Rxjava2 使用

前面的博客都简单的介绍了Rxjava2的使用,最近在项目中使用的也比较频繁,后面再写一些项目中Rxjava2的实战:

Rxjava2 学习创建型操作符
Rxjava2 学习变换操作符
Rxjava2 学习过滤操作符

项目开发:

既然是使用到逻辑业务上的操作,和界面无关,自然会想到Service; 将其直接封装成Service,在使用的时候直接bindService或者startService会方便很多,不知不觉中将逻辑业务和页面分隔开; 如果我们直接在页面中使用也未尝不可,但是这样就不便复用了。

所以开始我们先创建一个Service:

public WebSocketService extends Service {
    public static final String LOG_TAG = "WebSocketTest";
   
    @Nullable
    @Override
    public IBinder onBind(Intent intent) {
        Log.v(LOG_TAG, "----- onBind -----");
        return new ServiceBinder();
    }
    public class ServiceBinder extends Binder {
        public WebSocketService getService() {
            return WebSocketService.this;
        }
    }
    @Override
    public void onCreate() {
        super.onCreate();
        Log.i(LOG_TAG, "----- onCreate -----");
    }
    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        //Use this to force restart service
        return START_STICKY;
    }
    @Override
    public void onDestroy() {
        super.onDestroy();
        Log.i(LOG_TAG, "----- onDestroy -----");
    }
}

Service常用的步骤,使用其中的方法,这里我们使用bindService回调一个ServiceConnect接口,因为我们需要使用到这个WebSocketService实例;接下来,我们就需要在onCreate方法中做一些初始化的操作:

/**
 * 初始化
 *
 * @param startReason
 * @param isFirstConnect
 */
private void initSocketWrapper(String startReason, boolean isFirstConnect) {
    // 拿到Reason,打印log
    Observable.just(startReason)
            .filter(new Predicate<String>() {
                @Override
                public boolean test(String s) throws Exception {
                    // 判断当前是否正在连接
                    if (isAttemptConnecting) {
                        Log.v(LOG_TAG, startReason + " : Should reconnect but"+
                                       "already in process, skip.");
                        return Boolean.FALSE;
                    }
                    return Boolean.TRUE;
                }
            })
            .subscribeOn(AndroidSchedulers.mainThread())
            .doOnNext(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    if ((mWebSocket == null)     // 如果已经为空
                        && (!isFirstConnect)         // 不是第一次连接
                        && (!isAttemptConnecting)) {    // 当前没有在尝试连接
                        showUiWebSocketStatus("与服务器失去连接!!!");
                    }
                }
            })
            .observeOn(Schedulers.io())
            .subscribe(s -> initSocket());
}    

上面写清楚了注释,这个方法主要是初始化,拿到当前的原因和是否是第一次连接,还有一个全局变量 isAttemptConnecting 来判断当前WebSocket是否在连接中,然后在这之间先判断 如果websocket为空并且不是第一次连接,而且还没有尝试连接,则toast提示用户断开连接!!!

接下来就是使用OkHttp中WebSocket创建连接了:

/**
 * 初始化WebSocket
 */
private void initSocket() {
    // ... 省略一些状态切换代码
    ...    

    // 开始初始化
    Observable.create(new ObservableOnSubscribe<WebSocket>() {
        @Override
        public void subscribe(ObservableEmitter<WebSocket> emitter) 
                                            throws Exception {
            //TODO 这里可以进行登录业务判断
            Request request = new Request.Builder()
                    .url(socketUrl)
                    .build();
            OkHttpClient client = new OkHttpClient();
            client.newWebSocket(request, new WebSocketListener() {
                @Override
                public void onOpen(WebSocket webSocket, Response response) {
                    super.onOpen(webSocket, response);
                    isAttemptConnecting = false;
                    connectionAttemptCount = 0;
                    // 连接成功之后
                    mWebSocket = webSocket;
                    dispatchStringMessage("连接成功!!!");
                    emitter.onNext(mWebSocket);
                    emitter.onComplete();
                }
                @Override
                public void onMessage(WebSocket webSocket, String text) {
                    super.onMessage(webSocket, text);
                    dispatchStringMessage(text);
                }
                @Override
                public void onMessage(WebSocket webSocket, ByteString bytes) {
                    super.onMessage(webSocket, bytes);
                }
                @Override
                public void onClosing(WebSocket webSocket, int code, String reason) {
                    super.onClosing(webSocket, code, reason);
                }
                @Override
                public void onClosed(WebSocket webSocket, int code, String reason) {
                    super.onClosed(webSocket, code, reason);
                    Log.i(LOG_TAG, "ClosedCallback: WebSocket closed.");
                    // 等待自检重启,或者自然关闭
                    if ((!preparedShutdown) && (shouldAutoReconnect)) {
                        initSocketWrapper("onClose");
                    }
                }
                @Override
                public void onFailure(WebSocket webSocket, Throwable t, 
                                  @Nullable Response response) {
                    super.onFailure(webSocket, t, response);
                    dispatchStringMessage("连接失败!!!");
                    emitter.onError(t != null ? t 
                        : new ConnectException("Cannot connect we service!!!"));
                }
            });
            client.dispatcher().executorService().shutdown();
        }
    }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
      .subscribe(new Consumer<WebSocket>() {
        @Override
        public void accept(WebSocket webSocket) throws Exception {
            if (pongService == null) {
                startPongDaemonService();
            }
        }
    }, 
    new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        throwable.printStackTrace();
        // 判断是否需要执行诊断服务
        if (connectionAttemptCount >= ATTEMPT_TOLERANCE) {
            // 强制开始诊断服务
            startService(new Intent(WebSocketService.this, 
                                    NetworkDiagnosisService.class));
            // 重置标记
            connectionAttemptCount = 0;
        }
    }
}

使用create操作符创建一个被观察者对象发射器,在其中使用OkHttp的创建WebSocket方式创建WebSocket,然后根据连接的结果进行emitter发射 onNext()、onError()、onComplete() 方法,连接成功之后,开始发送 自检服务 Pong; 连接失败之后,开始检测网络是否有连接:NetWorkDiagnosisService 网络诊断服务。

发送Pong守护进程,先创建一个单线程线程池,然后发送消息:

/**
 * 给服务器发送Pong自检
 */
private void startPongDaemonService() {
    pongService = Executors.newSingleThreadScheduledExecutor();
    pongService.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            if (mWebSocket != null) {
                sendRequest(WsObjectPool.newPongRequest());
            }
        }
    }, 10, 10, TimeUnit.SECONDS);
    Log.i(LOG_TAG, "Pong service has been scheduled at " + 10 + " seconds delay.");
}

延迟十秒执行,使用一个WebSocketObjectPool对象池,为了方便取出对象数据。

整个WebSocket初始化就在上面,流程还是比较简单的,接下来就是从服务器拿到数据之后分发数据了, 因为我这里测试只用到String数据,如果需要用到Json的数据,则分发json数据即可:

    /**
     * 方法字符串
     *
     * @param message
     */
    private void dispatchStringMessage(String message) {
        Observable.just(message)
                .subscribeOn(Schedulers.computation())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(String s) {
                        WsListener<String> listener =
                         (WsListener<String>)activeListener.get(
                 SocketConstants.ResponseType.RESPONSE_STRING_MESSAGE);
                        Log.d(LOG_TAG, "Msg entity: " + s + ".");
                        if (listener != null) {
                            listener.handleData(s);
                        }
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
    }

拿到message String数据之后,或者到外部监听器,因为是使用到String字符串类型,所以这里直接获取到一个监听器,然后分发给方法。

如果是方法json的话,就去GitHub下载代码查看详细代码:代码

自检服务

连接成功之后,我们需要自检,因为可能在连接过程中出现断连的情况,网络不稳定情况,所以需要使用循环的自检:

/**
 * 启动自检服务,按照周期执行
 */
private void startSelfCheckService() {
    // 自检服务器打开
    mSelfCheckDispose = Observable
            .interval(10, 10, TimeUnit.SECONDS)
            .filter(new Predicate<Long>() {
                @Override
                public boolean test(Long aLong) throws Exception {
                    if (!shouldAutoReconnect) {
                        Log.i(LOG_TAG, "Auto reconnect has been disabled,"
                                       "maybe kicked?");
                    }
                    return shouldAutoReconnect;
                }
            })
            .map(new Function<Long, Boolean>() {
                @Override
                public Boolean apply(Long aLong) throws Exception {
                    return checkSocketAvailable();
                }
            })
            .subscribeOn(Schedulers.computation())
            .doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(Disposable disposable) throws Exception {
                    Log.i(LOG_TAG, "Self check task has been scheduled per " 
                                    + 10 + " seconds.");
                    shouldAutoReconnect = true;
                    Log.i(LOG_TAG, "Auto reconnect feature has been enabled.");
                }
            })
            .subscribe(new Consumer<Boolean>() {
                           @Override
                           public void accept(Boolean webSocketAlive)
                                             throws Exception {
                               if (webSocketAlive) {
                                   Log.v(LOG_TAG, "WebSocket self check: is alive.");
                                   return;
                               }
                               // 自检服务器打开
                               initSocketWrapper("SelfCheckService");
                           }
                       },
                    new Consumer<Throwable>() {
                        @Override
                        public void accept(Throwable throwable) throws Exception {
                            Log.e(LOG_TAG, "Error while executing self check!" 
                                         + throwable);
                        }
                    });
}

这里我没使用lambda表达式,为了让方法清楚,所以代码比较长,但是结构比较清晰;

首先使用 interval 创建一个延时的周期被观察者,然后根据当前设置是否需要自动连接来过滤是否进行下面的操作,接下来判断当前的连接是否存在和连接,接下来判断是否存活,如果断连则调用初始化的方法,在上面解释了什么情况下会调用自检服务。

总结

整篇文章写了OkHttp的WebSocket使用,断连重试机制,Service使用等,主要的一些细节在文章中可能没体现出来,如果有需要则下载源码自己修改运行。 最后贴上Github地址,喜欢的话给个start! 🤗

项目的github地址

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,904评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,581评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,527评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,463评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,546评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,572评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,582评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,330评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,776评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,087评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,257评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,923评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,571评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,192评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,436评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,145评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,647评论 18 139
  • Spring Web MVC Spring Web MVC 是包含在 Spring 框架中的 Web 框架,建立于...
    Hsinwong阅读 22,379评论 1 92
  • 1、Activity生命周期? onCreate() -> onStart() -> onResume() -> ...
    01_小小鱼_01阅读 2,816评论 0 44
  • 不知天上宫阙,只知地上扁鹊。 当李白看完扁鹊幼年的日记时,他瞬间明白,是什么让一个人有这么大变化。 扁鹊的师傅徐福...
    越人哥哥阅读 461评论 1 2
  • 渐渐变冷了,太阳给的光里温暖被打了折扣。听树叶沙沙作响,感觉它们脆的随时都会粉身碎骨。 午饭后去了超市,看到有点青...
    糖木小径阅读 274评论 0 0