python实现MQTT发布和订阅

一、安装paho-mqtt

通过命令安装

pip install paho-mqtt

二、实现MQTT发布

from paho.mqtt import client as mqtt
import time

def on_connect(client, userdata, flags, rc):
    """一旦连接成功, 回调此方法"""
    rc_status = ["连接成功", "协议版本错误", "无效的客户端标识", "服务器无法使用", "用户名或密码错误", "无授权"]
    print("connect:", rc_status[rc])

def mqtt_connect():
    """连接MQTT服务器"""
    client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
    mqttClient = mqtt.Client(client_id)
    mqttClient.on_connect = on_connect  # 返回连接状态的回调函数
    MQTTHOST = "IP"  # MQTT服务器地址
    MQTTPORT = 1883  # MQTT端口
    mqttClient.username_pw_set("username", "password")  # mqtt服务器账号密码
    mqttClient.connect(MQTTHOST, MQTTPORT, 60)
    mqttClient.loop_start()  # 启用线程连接
    return mqttClient

# 发布消息
def mqtt_publish():
    """发布主题为'mqtt/demo',内容为'Demo text',服务质量为2"""
    mqttClient = mqtt_connect()
    text = "Demo text"
    mqttClient.publish('mqtt/demo', text, 2)
    mqttClient.loop_stop()

if __name__ == '__main__':
    mqtt_publish()

三、实现MQTT订阅

1.订阅消息-方法1

from paho.mqtt import client as mqtt
import time

def on_connect(client, userdata, flags, rc):
    """一旦连接成功, 回调此方法"""
    rc_status = ["连接成功", "协议版本错误", "无效的客户端标识", "服务器无法使用", "用户名或密码错误", "无授权"]
    print("connect:", rc_status[rc])

def on_message(client, userdata, msg):
    """一旦订阅到消息, 回调此方法"""
    print("主题:"+msg.topic+" 消息:"+str(msg.payload.decode('gb2312')))

def mqtt_connect():
    """连接MQTT服务器"""
    client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
    mqttClient = mqtt.Client(client_id)
    mqttClient.on_connect = on_connect  # 返回连接状态的回调函数
    mqttClient.on_message = on_message  # 返回订阅消息回调函数
    MQTTHOST = "IP"  # MQTT服务器地址
    MQTTPORT = 1883  # MQTT端口
    mqttClient.username_pw_set("username", "password")  # mqtt服务器账号密码
    mqttClient.connect(MQTTHOST, MQTTPORT, 60)
    mqttClient.loop_start()  # 启用线程连接
    return mqttClient

def on_subscribe():
    """订阅主题:mqtt/demo"""
    mqttClient = mqtt_connect()
    mqttClient.subscribe("mqtt/demo", 2)
    while True:
        pass

if __name__ == '__main__':
    on_subscribe()

2.订阅消息-方法2

from paho.mqtt import client as mqtt
import time

def on_connect(client, userdata, flags, rc):
    """一旦连接成功, 回调此方法"""
    rc_status = ["连接成功", "协议版本错误", "无效的客户端标识", "服务器无法使用", "用户名或密码错误", "无授权"]
    print("connect:", rc_status[rc])
    client.subscribe("mqtt/demo", 2)  # 订阅消息

def on_message(client, userdata, msg):
    """一旦订阅到消息, 回调此方法"""
    print("主题:"+msg.topic+" 消息:"+str(msg.payload.decode('gb2312')))  # 客户端返回的消息,使用gb2312编码中文不会报错

def server_conenet(client):
    client.on_connect = on_connect  # 返回连接状态的回调函数
    client.on_message = on_message  # 返回订阅消息的回调函数
    client.username_pw_set("username", "password")  # mqtt服务器账号密码
    client.connect("IP", 1883, 60)  # 连接
    client.loop_forever()   # 以forever方式阻塞运行。

def server_main():
    client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
    client = mqtt.Client(client_id, transport='tcp')
    server_conenet(client)

if __name__ == '__main__':
    # 启动监听
    server_main()

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
禁止转载,如需转载请通过简信或评论联系作者。

推荐阅读更多精彩内容