上篇已经准备好了基本的条件,接下来就是如何与服务器之间建立
一条长连接,以及如何封包
、解包
;
新建LXSocketManager
类,用于对CocoaAsyncSocket
进行封装,这样以后如果更换另外的socket库,只需要修改该文件即可。pod下来我们发现CocoaAsyncSocket
有两个文件GCDAsyncSocket.h
、GCDAsyncUdpSocket.h
,前者基于TCP
而后者基于UDP
,这里选用前者。
// LXSocketManager.h
typedef NS_ENUM(NSInteger, LXSocketStatus) {
LXSocketStatusUnknown = -1,
LXSocketStatusUnconnect,
LXSocketStatusConnect,
};
@class LXSocketManager;
@protocol LXSocketManagerDelegate <NSObject>
@optional
- (void)socketWillSendHeartBeat;
- (void)socket:(LXSocketManager *)socket didConnect:(NSString *)server;
- (void)socket:(LXSocketManager *)socket didReceive:(Message *)message;
@end
@interface LXSocketManager : NSObject
@property (nonatomic, assign, readonly) LXSocketStatus connectStatus;
@property (nonatomic, weak) id<LXSocketManagerDelegate> delegate;
// 连接
- (void)connectTo:(NSString *)host onPort:(uint16_t)port;
// 断开连接
- (void)disconnect;
// 重连
- (void)forceReconnect;
// 发送数据
- (void)sendData:(NSData *)data;
// 开始发送心跳
- (void)startHeartBeat;
@end
点开GCDAsyncSocket.h
文件,可以看到以下方法
// 初始化
- (instancetype)init;
- (instancetype)initWithSocketQueue:(nullable dispatch_queue_t)sq;
- (instancetype)initWithDelegate:(nullable id<GCDAsyncSocketDelegate>)aDelegate delegateQueue:(nullable dispatch_queue_t)dq;
- (instancetype)initWithDelegate:(nullable id<GCDAsyncSocketDelegate>)aDelegate delegateQueue:(nullable dispatch_queue_t)dq socketQueue:(nullable dispatch_queue_t)sq;
// 连接
- (BOOL)connectToHost:(NSString *)host onPort:(uint16_t)port error:(NSError **)errPtr;
- (BOOL)connectToHost:(NSString *)host onPort:(uint16_t)port withTimeout:(NSTimeInterval)timeout error:(NSError **)errPtr;
// 断开连接
- (void)disconnect;
首先创建客户端socket
socket = [[GCDAsyncSocket alloc] initWithDelegate:self delegateQueue:dispatch_get_main_queue()];
接下来连接服务端的socket
[socket connectToHost:host onPort:port error:&error];
怎么知道是否连接成功,如何接收数据,socket中断等消息,查看GCDAsyncSocketDelegate
代理,会看到以下方法
// 已连接
- (void)socket:(GCDAsyncSocket *)sock didConnectToHost:(NSString *)host port:(uint16_t)port
// 断开连接
- (void)socketDidDisconnect:(GCDAsyncSocket *)sock withError:(NSError *)err
// 接收数据
- (void)socket:(GCDAsyncSocket *)sock didReadData:(NSData *)data withTag:(long)tag
// LXSocketManager.m
#pragma mark - GCDAsyncSocketDelegate
- (void)socket:(GCDAsyncSocket *)sock didConnectToHost:(NSString *)host port:(uint16_t)port {
LXLog(@"==============socket did connect host: %@, port: %hu==============", host, port);
//
[self pullMesasge];
//
_connectStatus = LXSocketStatusConnect;
if ([self.delegate respondsToSelector:@selector(socket:didConnect:)]) {
NSString *server = [NSString stringWithFormat:@"%@:%d", host, port];
[self.delegate socket:self didConnect:server]; // 开启心跳等操作
}
}
- (void)socketDidDisconnect:(GCDAsyncSocket *)sock withError:(NSError *)err {
LXLog(@"==============socket did disconnect==============");
// 停止心跳
[self stopHeartBeat];
_connectStatus = LXSocketStatusUnconnect;
// 重连
[self reconnectIfNeed];
}
- (void)socket:(GCDAsyncSocket *)sock didReadData:(NSData *)data withTag:(long)tag {
[receiveData appendData:data];
// 读取包内容长度
int32_t headLength = 0;
int32_t contentLength = [self getContentLength:receiveData withHeadLength:&headLength];
if (contentLength <= 0) {
[self pullMesasge];
return;
}
// 还未接收到一个完整的数据
if (headLength + contentLength > [receiveData length]) {
// 继续接收下一条消息
[self pullMesasge];
return;
}
// 解析
[self parseContentDataWithHeadLength:headLength withContentLength:contentLength];
[self pullMesasge];
}
- (void)pullMesasge {
[socket readDataWithTimeout:-1 tag:110];
}
心跳
客户端每隔一段时间发送一个数据包给服务端告知服务端我还活着,这就是心跳。心跳的数据需要与服务端约定;当服务端在一定时间内没有收到心跳包,就会断开连接,客户端会收到断开连接的回调,然后进入重连机制。
重连机制
当断开连接后,每过一段时间T
重连。在这里时间采用的是指数增长的,并且最大次数是4次。
封包
先将Message.proto
文件编译成objc文件,然后直接调用对象delimitedData
方法,接着就可以用socket
发送我们的数据包了
NSData *data = [message delimitedData];
解包
当我们读取数据的时候,正常
的情况是收到一个个完整的数据包,然后再反序列化成我们的ProtoBuf对象
,但有时候会出现粘包
、断包
的情况。如何处理?一个数据包
由包头
和包体
组成,包头
有这个数据包的长度信息
,因此先获取该数据包的长度
,然后根据长度去截取
即可。具体代码如下。
/** 关键代码:获取data数据的内容长度和头部长度: index --> 头部占用长度 (头部占用长度1-4个字节) */
- (int32_t)getContentLength:(NSData *)data withHeadLength:(int32_t *)index {
int8_t tmp = [self readRawByte:data headIndex:index];
if (tmp >= 0) return tmp;
int32_t result = tmp & 0x7f;
if ((tmp = [self readRawByte:data headIndex:index]) >= 0) {
result |= tmp << 7;
} else {
result |= (tmp & 0x7f) << 7;
if ((tmp = [self readRawByte:data headIndex:index]) >= 0) {
result |= tmp << 14;
} else {
result |= (tmp & 0x7f) << 14;
if ((tmp = [self readRawByte:data headIndex:index]) >= 0) {
result |= tmp << 21;
} else {
result |= (tmp & 0x7f) << 21;
result |= (tmp = [self readRawByte:data headIndex:index]) << 28;
if (tmp < 0) {
for (int i = 0; i < 5; i++) {
if ([self readRawByte:data headIndex:index] >= 0) {
return result;
}
}
result = -1;
}
}
}
}
return result;
}
/** 读取字节 */
- (int8_t)readRawByte:(NSData *)data headIndex:(int32_t *)index{
if (*index >= data.length) return -1;
*index = *index + 1;
return ((int8_t *)data.bytes)[*index - 1];
}
/** 解析二进制数据:NSData --> 自定义模型对象 */
- (void)parseContentDataWithHeadLength:(int32_t)headL withContentLength:(int32_t)contentL{
NSRange range = NSMakeRange(0, headL + contentL); //本次解析data的范围
NSData *data = [receiveData subdataWithRange:range]; //本次解析的data
GPBCodedInputStream *inputStream = [GPBCodedInputStream streamWithData:data];
NSError *error;
Message *obj = [Message parseDelimitedFromCodedInputStream:inputStream extensionRegistry:nil error:&error];
if (!error){
if (obj) {
//保存解析正确的模型对象
if ([self.delegate respondsToSelector:@selector(socket:didReceive:)]) {
[self.delegate socket:self didReceive:obj];
}
}
[receiveData replaceBytesInRange:range withBytes:NULL length:0]; //移除已经解析过的data
}
if (receiveData.length < 1) return;
//对于粘包情况下被合并的多条消息,循环递归直至解析完所有消息
headL = 0;
contentL = [self getContentLength:receiveData withHeadLength:&headL];
if (headL + contentL > receiveData.length) return; //实际包不足解析,继续接收下一个包
[self parseContentDataWithHeadLength:headL withContentLength:contentL]; //继续解析下一条
}
监控网络状态
因为是移动设备,网络状态的改变是非常频繁的;所以需要监控网络状态来做出相应的操作。这里选择的是RealReachability
第三方库
// 开启监听
[GLobalRealReachability startNotifier];
[[NSNotificationCenter defaultCenter] addObserver:self selector:@selector(networkChange:) name:kRealReachabilityChangedNotification object:nil];
#pragma mark - network reachability
//
- (void)networkChange:(NSNotification *)notif {
RealReachability *reachability = (RealReachability *)notif.object;
ReachabilityStatus status = [reachability currentReachabilityStatus];
switch (status) {
case RealStatusNotReachable:
case RealStatusUnknown: {
LXLog(@"network unknown or no reachable");
if (self.socket.connectStatus == LXSocketStatusConnect) {
[self.socket disconnect];
}
break;
}
case RealStatusViaWiFi:
case RealStatusViaWWAN: {
LXLog(@"wifi or wwan");
if (self.socket.connectStatus != LXSocketStatusConnect) {
// 重连
[self.socket forceReconnect];
}
break;
}
}
}
参考文章
1、ProtoBuf
粘包、断包处理 https://www.cnblogs.com/tandaxia/archive/2017/04/16/6718695.html