RabbitListenerConfigurer详解
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerEndpoint;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ConsumerConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
return factory;
}
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
return factory;
}
@Bean
public RabbitListenerConfigurer rabbitListenerConfigurer(){
return new RabbitListenerConfigurer() {
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
//endpoint设置zhihao.miao.order队列的消息处理逻辑
SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint();
endpoint.setId("10");
endpoint.setQueueNames("zhihao.miao.order");
endpoint.setMessageListener(message -> {
System.out.println("endpoint1处理消息的逻辑");
System.out.println(new String(message.getBody()));
});
//使用适配器来处理消息,设置了order,pay队列的消息处理逻辑
SimpleRabbitListenerEndpoint endpoint2 = new SimpleRabbitListenerEndpoint();
endpoint2.setId("11");
endpoint2.setQueueNames("order","pay");
System.out.println("endpoint2处理消息的逻辑");
endpoint2.setMessageListener(new MessageListenerAdapter(new MessageHandler()));
//注册二个endpoint
registrar.registerEndpoint(endpoint);
registrar.registerEndpoint(endpoint2);
}
};
}
}
消费端消息处理器
public class MessageHandler {
public void handleMessage(byte[] message){
System.out.println("消费消息");
System.out.println(new String(message));
}
}
消费端应用启动类
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.util.concurrent.TimeUnit;
@EnableRabbit
@ComponentScan
public class Application {
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
System.out.println("rabbit service startup");
TimeUnit.SECONDS.sleep(60);
context.close();
}
}
使用总结
- 实现
RabbitListenerConfigurer
接口,并把实现类托管到spring容器中 - 在spring容器中,托管一个
RabbitListenerContainerFactory
的bean(SimpleRabbitListenerContainerFactory
) - 在启动类上加上
@EnableRabbit
注解