在Java Spring Boot中,对于RabbitMQ消费者/订阅者,大概有以下实现方式:
- 采用SimpleMessageListenerContainer
- 采用RabbitListener
第一种:采用SimpleMessageListenerContainer
@Configuration
public class RabbitMQConfig {
@Bean
public CachingConnectionFactory connectionFactory1() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("host1");
connectionFactory.setPort(port1);
connectionFactory.setUsername("user1");
connectionFactory.setPassword("password1");
return connectionFactory;
}
@Bean
public SimpleMessageListenerContainer consumeMessages(ConnectionFactory connectionFactory1) {
SimpleMessageListenerContainer container1 = new SimpleMessageListenerContainer(connectionFactory1);
container1.setQueueNames("queue_name_1");
container1.setMessageListener((Message message) -> {
// 处理来自第一个队列的消息
System.out.println("Received message from first queue: " + new String(message.getBody()));
});
// container1.start(); //这里不用启动,Spring会自动启动
return container1;
}
}
实现原理
- 首先要定义连接,我们假设为connectionFactory1,当然也可以定义多个连接,并且设置Bean,方便注入使用
- 构造Container,这里有个参数ConnectionFactory,对于Bean,Spring会自动调用,默认是上面的连接,如果有多个,就要给参数增加指定了:@Qualifier("connectionFactory1")
对于SimpleMessageListenerContainer,可以参考:https://cloud.tencent.com/developer/article/2244239
第二种:RabbitListener
这个就非常简单了
@Component
public class RabbitMQListenerDemo {
@RabbitListener(queues = "queue_name_1")
public void onMessage1(String message) {
String messageFormat = String.format("RabbitMQListenerDemo:Listener1:%s", message);
System.out.println(messageFormat);
}
@RabbitListener(queues = "queue_name_1")
public void onMessage2(Message message) {
String mqMsg = new String(message.getBody(), StandardCharsets.UTF_8);
String messageId = "";
if (message.getMessageProperties() != null && !StringUtils.isBlank(message.getMessageProperties().getMessageId())) {
messageId = message.getMessageProperties().getMessageId();
}
String messageFormat = String.format("RabbitMQListenerDemo:Listener2:%s:%s", messageId, mqMsg);
System.out.println(messageFormat);
}
}
第一种接收字符串类型,第二种接收Message类型。
其他问题:
比如现在都是多容器部署,一个代码可能包含Api、Job等,如果都采用自动部署,可能每个容器都会启动,特别采用了配置中心,每个容器都一样的,虽然多个消费者都可以工作,提高了效率,但难以管理,如果处理呢
- 采用环境变量
- 采用其他方式触发,比如放到Job里面,可以采用Job来触发
如果采用第二种方式,是否可以配合RabbitListener使用呢,答案是可以的
@RabbitListener(queues = "queue_name_1", autoStartup = "false", id = "RabbitMQListenerDemo1", containerFactory = "rabbitListenerContainerFactory")
public void onMessage1(String message) {
String messageFormat = String.format("RabbitMQListenerDemo:Listener1:%s", message);
System.out.println(messageFormat);
}
我们发现RabbitListener有个autoStartup 属性,这个可以设置为"false",这样应用在启动的时候,就不会自动启动了,那该如何启动呢
private RabbitListenerEndpointRegistry registry;
MessageListenerContainer container = registry.getListenerContainer("RabbitMQListenerDemo1");
if (!container.isRunning()) {
container.start();
}
这里可以通过RabbitListenerEndpointRegistry的getListenerContainer(id)方法,获取Listener,然后调用start()方法启动Listener。