为什么集成MQTT协议
- 本人从事电力相关的软件开发,因项目需求需要从电科院等拥有变电、配电传感器的公司获取电流、电压等电力相关数据。所以准备集成MQTT协议订阅其服务器的主题进行数据的集成分析。
什么是MQTT协议
- image.png
本地启动MQTT生产者
想要消费消息就要有人生产,本人采用 EMQ 作为本地 MQTT 服务器(Broker),并通过其自带的工具模拟生产者发布消息。
下载完成后,进入 bin 目录,执行 .\emqx start 启动 EMQ 服务;停止服务则执行 .\emqx stop
-
执行命令后,控制台若无异常输出即视为启动成功,可访问 http://localhost:18083/ 进行验证。
image.png -
使用账号 admin、密码 public 登录后,可在工具中执行连接、订阅、发布等操作。image.png
如何整合MQTT消费者
- 此工程采用 Spring Boot 2.2.6 版本构建,为单体工程,构建工具采用 Gradle。
- 引入需要的依赖
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.2'
implementation 'org.fusesource.mqtt-client:mqtt-client:1.14'
- 在application.yml 添加相关配置
# MQTT connection settings; the keys below are the ones read through
# ${spring.mqtt.*} and ${spring.mqtt.client.id} placeholders.
spring:
  mqtt:
    username: admin
    password: public
    url: tcp://127.0.0.1:1883
    # Comma-separated list of topics to subscribe to.
    topic: /test,/test2,/test3
    completionTimeout: 3000
    client:
      id: mqttId
- 创建配置类,读取配置的信息
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * Holds the MQTT connection settings read from the {@code spring.mqtt.*} properties.
 *
 * <p>Every field is populated through {@code @Value}; the class only exposes read accessors.
 */
@Component
public class MqttCofigBean {

    /** Separator between configured topics: a comma with optional surrounding whitespace. */
    private static final String TOPIC_SEPARATOR = "\\s*,\\s*";

    @Value("${spring.mqtt.username}")
    private String username;

    @Value("${spring.mqtt.password}")
    private String password;

    @Value("${spring.mqtt.url}")
    private String hostUrl;

    @Value("${spring.mqtt.client.id}")
    private String clientId;

    /** Raw, comma-separated topic list exactly as written in the configuration. */
    @Value("${spring.mqtt.topic}")
    private String msgTopic;

    /** Connection timeout; the unit is not specified by this class. */
    @Value("${spring.mqtt.completionTimeout}")
    private int completionTimeout;

    /**
     * Returns the user name.
     *
     * @return the value of {@code spring.mqtt.username}
     */
    public String getUsername() {
        return this.username;
    }

    /**
     * Returns the password.
     *
     * @return the value of {@code spring.mqtt.password}
     */
    public String getPassword() {
        return this.password;
    }

    /**
     * Returns the server connection address.
     *
     * @return the value of {@code spring.mqtt.url}
     */
    public String getHostUrl() {
        return this.hostUrl;
    }

    /**
     * Returns the client ID.
     *
     * @return the value of {@code spring.mqtt.client.id}
     */
    public String getClientId() {
        return this.clientId;
    }

    /**
     * Returns the configured default topics.
     *
     * <p>The raw property is split on commas. Whitespace around each entry is removed so that a
     * value such as {@code "/a, /b"} does not produce a topic with a leading space.
     *
     * @return one array element per comma-separated entry of {@code spring.mqtt.topic}
     */
    public String[] getMsgTopic() {
        return msgTopic.trim().split(TOPIC_SEPARATOR);
    }

    /**
     * Returns the connection timeout.
     *
     * @return the value of {@code spring.mqtt.completionTimeout}
     */
    public int getCompletionTimeout() {
        return this.completionTimeout;
    }
}
- 创建消费者类
import org.springframework.boot.ApplicationRunner;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.stereotype.Component;
@Component
public class MqttConsumer implements ApplicationRunner {
private static Logger logger = LoggerFactory.getLogger(MqttConsumer.class);
private static MqttClient client;
private static MqttTopic mqttTopic;
/**
* MQTT连接属性配置对象
*/
@Autowired
public MqttCofigBean mqttCofigBean;
/**
* 初始化参数配置
*/
@Override
public void run(ApplicationArguments args) throws Exception {
logger.info("INIT MQTT CONNECT");
this.connect();
}
/**
* 用来连接服务器
*/
private void connect() throws Exception {
client = new MqttClient(mqttCofigBean.getHostUrl(), mqttCofigBean.getClientId(), new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
options.setUserName(mqttCofigBean.getUsername());
options.setPassword(mqttCofigBean.getPassword().toCharArray());
//是否清除session
options.setCleanSession(false);
// 设置超时时间
options.setConnectionTimeout(30);
// 设置会话心跳时间
options.setKeepAliveInterval(20);
try {
String[] msgtopic = mqttCofigBean.getMsgTopic();
//订阅消息
int[] qos = new int[msgtopic.length];
for (int i = 0; i < msgtopic.length; i++) {
qos[i] = 0;
}
client.setCallback(new TopMsgCallback(client, options, msgtopic, qos));
client.connect(options);
client.subscribe(msgtopic, qos);
logger.info("MQTT CONNECT SUCCESS:{}:{}", mqttCofigBean.getClientId(), client);
} catch (Exception e) {
logger.error("MQTT CONNECT EXCEPTION:" + e);
}
}
/**
* 重连
* @throws Exception E
*/
public void reConnect() throws Exception {
if (null != client) {
this.connect();
}
}
/**
* 订阅某个主题
*/
public void subscribe(String topic, int qos) {
try {
logger.info("TOPIC:" + topic);
client.subscribe(topic, qos);
} catch (MqttException e) {
logger.error(e.getMessage());
}
}
public MqttClient getClient() {
return client;
}
public void setClient(MqttClient client) {
MqttConsumer.client = client;
}
public MqttTopic getMqttTopic() {
return mqttTopic;
}
public void setMqttTopic(MqttTopic mqttTopic) {
MqttConsumer.mqttTopic = mqttTopic;
}
- 创建信息回调类
import org.eclipse.paho.client.mqttv3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
public class TopMsgCallback implements MqttCallback {
private static Logger logger = LoggerFactory.getLogger(TopMsgCallback.class);
// Client used to reconnect and resubscribe after the connection is lost.
private MqttClient client;
// Connect options reused for every reconnect attempt.
private MqttConnectOptions options;
// Topics to resubscribe to after a reconnect.
private String[] topic;
// QoS levels passed together with the topics when resubscribing.
private int[] qos;
/**
 * Creates a callback with no client, connect options, topics or QoS levels assigned.
 */
public TopMsgCallback() {
    // Intentionally empty: every instance field keeps its default value (null).
}
/**
 * Creates a callback that can reconnect and resubscribe on its own.
 *
 * @param client  client used for reconnecting
 * @param options connect options reused for each reconnect attempt
 * @param topic   topics to resubscribe to
 * @param qos     QoS levels passed together with {@code topic} when resubscribing
 */
public TopMsgCallback(MqttClient client, MqttConnectOptions options, String[] topic, int[] qos) {
    // Subscription data.
    this.topic = topic;
    this.qos = qos;
    // Connection data.
    this.client = client;
    this.options = options;
}
/**
 * Reconnects and resubscribes after the connection is lost.
 *
 * <p>Retries every 30 seconds until both connecting and subscribing succeed. The loop stops early
 * if the thread is interrupted, and is skipped entirely when no client was supplied.
 *
 * @param cause the reason the connection was lost
 */
@Override
public void connectionLost(Throwable cause) {
    logger.info("MQTT DISCONNECT, RECONNECTION", cause);
    if (client == null) {
        // Without a client (no-arg constructor) every attempt would fail forever.
        logger.error("connectionLost : no client available, reconnection skipped");
        return;
    }
    final long retryDelayMillis = 30000L;
    while (true) {
        try {
            Thread.sleep(retryDelayMillis);
            // A previous attempt may have connected but failed to subscribe;
            // connecting again in that state would fail on every retry.
            if (!client.isConnected()) {
                client.connect(options);
            }
            // Resubscribe to the topics.
            client.subscribe(topic, qos);
            logger.info("MQTT RECONNECTION SUCCESS:{}", client);
            break;
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop retrying so the thread can terminate.
            Thread.currentThread().interrupt();
            logger.warn("connectionLost : interrupted, reconnection aborted");
            break;
        } catch (Exception e) {
            logger.error("connectionLost :", e);
        }
    }
}
/**
 * Delivery-complete callback required by {@link MqttCallback}; this implementation ignores it.
 *
 * @param token the delivery token passed to the callback (unused)
 */
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
    // Intentionally empty.
}
/**
 * Handles an incoming message by logging its topic and payload.
 *
 * @param topic   topic the message arrived on
 * @param message the received message
 * @throws Exception declared by {@link MqttCallback}; not thrown explicitly here
 */
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
    // Decode with an explicit charset instead of the platform default.
    // NOTE(review): assumes publishers send UTF-8 text — confirm against the data source.
    String msg = new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
    logger.info("topic:{}", topic);
    logger.info("msg:{}", msg);
}
/**
 * Serializes an object to a byte array using Java object serialization.
 *
 * @param obj the object to serialize; may be {@code null}
 * @return the serialized bytes, or {@code null} when {@code obj} is {@code null}
 * @throws Exception if serialization fails
 */
public byte[] getBytesFromObject(Serializable obj) throws Exception {
    if (obj == null) {
        return null;
    }
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    // try-with-resources closes (and thereby flushes) the object stream before the bytes are read.
    try (ObjectOutputStream objectStream = new ObjectOutputStream(buffer)) {
        objectStream.writeObject(obj);
    }
    return buffer.toByteArray();
}