Background
springboot 集成mqtt实现发布、订阅消息的功能
- application.yml配置
# Mqtt配置
mqtt:
serverURIs: tcp://localhost:1883
username: admin
password: public
client:
id: ${random.value}
topic: topic_default
- MqttConfig
package com.cloudansys.config;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
@Slf4j
@Configuration
@IntegrationComponentScan
@Getter
@Setter
public class MqttConfig {
public static final String OUTBOUND_CHANNEL = "mqttOutboundChannel";
public static final String INPUT_CHANNEL = "mqttInputChannel";
public static final String SUB_TOPICS = "PSimulation,Pressure,PSimulationPump,PSimulationPressure," +
"PSimulationValve,PSimulationFlow,FSimulation,FSimulationPump,FSimulationPressure," +
"FSimulationValve,FSimulationFlow,leak,blast";
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.serverURIs}")
private String hostUrl;
@Value("${mqtt.client.id}")
private String clientId;
@Value("${mqtt.topic}")
private String defaultTopic;
@PostConstruct
public void init() {
log.debug("username:{} password:{} hostUrl:{} clientId :{} ",
this.username, this.password, this.hostUrl, this.clientId, this.defaultTopic);
}
@Bean
public MqttPahoClientFactory clientFactory() {
final MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{hostUrl});
options.setUserName(username);
options.setPassword(password.toCharArray());
final DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(options);
return factory;
}
@Bean(value = OUTBOUND_CHANNEL)
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = OUTBOUND_CHANNEL)
public MessageHandler mqttOutbound() {
final MqttPahoMessageHandler handler = new MqttPahoMessageHandler(clientId, clientFactory());
handler.setDefaultQos(1);
handler.setDefaultRetained(false);
handler.setDefaultTopic(defaultTopic);
handler.setAsync(false);
handler.setAsyncEvents(false);
return handler;
}
/**
* MQTT消息接收处理
*/
//接收通道
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
//配置client,监听的topic
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(
clientId + "_inbound", clientFactory(), SUB_TOPICS.split(","));
adapter.setCompletionTimeout(3000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
//通过通道获取数据
@Bean
@ServiceActivator(inputChannel = INPUT_CHANNEL)
public MessageHandler handler() {
return message -> {
String topic = Objects.requireNonNull(message.getHeaders().get("mqtt_receivedTopic")).toString();
log.info("topic: {}", topic);
String[] topics = SUB_TOPICS.split(",");
for (String t : topics) {
if (t.equals(topic)) {
log.info("payload: {}", message.getPayload().toString());
}
}
};
}
}
- MqttGateway
package com.cloudansys.config;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
@Component
@MessagingGateway(defaultRequestChannel = MqttConfig.OUTBOUND_CHANNEL)
public interface MqttGateway {
void sendToMqtt(String payload);
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}