一、背景
最近双12银联进行满减活动,由于外部接入商户响应速度较慢,导致了队列数据挤压,影响了原本没有参与满减活动的商户,为了解决此问题决定按照商户将队列进行拆分,降低彼此的影响。
在Spring Boot框架下大家基本上会想到如下这样修改方式,那随着被监听的队列越来越多,可以想象代码的可读性会比较差,所以基本这个目的实现了@JmsGroupListener注解来解决该问题(如果监监听的队列数量较少还是建议使用原生注解)。
@JmsListener(destination = "test_0001")
@JmsListener(destination = "test_0002")
public void receiveMessage(String msg) {
System.out.println("Received <" + msg + ">");
}
@JmsListeners(value = {@JmsListener(destination = "test_0001"), @JmsListener(destination = "test_0002")})
public void receiveMessage1(String msg) {
System.out.println("Received <" + msg + ">");
}
二、效果
在配置文件中配置需要监听的队列集合
activemq.message.group=0001|0002
在业务代码中使用@JmsGroupListener注解
@JmsGroupListener(group = "${activemq.message.group}", groupSplit = "|", destinationPrefix = "test_")
public void receiveMessage2(String msg) {
System.out.println("Received <" + msg + ">");
}
三、定义注解
定义一个注解,如下可以看出该注解与@JmsListener注解的区别,删除的注解属性的原因后面会进行介绍,按照第二部分的使用,最后监听的队列名为test_0001和test_0002。
public @interface JmsGroupListener {
//定义要监听到队列区分关键词集合
String group();
//关键词集合分隔符
String groupSplit();
//队列名称前缀
String destinationPrefix();
//String id() default "";
String containerFactory() default "";
//String destination();
String subscription() default "";
String selector() default "";
String concurrency() default "";
}
四、实现注解
①实现思路
Processing of @JmsListener annotations is performed by registering a JmsListenerAnnotationBeanPostProcessor. This can be done manually or, more conveniently, through the <jms:annotation-driven/> element or @EnableJms annotation.
通过查看@JmsListener注解的注释可以了解到注解的实现主要在JmsListenerAnnotationBeanPostProcessor中,该类继承了MergedBeanDefinitionPostProcessor,所以我们继承该类基于@JmsListener的实现方式实现@JmsGroupListener注解就可以了。
如果不知道为什么继承JmsListenerAnnotationBeanPostProcessor就可以实现的话可以看一下我同事写的主题为BeanFactoryPostProcessor,BeanPostProcessor,SmartInitializingSingleton等几个可拓展接口的执行时机的一篇博客,应该会很大的帮助。
②重写postProcessAfterInitialization方法
该方法大家注意两个Process代码块即可,第一个Process代码块主要构造一个监听方法与@MyJmsListener注解信息的Map。第二个Process代码块是处理每一个@MyJmsListener注解,也是实现了监听注册的关键代码。
@Override
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof AopInfrastructureBean || bean instanceof JmsListenerContainerFactory ||
bean instanceof JmsListenerEndpointRegistry) {
// Ignore AOP infrastructure such as scoped proxies.
return bean;
}
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
if (!this.nonAnnotatedClasses.contains(targetClass) &&
AnnotationUtils.isCandidateClass(targetClass, JmsGroupListener.class)) {
//Process @MyJmsListener annotation ,Getting the relationship between method and annotation
Map<Method, JmsGroupListener> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<JmsGroupListener>) method -> {
JmsGroupListener listenerMethod = AnnotatedElementUtils.findMergedAnnotation(method, JmsGroupListener.class);
return listenerMethod;
});
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(targetClass);
if (logger.isTraceEnabled()) {
logger.trace("No @JmsGroupListener annotations found on bean type: " + targetClass);
}
} else {
//Process each @MyJmsListener annotation
annotatedMethods.forEach((method, listener) -> processJmsListener(listener, method, bean));
if (logger.isDebugEnabled()) {
logger.debug(annotatedMethods.size() + " @JmsGroupListener methods processed on bean '" + beanName +
"': " + annotatedMethods);
}
}
}
return bean;
}
}
③重写processJmsListener方法
在本部分大家只要关注一个Process代码块即可,该部分实现了将group属性进行拆分,然后改造需要监听的MethodJmsListenerEndpoint并注册到JmsListenerEndpointRegistrar中。
在定义注解的部分我们注意到我们注释了@JmsListener注解的id属性,这是因为@
JmsGroupListener监听的是一个队列的集合,为了处理方便,我们自动为其生成id。
public void processJmsListener(JmsGroupListener jmsGroupListener, Method mostSpecificMethod, Object bean) {
Method invocableMethod = AopUtils.selectInvocableMethod(mostSpecificMethod, bean.getClass());
JmsListenerContainerFactory<?> factory = null;
String containerFactoryBeanName = resolve(jmsGroupListener.containerFactory());
if (StringUtils.hasText(containerFactoryBeanName)) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
try {
factory = this.beanFactory.getBean(containerFactoryBeanName, JmsListenerContainerFactory.class);
} catch (NoSuchBeanDefinitionException ex) {
throw new BeanInitializationException("Could not register JMS listener endpoint on [" +
mostSpecificMethod + "], no " + JmsListenerContainerFactory.class.getSimpleName() +
" with id '" + containerFactoryBeanName + "' was found in the application context", ex);
}
}
//Process all destination names
String[] DestinationPostfixes = resolve(jmsGroupListener.group()).split("[" + jmsGroupListener.groupSplit() + "]");
for (String postfix : DestinationPostfixes) {
String destination = jmsGroupListener.destinationPrefix() + postfix;
MethodJmsListenerEndpoint endpoint = createMethodJmsListenerEndpoint();
endpoint.setBean(bean);
endpoint.setMethod(invocableMethod);
endpoint.setMostSpecificMethod(mostSpecificMethod);
endpoint.setMessageHandlerMethodFactory(messageHandlerMethodFactory);
endpoint.setEmbeddedValueResolver(this.embeddedValueResolver);
endpoint.setBeanFactory(this.beanFactory);
//Avoid conflict
endpoint.setId(getEndpointId());
endpoint.setDestination(resolve(destination));
if (StringUtils.hasText(jmsGroupListener.selector())) {
endpoint.setSelector(resolve(jmsGroupListener.selector()));
}
if (StringUtils.hasText(jmsGroupListener.subscription())) {
endpoint.setSubscription(resolve(jmsGroupListener.subscription()));
}
if (StringUtils.hasText(jmsGroupListener.concurrency())) {
endpoint.setConcurrency(resolve(jmsGroupListener.concurrency()));
}
this.registrar.registerEndpoint(endpoint, factory);
}
}
④重写afterSingletonsInstantiated方法
通过查看JmsListenerAnnotationBeanPostProcessor的源码我们发现,在该类中afterSingletonsInstantiated方法的最关键的一句registrar.afterPropertiesSet()即可完成所有监听的注册。
我们原本的思路是依靠JmsListenerAnnotationBeanPostProcessor类的afterSingletonsInstantiated,但是后面通过调试发现我们自己构造的JmsListenerEndpointRegistrar对象中的JmsListenerEndpointRegistry对象需要传递给JmsListenerEndpointRegistrar类的registerAllEndpoints方法,所以迫于无奈我们只能重写afterSingletonsInstantiated方法。
所以在本部分的重点进行了setContainerFactoryBeanName和setEndpointRegistry(全局对象),本来进行该类重写时候本来想阉割对于JmsListenerConfigurer和MessageHandlerMethodFactory扩展,但是最后还是为了有一定的通用性保留了该部分。
@Override
public void afterSingletonsInstantiated() {
// Remove resolved singleton classes from cache
this.nonAnnotatedClasses.clear();
if (this.beanFactory instanceof ListableBeanFactory) {
// Apply JmsListenerConfigurer beans from the BeanFactory, if any
Map<String, JmsListenerConfigurer> beans =
((ListableBeanFactory) this.beanFactory).getBeansOfType(JmsListenerConfigurer.class);
List<JmsListenerConfigurer> configurers = new ArrayList<>(beans.values());
AnnotationAwareOrderComparator.sort(configurers);
for (JmsListenerConfigurer configurer : configurers) {
configurer.configureJmsListeners(this.registrar);
}
}
// Must be set to obtain container factory by bean name
if (this.containerFactoryBeanName != null) {
registrar.setContainerFactoryBeanName(containerFactoryBeanName);
}
// Register endpointRegistry with spring context
if (this.registrar.getEndpointRegistry() == null) {
registrar.setEndpointRegistry(endpointRegistry);
}
// Set the custom handler method factory once resolved by the configurer
MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
if (handlerMethodFactory != null) {
this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(handlerMethodFactory);
}
// Actually register all listeners
this.registrar.afterPropertiesSet();
}
五、总结
到此该注解的关键实现过程已经介绍完成,其中还有一部代码这里就不进行详细的介绍了,有需要的同学自己可以看一下实现源码(由于水平有限,欢迎大家来找茬),最后与大家分享一下对源码进行扩展的新的体会,调试源码->了解大体流程->缺什么补什么。