Spring Boot中@JmsListener如何实现ActiveMQ多队列监听--自定义@JmsGroupListener注解

一、背景

最近双12银联进行满减活动,由于外部接入商户响应速度较慢,导致了队列数据挤压,影响了原本没有参与满减活动的商户,为了解决此问题决定按照商户将队列进行拆分,降低彼此的影响。

在Spring Boot框架下大家基本上会想到如下这样修改方式,那随着被监听的队列越来越多,可以想象代码的可读性会比较差,所以基本这个目的实现了@JmsGroupListener注解来解决该问题(如果监监听的队列数量较少还是建议使用原生注解)。

    @JmsListener(destination = "test_0001")
    @JmsListener(destination = "test_0002")
    public void receiveMessage(String msg) {
        System.out.println("Received <" + msg + ">");
    }

    @JmsListeners(value = {@JmsListener(destination = "test_0001"), @JmsListener(destination = "test_0002")})
    public void receiveMessage1(String msg) {
        System.out.println("Received <" + msg + ">");
    }

二、效果

在配置文件中配置需要监听的队列集合

activemq.message.group=0001|0002

在业务代码中使用@JmsGroupListener注解

@JmsGroupListener(group = "${activemq.message.group}", groupSplit = "|", destinationPrefix = "test_")
    public void receiveMessage2(String msg) {
        System.out.println("Received <" + msg + ">");
    }

三、定义注解

定义一个注解,如下可以看出该注解与@JmsListener注解的区别,删除的注解属性的原因后面会进行介绍,按照第二部分的使用,最后监听的队列名为test_0001和test_0002。

public @interface JmsGroupListener {
    //定义要监听到队列区分关键词集合
    String group();
    //关键词集合分隔符
    String groupSplit();
    //队列名称前缀
    String destinationPrefix();
    //String id() default "";
    String containerFactory() default "";
    //String destination();
    String subscription() default "";
    String selector() default "";
    String concurrency() default "";
}

四、实现注解

①实现思路
Processing of @JmsListener annotations is performed by registering a JmsListenerAnnotationBeanPostProcessor. This can be done manually or, more conveniently, through the <jms:annotation-driven/> element or @EnableJms annotation.

通过查看@JmsListener注解的注释可以了解到注解的实现主要在JmsListenerAnnotationBeanPostProcessor中,该类继承了MergedBeanDefinitionPostProcessor,所以我们继承该类基于@JmsListener的实现方式实现@JmsGroupListener注解就可以了。

如果不知道为什么继承JmsListenerAnnotationBeanPostProcessor就可以实现的话可以看一下我同事写的主题为BeanFactoryPostProcessor,BeanPostProcessor,SmartInitializingSingleton等几个可拓展接口的执行时机的一篇博客,应该会很大的帮助。

②重写postProcessAfterInitialization方法

该方法大家注意两个Process代码块即可,第一个Process代码块主要构造一个监听方法与@MyJmsListener注解信息的Map。第二个Process代码块是处理每一个@MyJmsListener注解,也是实现了监听注册的关键代码。

 @Override
       @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof AopInfrastructureBean || bean instanceof JmsListenerContainerFactory ||
                bean instanceof JmsListenerEndpointRegistry) {
            // Ignore AOP infrastructure such as scoped proxies.
            return bean;
        }

        Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
        if (!this.nonAnnotatedClasses.contains(targetClass) &&
                AnnotationUtils.isCandidateClass(targetClass, JmsGroupListener.class)) {

            //Process @MyJmsListener annotation ,Getting the relationship between method and annotation
            Map<Method, JmsGroupListener> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    (MethodIntrospector.MetadataLookup<JmsGroupListener>) method -> {
                        JmsGroupListener listenerMethod = AnnotatedElementUtils.findMergedAnnotation(method, JmsGroupListener.class);
                        return listenerMethod;
                    });

            if (annotatedMethods.isEmpty()) {
                this.nonAnnotatedClasses.add(targetClass);
                if (logger.isTraceEnabled()) {
                    logger.trace("No @JmsGroupListener annotations found on bean type: " + targetClass);
                }
            } else {

                //Process each @MyJmsListener annotation
                annotatedMethods.forEach((method, listener) -> processJmsListener(listener, method, bean));

                if (logger.isDebugEnabled()) {
                    logger.debug(annotatedMethods.size() + " @JmsGroupListener methods processed on bean '" + beanName +
                            "': " + annotatedMethods);
                }
            }
        }
        return bean;
    }
    }
③重写processJmsListener方法

在本部分大家只要关注一个Process代码块即可,该部分实现了将group属性进行拆分,然后改造需要监听的MethodJmsListenerEndpoint并注册到JmsListenerEndpointRegistrar中。

在定义注解的部分我们注意到我们注释了@JmsListener注解的id属性,这是因为@
JmsGroupListener监听的是一个队列的集合,为了处理方便,我们自动为其生成id。

 public void processJmsListener(JmsGroupListener jmsGroupListener, Method mostSpecificMethod, Object bean) {
        Method invocableMethod = AopUtils.selectInvocableMethod(mostSpecificMethod, bean.getClass());

        JmsListenerContainerFactory<?> factory = null;
        String containerFactoryBeanName = resolve(jmsGroupListener.containerFactory());
        if (StringUtils.hasText(containerFactoryBeanName)) {
            Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
            try {
                factory = this.beanFactory.getBean(containerFactoryBeanName, JmsListenerContainerFactory.class);
            } catch (NoSuchBeanDefinitionException ex) {
                throw new BeanInitializationException("Could not register JMS listener endpoint on [" +
                        mostSpecificMethod + "], no " + JmsListenerContainerFactory.class.getSimpleName() +
                        " with id '" + containerFactoryBeanName + "' was found in the application context", ex);
            }
        }

        //Process all destination names
        String[] DestinationPostfixes = resolve(jmsGroupListener.group()).split("[" + jmsGroupListener.groupSplit() + "]");
        for (String postfix : DestinationPostfixes) {
            String destination = jmsGroupListener.destinationPrefix() + postfix;
            MethodJmsListenerEndpoint endpoint = createMethodJmsListenerEndpoint();
            endpoint.setBean(bean);
            endpoint.setMethod(invocableMethod);
            endpoint.setMostSpecificMethod(mostSpecificMethod);
            endpoint.setMessageHandlerMethodFactory(messageHandlerMethodFactory);
            endpoint.setEmbeddedValueResolver(this.embeddedValueResolver);
            endpoint.setBeanFactory(this.beanFactory);
            //Avoid conflict
            endpoint.setId(getEndpointId());
            endpoint.setDestination(resolve(destination));
            if (StringUtils.hasText(jmsGroupListener.selector())) {
                endpoint.setSelector(resolve(jmsGroupListener.selector()));
            }
            if (StringUtils.hasText(jmsGroupListener.subscription())) {
                endpoint.setSubscription(resolve(jmsGroupListener.subscription()));
            }
            if (StringUtils.hasText(jmsGroupListener.concurrency())) {
                endpoint.setConcurrency(resolve(jmsGroupListener.concurrency()));
            }
            this.registrar.registerEndpoint(endpoint, factory);
        }

    }
④重写afterSingletonsInstantiated方法

通过查看JmsListenerAnnotationBeanPostProcessor的源码我们发现,在该类中afterSingletonsInstantiated方法的最关键的一句registrar.afterPropertiesSet()即可完成所有监听的注册。

我们原本的思路是依靠JmsListenerAnnotationBeanPostProcessor类的afterSingletonsInstantiated,但是后面通过调试发现我们自己构造的JmsListenerEndpointRegistrar对象中的JmsListenerEndpointRegistry对象需要传递给JmsListenerEndpointRegistrar类的registerAllEndpoints方法,所以迫于无奈我们只能重写afterSingletonsInstantiated方法。

所以在本部分的重点进行了setContainerFactoryBeanName和setEndpointRegistry(全局对象),本来进行该类重写时候本来想阉割对于JmsListenerConfigurer和MessageHandlerMethodFactory扩展,但是最后还是为了有一定的通用性保留了该部分。

@Override
    public void afterSingletonsInstantiated() {
        // Remove resolved singleton classes from cache
        this.nonAnnotatedClasses.clear();

        if (this.beanFactory instanceof ListableBeanFactory) {
            // Apply JmsListenerConfigurer beans from the BeanFactory, if any
            Map<String, JmsListenerConfigurer> beans =
                    ((ListableBeanFactory) this.beanFactory).getBeansOfType(JmsListenerConfigurer.class);
            List<JmsListenerConfigurer> configurers = new ArrayList<>(beans.values());
            AnnotationAwareOrderComparator.sort(configurers);
            for (JmsListenerConfigurer configurer : configurers) {
                configurer.configureJmsListeners(this.registrar);
            }
        }
        // Must be set to obtain container factory by bean name
        if (this.containerFactoryBeanName != null) {
            registrar.setContainerFactoryBeanName(containerFactoryBeanName);
        }
        // Register endpointRegistry with spring context
        if (this.registrar.getEndpointRegistry() == null) {
            registrar.setEndpointRegistry(endpointRegistry);
        }

        // Set the custom handler method factory once resolved by the configurer
        MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
        if (handlerMethodFactory != null) {
            this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(handlerMethodFactory);
        }

        // Actually register all listeners
        this.registrar.afterPropertiesSet();
    }

五、总结

到此该注解的关键实现过程已经介绍完成,其中还有一部代码这里就不进行详细的介绍了,有需要的同学自己可以看一下实现源码(由于水平有限,欢迎大家来找茬),最后与大家分享一下对源码进行扩展的新的体会,调试源码->了解大体流程->缺什么补什么。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,490评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,581评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,830评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,957评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,974评论 6 393
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,754评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,464评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,357评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,847评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,995评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,137评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,819评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,482评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,023评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,149评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,409评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,086评论 2 355