业务需求
ActiveMQ 服务重启后,我们的项目仍然可以实现对队列的监听。
目前:我们每次重启ActiveMQ,需要重启项目。
尝试
修改yml:
broker-url: failover:(tcp://127.0.01:61616) # 故障转移机制
...
可以实现项目启动后的自动重连,但是项目初次启动时必须要求ActiveMQ的服务能连接上。
找问题发现是 ActiveMQ 监听的问题,导致项目初次启动失败
初步考虑了几种实现方法:
- 另起线程监听
- 监听类的Lazy
- 定时任务修改broker-url
1.在监听类 添加Lazy
不管ActiveMQ服务是否启动,项目初次启动都成功,但是重启 ActiveMQ 监听不到数据。
不修改 broker-url: failover,将Listener 定义为 Lazy ,并且在Listener中写一个空方法,在一个前端请求的接口中注入Listener 并且该调用空方法进行创建消费者。
测试问题:能够实现功能需求,但是刷新页面会创建多个消费者,后来测试又不能创建消费者了(这个问题不知道为啥)
最终实现方式
不采用注解监听的方式
public void startListen() throws JMSException {
Connection connection = JmsUtils.connection;
if (connection == null){
connection = connectionFactory.createConnection();
connection.start();
JmsUtils.connection = connection;
}
Session session = JmsUtils.session;
if (session == null){
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
JmsUtils.session = session;
}
MessageConsumer consumer = JmsUtils.consumer;
if (consumer == null){
consumer = session.createConsumer(session.createQueue(ywmptMq));
JmsUtils.consumer = consumer;
}
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
readActiveYwmptQueue(((TextMessage)message).getText());
} catch (JMSException e) {
System.err.println("报错了:" + e.getMessage());
}
}
});
}
JmsUtils.java
@Data
public class JmsUtils {
public static Connection connection;
public static MessageConsumer consumer;
public static Session session;
}
定时任务重连
@Scheduled(fixedDelay = 5000L)
public void startListener() {
try {
activeMQQueueListener.startListen()
} catch (Exception e) {
// e.printStackTrace()
log.info("JMS 连接异常")
//把static 变量置空
JmsUtils.connection?.close()
JmsUtils.connection = null
JmsUtils.session?.close()
JmsUtils.session = null
JmsUtils.consumer?.close()
JmsUtils.consumer = null
}
}
关闭项目还存在消费者,解决方式:closeListener
void closeConsumer(){
try {
Connection connection = JmsUtils.connection;
if (connection != null){
connection.close()
log.info("关闭connection成功!")
}
Session session = JmsUtils.session
if (session != null){
session.close()
log.info("关闭session成功!")
}
MessageConsumer consumer = JmsUtils.consumer
if (consumer != null){
consumer.close()
log.info("关闭consumer成功!")
}
} catch (Exception e) {
log.error("关闭JMS消费者失败:",e)
}
}