三个注解:
@EnableRabbit
@RabbitListener
@RabbitHandler
@EnableRabbit
@EnableRabbit和@Configuration一起使用,可以加在类或者方法上,这个注解开启了容器对注册的bean的@RabbitListener检查。
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(RabbitBootstrapConfiguration.class)
public @interface EnableRabbit {
}
从源代码上可以看出这个注解提供了很简单的功能就是引入了另一个配置类:RabbitBootstrapConfiguration。该类注册了两个bean:一个BeanPostProcessor一个ListenerEndpointRegistry。
注册的BeanPostProcessor则会在bean初始化之后扫描@RabbitListener和@RabbitHandler注解。
@Configuration
public class RabbitBootstrapConfiguration {
@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
return new RabbitListenerAnnotationBeanPostProcessor();
}
@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
return new RabbitListenerEndpointRegistry();
}
}
@RabbitListener
@RabbitListener用于注册Listener时使用的信息:如queue,exchange,key、ListenerContainerFactory和RabbitAdmin的bean name。
@RabbitListener(containerFactory = "rabbitListenerContainerFactory", bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue}", durable = "true"),
exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
key = "${mq.config.key}"), admin = "rabbitAdmin")
扫描到bean带有该注解后,首先会将注解的内容封装到Endpoint对象中并和ListenerContainerFactory的实例一起添加到上面的RabbitListenerEndpointRegistry实例中。添加的时候会创建相应的ListenerContainer实例并添加Listener对象。
RabbitListenerAnnotationBeanPostProcessor通过RabbitListenerEndpointRegistrar间接持有RabbitListenerEndpointRegistry实例。
@RabbitHandler
@RabbitListener 和 @RabbitHandler结合使用,不同类型的消息使用不同的方法来处理。
public class CommandListener{
@RabbitHandler
public void handler1(ApiMessage msg){
System.out.println(msg);
}
@RabbitHandler
public void handler2(Map msg){
System.out.println(msg);
}
}
可能遇到的问题
AMQP错误:ACCESS_REFUSED
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=403, reply-text=ACCESS_REFUSED - operation not permitted on the default exchange, class-id=50, method-id=20)
出现这个问题的原因是因为ListenContainer的auto-declare默认为true,container会使用RabbitAdmin去重新声明Queue、Exchange、Binding等对象。如果没有配置RabbitAdmin则会报ACCESS_REFUSED的错误,看官方解释:
Starting with version 1.4, SimpleMessageListenerContainer has this new property.
When set to true (default), the container will use a RabbitAdmin to redeclare all AMQP objects (Queues, Exchanges, Bindings), if it detects that at least one of its queues is missing during startup, perhaps because it’s an auto-delete or an expired queue, but the redeclaration will proceed if the queue is missing for any reason. To disable this behavior, set this property to false. Note that the container will fail to start if all of its queues are missing.
在spring-boot中,使用@EnableRabbit,需要在context添加ListenerContainerFactory。但是ListenerContainerFactory没有提供设置container的autoDeclare属性的接口,也没办法为ListenerContainer实例注入RabbitAdmin实例。
解决方案是为@RabbitListener指定RabbitAdmin实例的bean name
@RabbitListener(containerFactory = "rabbitListenerContainerFactory", bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue}", durable = "true"),
exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
key = "${mq.config.key}"), admin = "rabbitAdmin")