开篇
最近在使用MQTTClient实现一个类似于消息推送的服务,说实话,真没怎么使用过MQTTClient,也不知道这是个啥? 好尴尬 😓 😓 😓 上网了解了一下,发现MQTT功能挺强(牛)大(逼)。这里我使用的是消息推送服务,通过和服务器端协商,终于能够与服务器连接,并且能够收发消息了。 所以,简单总结了一下,有了这篇文章。
MQTT介绍
- MQTT
MQTT基于订阅者模型架构,客户端如果互相通信,必须在同一订阅主题下,即都订阅了同一个topic,客户端之间是没办法直接通讯的。订阅模型显而易见的好处是群发消息的话只需要发布到topic,所有订阅了这个topic的客户端就可以接收到消息了。
发送消息必须发送到某个topic,重点说明的是不管客户端是否订阅了该topic都可以向topic发送了消息,还有如果客户端订阅了该主题,那么自己发送的消息也会接收到。
- MQTT特点
使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。这一点很类似于XMPP,但是MQTT的信息冗余远小于XMPP.
对负载内容屏蔽的消息传输。
使用TCP/IP提供网络连接。主流的MQTT是基于TCP连接进行数据推送的,但是同样有基于UDP的版本,叫做MQTT-SN。这两种版本由于基于不同的连接方式,优缺点自然也就各有不同了。-
三种消息传输方式QoS:
- 0代表“至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
- 1代表“至少一次”,确保消息到达,但消息重复可能会发生。
- 2代表“只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。 (备注:由于服务端采用Mosca实现,Mosca目前只支持到QoS 1)
如果发送的是临时的消息,例如给某topic所有在线的设备发送一条消息,丢失的话也无所谓,0就可以了(客户端登录的时候要指明支持的QoS级别,同时发送消息的时候也要指明这条消息支持的QoS级别),如果需要客户端保证能接收消息,需要指定QoS为1,如果同时需要加入客户端不在线也要能接收到消息,那么客户端登录的时候要指定session的有效性,接收离线消息需要指定服务端要保留客户端的session状态。
具体MQTT的详细介绍可以戳这里 https://baike.baidu.com/item/MQTT/3618851?fr=aladdin
MQTTClient的使用
iOS 环境下开发 MQTT 客户端程序,一般依赖稳定的第三方 FrameWork,由于涉及网络数据传输,建议选择 Object-c
原生的框架,比如 MQTT-Client-Framework
。
现在一般常用的有两个MQTT
1) MQTTKit
2) MQTTClient
不过MQTTKit貌似很长时间不维护了, 使用较多的是MQTTClient。
- 集成MQTTClient
MQTT-Client-Framework- 用cocopod直接, pod 'MQTTClient'
- GitHub下载,把相对应的文件夹拖进工程即可
MQTT-Client-FrameWork
包提供的客户端类有 MQTTSession
和 MQTTSessionManager
,建议使用后者维持静态资源,而且已经封装好自动重连等逻辑。初始化时需要传入相关的网络参数。
我使用的是第二种, 引入 #import "MQTTClient.h"
#import "MQTTSessionManager.h"
头文件, 遵循 MQTTSessionManagerDelegate
协议
使用步骤:
- 建立连接
和服务器端确定好MQTT服务器地址,端口号, 用户名, 密码, 订阅主题topic
/**
host: 服务器地址
port: 服务器端口
tls: 是否使用tls协议,mosca是支持tls的,如果使用了要设置成true
keepalive: 心跳时间,单位秒,每隔固定时间发送心跳包, 心跳间隔不得大于120s
clean: session是否清除,这个需要注意,如果是false,代表保持登录,如果客户端离线了再次登录就可以接收到离线消息
auth: 是否使用登录验证
user: 用户名
pass: 密码
willTopic: 订阅主题
willMsg: 自定义的离线消息
willQos: 接收离线消息的级别
clientId: 客户端id,需要特别指出的是这个id需要全局唯一,因为服务端是根据这个来区分不同的客户端的,默认情况下一个id登录后,假如有另外的连接以这个id登录,上一个连接会被踢下线, 我使用的设备UUID
*/
NSString *clientId = [UIDevice currentDevice].identifierForVendor.UUIDString;
MQTTSessionManager *sessionManager = [[MQTTSessionManager alloc] init];
[sessionManager connectTo:@"121.199.19.126"
port:1883
tls:false
keepalive:60 //心跳间隔不得大于120s
clean:true
auth:true
user:@"guest"
pass:@"guest"
will:false
willTopic:nil
willMsg:nil
willQos:0
willRetainFlag:false
withClientId:clientId];
sessionManager.delegate = self;
self.sessionManager = sessionManager;
- 监控连接状态
连接当前状态,添加对应的回调接口,可以进行相关的业务逻辑处理。
// 添加监听状态观察者
[self.sessionManager addObserver:self
forKeyPath:@"state"
options:NSKeyValueObservingOptionInitial | NSKeyValueObservingOptionNew
context:nil];
监听连接状态,进行相应处理。
// 监听当前连接状态
- (void)observeValueForKeyPath:(NSString *)keyPath ofObject:(id)object change:(NSDictionary *)change context:(void *)context {
switch (self.sessionManager.state) {
case MQTTSessionManagerStateClosed:
NSLog(@"连接已经关闭");
break;
case MQTTSessionManagerStateClosing:
NSLog(@"连接正在关闭");
break;
case MQTTSessionManagerStateConnected:
NSLog(@"已经连接");
break;
case MQTTSessionManagerStateConnecting:
NSLog(@"正在连接中");
break;
case MQTTSessionManagerStateError: {
NSString *errorCode = self.sessionManager.lastErrorCode.localizedDescription;
NSLog(@"连接异常 ----- %@",errorCode);
}
break;
case MQTTSessionManagerStateStarting:
NSLog(@"开始连接");
break;
default:
break;
}
}
- 接收消息
实现MQTTSessionManagerDelegate
代理方法,处理数据。
// 获取服务器返回数据
- (void)handleMessage:(NSData *)data onTopic:(NSString *)topic retained:(BOOL)retained {
NSLog(@"------------->>%@",topic);
NSString *dataString = [[NSString alloc] initWithData:data encoding:NSUTF8StringEncoding];
NSLog(@"%@",dataString);
// 进行消息处理
}
- 订阅和发送消息
// 订阅主题 NSDictionary类型,Object 为 QoS,key 为 Topic
self.sessionManager.subscriptions = [NSDictionary dictionaryWithObject:[NSNumber numberWithInt:MQTTQosLevelExactlyOnce] forKey:@"hello"];
// 发送消息 返回值msgid大于0代表发送成功
NSString *msg = @"hahaha";
UInt16 msgid = [self.sessionManager sendData:[msg dataUsingEncoding:NSUTF8StringEncoding] //要发送的消息体
topic:@"hello" //要往哪个topic发送消息
qos:0 //消息级别
retain:false];
如果是使用阿里云的服务器替代自己的服务器,需要在阿里云控制台申请 Topic
,Group ID
等资源。
在建立连接时,传入的参数值也会有所改变, user
和 pass
由于服务端需要对客户端进行鉴权,因此需要传入合法的 user
和 pass
。 user
设置为当前用户的 AccessKey
,pass
则设置为 MQTT 客户端 GroupID
的签名字符串,签名计算方式是使用 SecretKey
对 GroupID
做 HmacSHA1
散列加密。
self.manager = [[MQTTSessionManager alloc] init];
self.manager.delegate = self;
self.manager.subscriptions = [NSDictionary dictionaryWithObject:[NSNumber numberWithInt:self.qos]
forKey:[NSString stringWithFormat:@"%@/#", self.rootTopic]];
//password的计算方式是,使用secretkey对groupId做hmac签名算法,具体实现参考macSignWithText方法
NSString *passWord = [[self class] macSignWithText:self.groupId secretKey:self.secretKey];
[self.manager connectTo:self.mqttSettings[@"host"]
port:[self.mqttSettings[@"port"] intValue]
tls:[self.mqttSettings[@"tls"] boolValue]
keepalive:60 //心跳间隔不得大于120s
clean:true
auth:true
user:self.accessKey
pass:passWord
will:false
willTopic:nil
willMsg:nil
willQos:0
willRetainFlag:false
withClientId:self.clientId];
使用 SecretKey
对 GroupID
做 HmacSHA1
散列加密
+ (NSString *)macSignWithText:(NSString *)text secretKey:(NSString *)secretKey {
NSData *saltData = [secretKey dataUsingEncoding:NSUTF8StringEncoding];
NSData *paramData = [text dataUsingEncoding:NSUTF8StringEncoding];
NSMutableData* hash = [NSMutableData dataWithLength:CC_SHA1_DIGEST_LENGTH ];
CCHmac(kCCHmacAlgSHA1, saltData.bytes, saltData.length, paramData.bytes, paramData.length, hash.mutableBytes);
NSString *base64Hash = [hash base64EncodedStringWithOptions:0];
return base64Hash;
}
具体介绍请戳下面帮助链接
MQTT接入环境配置
阿里云接入MTQQ示例
申请MQ资源