前段时间由于业务需要,使用到socket与PC端建立链接并传输文件,本篇文章主要记录在使用过程中涉及到的问题,包括本地socket服务、连接、消息发送与接收、拆包、大小端转换、文件切片、心跳、重连等内容。
启动本地socket服务
如果需要移动端作为socket服务端,可以使用 dart:io 库中的ServerSocket,通过IP和端口进行链接
startServer() async {
try {
_serverSocket =
await ServerSocket.bind(InternetAddress.anyIPv4, serverPort);
_serverSocket.listen(serverOnReceive);
} catch (e, stackTrace) {
LoggerUtil.e(e);
LoggerUtil.e(stackTrace);
}
}
serverOnReceive(Socket socket) {
_socket = socket;
_socket?.listen(serverReceiveMsg);
}
serverReceiveMsg方法中收到消息进行拆包,这个下边会讲到。
Flutter连接其他Socket服务
这里我们需要使用到 dart:io 库中的socket.dart这个类中的Socket对象,使用.connect方法进行连接。
Socket.connect(address, port,
timeout: Duration(seconds: socketTimeout))
.then((socket) async {
_socket = socket;
_socket?.listen(
onReceivedMsg,
onError: onError,
onDone: onDone,
cancelOnError: false,
);
}).catchError((error) {
if (error is SocketException) {
LoggerUtil.e(error);
}
});
接收socket消息并处理粘包
粘包简单来说就是多个socket消息连在了一起,每个socket消息前四个字节代表了这条消息的长度,我们根据前四个字节的内容来读取相应的长度,如此循环来读取所有的Socket消息内容
//接收到socket消息
onReceivedMsg(event) async {
receiveList = receiveList + event;
//当接收到的数据长度大于8读取消息头
if (isPackReaded) {
while (receiveList.length > 4) {
isPackReaded = false;
int headerLength = 4;
//读取消息体长度
int msgLength = byteToNum(receiveList.sublist(0, 4));
//当收到的消息超过消息头描述的消息体长度时取出消息体并解码
if (receiveList.length >= headerLength + msgLength) {
List<int> bodyList =
receiveList.sublist(headerLength, headerLength + msgLength);
String bodyStr = utf8.decode(bodyList);
//这里处理已经读取的Socket消息内容 进行解base64或者解密
await analysisStr(bodyStr);
//读取后删除已读取的消息
receiveList = receiveList.sublist(headerLength + msgLength);
if (receiveList.isEmpty) {
isPackReaded = true;
}
} else {
isPackReaded = true;
break;
}
}
}
}
大小端转换
由于C中使用大端模式 所以要进行大小端的转换
//小端转大端
Uint8List int32BigEndianBytes(int value) {
return Uint8List(4)..buffer.asByteData().setInt32(0, value, Endian.big);
}
//大端转小端
int byteToNum(List<int> list) {
Uint8List resultList = Uint8List.fromList(list);
ByteData byteData = ByteData.view(resultList.buffer);
return byteData.getInt32(0);
}
消息发送
//发送socket消息
sendMsg(String msg) {
try {
Codec<String, String> stringToBase64 = utf8.fuse(base64);
String base64Str = stringToBase64.encode(msg);
//先告知消息长度-须转换为大端模式
_socket?.add(int32BigEndianBytes(base64Str.length));
//发送消息
_socket?.write(base64Str);
_socket?.flush();
} catch (e) {
debugPrint('========发送socket消息失败========$e');
}
}
消息类型
两端可自行约定socket消息类型
enum PackTypeEnum {
packTypeUnknow,
packTypeHeart,
packTypeDisConnect,
}
心跳与重连
心跳可由服务端发起每隔30秒发送一次心跳消息,客户端收到后进行回应并将本地心跳变量重置。
heartBeat = 0;
String sendStr = jsonEncode({
'packtype': PackTypeEnum.packTypeHeart.index,
'data': 'pang',
});
debugPrint('===========接收到心跳=====');
await sendMsg(sendStr);
本地定时任务进行检测,如果某一时间段内没有收到心跳,可能表示已经断开连接,尝试重新连接
startHeartBeat() {
_timer ??= Timer.periodic(const Duration(seconds: 1), (time) {
heartBeat++;
if (heartBeat > 40) {
heartBeat = 0;
//重连
if (socketAddress != null && socketPort != null) {
connectByAddress(socketAddress!, socketPort!);
}
}
});
}
文件切片传输
一些涉及业务的代码已经删除,核心就是拿到文件句柄,循环读取固定长度然后进行发送,所有片段发送完成后关闭
//切片传输文件
fileSlice(DocumentModel docModel, int docIndex, int docCount,
int uploadTotal) async {
debugPrint('=============发送文档===当前第-$fileIndex---共=$uploadTotal');
try {
for (DocumentFileModel docFile in docModel.fileList!) {
String docFilePath = docFile.localFilePath!;
File file = File(docFilePath);
var handle = await file.open();
var current = 0;
var size = file.lengthSync();
var chunkSize = 4096;
int chunkIndex = 0;
while (current < size) {
var len = size - current >= chunkSize ? chunkSize : size - current;
var section = handle.readSync(len);
current = current + len;
// 处理数据块
Map sendMap = {};
chunkIndex += 1;
String jsonStr = jsonEncode(sendMap);
Codec<String, String> stringToBase64 = utf8.fuse(base64);
String base64Str = stringToBase64.encode(jsonStr);
_socket?.add(int32BigEndianBytes(base64Str.length));
_socket?.write(base64Str);
// 立即发送并清空缓冲区
await _socket?.flush();
}
await handle.close();
fileIndex += 1;
EasyLoading.showProgress(
(fileIndex / uploadTotal) > 1 ? 1 : fileIndex / uploadTotal,
status: '已发送%s个'.trArgs(['$fileIndex/$uploadTotal']));
if (fileIndex >= uploadTotal) {
fileIndex = 0;
EasyLoading.showSuccess('成功'.tr);
}
}
} catch (e) {
debugPrint('================${e.toString()}');
}
}