使用实例,做备忘。
1、使用时调用
Connect()
,传入MQTT的地址path
、客户端标识clientId
、用户名userName
、用户密码pwd
。
2、连接断开后,会执行一些操作,需要根据需要完善。
/*
* Mqtt 连接相关的方法
* @Author: Sen
* @Date : 2023-02-14 09:21:55
*/
package common
import (
"encoding/json"
"errors"
mqtt "github.com/eclipse/paho.mqtt.golang"
"log"
"runtime"
"time"
)
// Connect 连接方法
// @author: Sen
func Connect(path, clientId, userName, pwd string) (err error) {
opts := mqtt.NewClientOptions().AddBroker(path)
opts.SetClientID(clientId) // 客户端标识。不能重复
opts.SetUsername(userName) // 用户名
opts.SetPassword(pwd) // 密码
opts.SetKeepAlive(60 * time.Second) // 心跳 60s
opts.SetDefaultPublishHandler(MessagePubHandler) // publish消息处理
opts.SetAutoReconnect(true) // 自动重连
opts.OnConnect = ConnectHandler // 连接后处理
opts.OnConnectionLost = ConnectLostHandler // 断连后处理
opts.CleanSession = false // 不清除session,保证重新连接后可继续执行
// 连接平台
Client := mqtt.NewClient(opts)
token := Client.Connect()
if token.WaitTimeout(5*time.Second) == false {
return errors.New("error time out")
}
if token.Error() != nil {
return token.Error()
}
log.Println("Connection Success")
// 订阅-预处理
topic := "" // 订阅的频道
if token := Client.Subscribe(topic, 1, messageSubHandler); token.Wait() && token.Error() != nil {
return token.Error()
}
return
}
// ConnectHandler 连接成功时回调函数
// @author: Sen
var ConnectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
log.Println("MQTT Connection Suc")
ConOrLostFunc("con")
}
// ConnectLostHandler 丢失连接的回调函数
// @author: Sen
var ConnectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
log.Printf("MQTT Connection Lost: %v", err.Error())
lostTime = time.Now().Unix()
ConOrLostFunc("lost")
}
// MessagePubHandler
// @author: Sen
var MessagePubHandler mqtt.MessageHandler = func(client mqtt.Client, message mqtt.Message) {
log.Printf("TOPIC: %s \nMSG:%s \n", message.Topic(), message.Payload())
}
// messageSubHandler 创建全局mqtt sub消息处理
// @author: Sen
var messageSubHandler mqtt.MessageHandler = func(client mqtt.Client, message mqtt.Message) {
log.Println("收到订阅消息,TOPIC and MSG:")
log.Println(message.Topic()) // topic
log.Println(Bytes2String(message.Payload())) // message
// 去分发器执行
DistributionService(message.Payload())
}
// DistributionService 分发器
// @author: Sen
func DistributionService(message []byte) {
// 自定义的数据格式,按需修改
var params map[string]interface{}
err := json.Unmarshal(message, ¶ms)
if err != nil {
log.Println("解析MQTT参数错误", err.Error())
return
}
log.Println("开始处理IOT服务:")
// 自定义的处理方式,按需修改
switch params["method"] {
case "key1":
log.Println("key1 do some ting ……")
case "key2":
log.Println("key2 do some ting ……")
case "key3":
log.Println("key3 do some ting ……")
case "key4":
log.Println("key4 do some ting ……")
default:
log.Println("未定义IOT事件,不处理~")
}
return
}
var (
lostTime int64 //丢失连接的时间点
lostDone chan int //丢失时的chan
)
// ConOrLostFunc 连接or丢失连接时执行
// @author: Sen
func ConOrLostFunc(flag string) {
// flag :丢失连接时开启监听服务
// 重新连接时,关闭监听服务
if flag == "con" { //重新连接
if lostTime == 0 {
lostTime = 0
return
}
lostDone <- 1
} else { //丢失连接
lostDone = make(chan int)
go connectLostFunc(lostDone)
}
}
// connectLostFunc 连接丢失时执行
// @author: Sen
func connectLostFunc(ch chan int) {
for {
select {
case <-ch:
runtime.Goexit()
break
default:
}
log.Println("Iot Connect Lost And Do Some Thing")
time.Sleep(2 * time.Second)
}
}