WebSocket 在开发中遇到的情况很少,导致在使用的时候可能遇到很多的问题,比如它的重连机制、发送数据的统一、结合Service使用;下面的文章将使用OkHttp中的WebSocket以及Rxjava2结合Service实现断开重连,发送和接收服务器的数据。
WebSocket
我们使用WebSocket的库有很多,类似 AndroidAsync,作者对它的介绍是一个更加底层的异步网络库,使用这个库也非常简单,传一个地址和协议再加一个回调接口,就能简单的使用:
AsyncHttpClient.getDefaultInstance()
.websocket(get, "my-protocol", new WebSocketConnectCallback() {
@Override
public void onCompleted(Exception ex, WebSocket webSocket) {
if (ex != null) {
ex.printStackTrace();
return;
}
webSocket.send("a string");
webSocket.send(new byte[10]);
webSocket.setStringCallback(new StringCallback() {
public void onStringAvailable(String s) {
System.out.println("I got a string: " + s);
}
});
webSocket.setDataCallback(new DataCallback() {
public void onDataAvailable(DataEmitter emitter,
ByteBufferList byteBufferList) {
System.out.println("I got some bytes!");
// note that this data has been read
byteBufferList.recycle();
}
});
}
});
但是,我们看到这个库却是有非常多的类,通常的,我们的网络请求库都只有一个,现在大部分Android开发都会使用类似OkHttp或者Volley来请求网络,如果再加一个库的话,无疑增加了应用的代码量,这里我们使用OkHttp中内置的WebSocket,基于它的一个封装,我们也能写一个轻量级的WebSocket应用; 而OkHttp中的WebSocket使用也是封装方便:
// 创建一个Request
Request request = new Request.Builder()
.url(socketUrl)
.build();
OkHttpClient client = new OkHttpClient();
// 使用OkHttpClient 来创建一个WebSocket
client.newWebSocket(request, new WebSocketListener() {
@Override
public void onOpen(WebSocket webSocket, Response response) {
super.onOpen(webSocket, response);
}
@Override
public void onMessage(WebSocket webSocket, String text) {
super.onMessage(webSocket, text);
}
@Override
public void onMessage(WebSocket webSocket, ByteString bytes) {
super.onMessage(webSocket, bytes);
}
@Override
public void onClosing(WebSocket webSocket, int code, String reason) {
super.onClosing(webSocket, code, reason);
}
@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
super.onClosed(webSocket, code, reason);
}
@Override
public void onFailure(WebSocket webSocket, Throwable t,
@javax.annotation.Nullable Response response) {
super.onFailure(webSocket, t, response);
}
});
client.dispatcher().executorService().shutdown();
可以看到,先创建一个Request,我们在调用接口的时候,也是创建一个Request对象,然后使用OkHttpClient来创建一个WebSocket,回调到WebSocketListener监听接口之后,再做逻辑业务处理。(接口方法意思都很简单,使用的时候再说明)
Rxjava2 使用
前面的博客都简单的介绍了Rxjava2的使用,最近在项目中使用的也比较频繁,后面再写一些项目中Rxjava2的实战:
Rxjava2 学习创建型操作符
Rxjava2 学习变换操作符
Rxjava2 学习过滤操作符
项目开发:
既然是使用到逻辑业务上的操作,和界面无关,自然会想到Service; 将其直接封装成Service,在使用的时候直接bindService或者startService会方便很多,不知不觉中将逻辑业务和页面分隔开; 如果我们直接在页面中使用也未尝不可,但是这样就不便复用了。
所以开始我们先创建一个Service:
public WebSocketService extends Service {
public static final String LOG_TAG = "WebSocketTest";
@Nullable
@Override
public IBinder onBind(Intent intent) {
Log.v(LOG_TAG, "----- onBind -----");
return new ServiceBinder();
}
public class ServiceBinder extends Binder {
public WebSocketService getService() {
return WebSocketService.this;
}
}
@Override
public void onCreate() {
super.onCreate();
Log.i(LOG_TAG, "----- onCreate -----");
}
@Override
public int onStartCommand(Intent intent, int flags, int startId) {
//Use this to force restart service
return START_STICKY;
}
@Override
public void onDestroy() {
super.onDestroy();
Log.i(LOG_TAG, "----- onDestroy -----");
}
}
Service常用的步骤,使用其中的方法,这里我们使用bindService回调一个ServiceConnect接口,因为我们需要使用到这个WebSocketService实例;接下来,我们就需要在onCreate方法中做一些初始化的操作:
/**
* 初始化
*
* @param startReason
* @param isFirstConnect
*/
private void initSocketWrapper(String startReason, boolean isFirstConnect) {
// 拿到Reason,打印log
Observable.just(startReason)
.filter(new Predicate<String>() {
@Override
public boolean test(String s) throws Exception {
// 判断当前是否正在连接
if (isAttemptConnecting) {
Log.v(LOG_TAG, startReason + " : Should reconnect but"+
"already in process, skip.");
return Boolean.FALSE;
}
return Boolean.TRUE;
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
if ((mWebSocket == null) // 如果已经为空
&& (!isFirstConnect) // 不是第一次连接
&& (!isAttemptConnecting)) { // 当前没有在尝试连接
showUiWebSocketStatus("与服务器失去连接!!!");
}
}
})
.observeOn(Schedulers.io())
.subscribe(s -> initSocket());
}
上面写清楚了注释,这个方法主要是初始化,拿到当前的原因和是否是第一次连接,还有一个全局变量 isAttemptConnecting 来判断当前WebSocket是否在连接中,然后在这之间先判断 如果websocket为空并且不是第一次连接,而且还没有尝试连接,则toast提示用户断开连接!!!
接下来就是使用OkHttp中WebSocket创建连接了:
/**
* 初始化WebSocket
*/
private void initSocket() {
// ... 省略一些状态切换代码
...
// 开始初始化
Observable.create(new ObservableOnSubscribe<WebSocket>() {
@Override
public void subscribe(ObservableEmitter<WebSocket> emitter)
throws Exception {
//TODO 这里可以进行登录业务判断
Request request = new Request.Builder()
.url(socketUrl)
.build();
OkHttpClient client = new OkHttpClient();
client.newWebSocket(request, new WebSocketListener() {
@Override
public void onOpen(WebSocket webSocket, Response response) {
super.onOpen(webSocket, response);
isAttemptConnecting = false;
connectionAttemptCount = 0;
// 连接成功之后
mWebSocket = webSocket;
dispatchStringMessage("连接成功!!!");
emitter.onNext(mWebSocket);
emitter.onComplete();
}
@Override
public void onMessage(WebSocket webSocket, String text) {
super.onMessage(webSocket, text);
dispatchStringMessage(text);
}
@Override
public void onMessage(WebSocket webSocket, ByteString bytes) {
super.onMessage(webSocket, bytes);
}
@Override
public void onClosing(WebSocket webSocket, int code, String reason) {
super.onClosing(webSocket, code, reason);
}
@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
super.onClosed(webSocket, code, reason);
Log.i(LOG_TAG, "ClosedCallback: WebSocket closed.");
// 等待自检重启,或者自然关闭
if ((!preparedShutdown) && (shouldAutoReconnect)) {
initSocketWrapper("onClose");
}
}
@Override
public void onFailure(WebSocket webSocket, Throwable t,
@Nullable Response response) {
super.onFailure(webSocket, t, response);
dispatchStringMessage("连接失败!!!");
emitter.onError(t != null ? t
: new ConnectException("Cannot connect we service!!!"));
}
});
client.dispatcher().executorService().shutdown();
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<WebSocket>() {
@Override
public void accept(WebSocket webSocket) throws Exception {
if (pongService == null) {
startPongDaemonService();
}
}
},
new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
throwable.printStackTrace();
// 判断是否需要执行诊断服务
if (connectionAttemptCount >= ATTEMPT_TOLERANCE) {
// 强制开始诊断服务
startService(new Intent(WebSocketService.this,
NetworkDiagnosisService.class));
// 重置标记
connectionAttemptCount = 0;
}
}
}
使用create操作符创建一个被观察者对象发射器,在其中使用OkHttp的创建WebSocket方式创建WebSocket,然后根据连接的结果进行emitter发射 onNext()、onError()、onComplete() 方法,连接成功之后,开始发送 自检服务 Pong; 连接失败之后,开始检测网络是否有连接:NetWorkDiagnosisService 网络诊断服务。
发送Pong守护进程,先创建一个单线程线程池,然后发送消息:
/**
* 给服务器发送Pong自检
*/
private void startPongDaemonService() {
pongService = Executors.newSingleThreadScheduledExecutor();
pongService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
if (mWebSocket != null) {
sendRequest(WsObjectPool.newPongRequest());
}
}
}, 10, 10, TimeUnit.SECONDS);
Log.i(LOG_TAG, "Pong service has been scheduled at " + 10 + " seconds delay.");
}
延迟十秒执行,使用一个WebSocketObjectPool对象池,为了方便取出对象数据。
整个WebSocket初始化就在上面,流程还是比较简单的,接下来就是从服务器拿到数据之后分发数据了, 因为我这里测试只用到String数据,如果需要用到Json的数据,则分发json数据即可:
/**
* 方法字符串
*
* @param message
*/
private void dispatchStringMessage(String message) {
Observable.just(message)
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
WsListener<String> listener =
(WsListener<String>)activeListener.get(
SocketConstants.ResponseType.RESPONSE_STRING_MESSAGE);
Log.d(LOG_TAG, "Msg entity: " + s + ".");
if (listener != null) {
listener.handleData(s);
}
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
拿到message String数据之后,或者到外部监听器,因为是使用到String字符串类型,所以这里直接获取到一个监听器,然后分发给方法。
如果是方法json的话,就去GitHub下载代码查看详细代码:代码
自检服务
连接成功之后,我们需要自检,因为可能在连接过程中出现断连的情况,网络不稳定情况,所以需要使用循环的自检:
/**
* 启动自检服务,按照周期执行
*/
private void startSelfCheckService() {
// 自检服务器打开
mSelfCheckDispose = Observable
.interval(10, 10, TimeUnit.SECONDS)
.filter(new Predicate<Long>() {
@Override
public boolean test(Long aLong) throws Exception {
if (!shouldAutoReconnect) {
Log.i(LOG_TAG, "Auto reconnect has been disabled,"
"maybe kicked?");
}
return shouldAutoReconnect;
}
})
.map(new Function<Long, Boolean>() {
@Override
public Boolean apply(Long aLong) throws Exception {
return checkSocketAvailable();
}
})
.subscribeOn(Schedulers.computation())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.i(LOG_TAG, "Self check task has been scheduled per "
+ 10 + " seconds.");
shouldAutoReconnect = true;
Log.i(LOG_TAG, "Auto reconnect feature has been enabled.");
}
})
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean webSocketAlive)
throws Exception {
if (webSocketAlive) {
Log.v(LOG_TAG, "WebSocket self check: is alive.");
return;
}
// 自检服务器打开
initSocketWrapper("SelfCheckService");
}
},
new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e(LOG_TAG, "Error while executing self check!"
+ throwable);
}
});
}
这里我没使用lambda表达式,为了让方法清楚,所以代码比较长,但是结构比较清晰;
首先使用 interval 创建一个延时的周期被观察者,然后根据当前设置是否需要自动连接来过滤是否进行下面的操作,接下来判断当前的连接是否存在和连接,接下来判断是否存活,如果断连则调用初始化的方法,在上面解释了什么情况下会调用自检服务。
总结
整篇文章写了OkHttp的WebSocket使用,断连重试机制,Service使用等,主要的一些细节在文章中可能没体现出来,如果有需要则下载源码自己修改运行。 最后贴上Github地址,喜欢的话给个start! 🤗