SpringBoot集成RabbitMQ-动态注入Bean方式

实现Direct,Fanout,Topic和死信转发方式实现的延迟队列

一个让处女座程序员很难受的问题:
每次申明一个队列,都需要用@Bean注解在config类里面显式的往容器里面注入一个Queue Bean和Binding Bean,十几个队列下来,那场面简直不能忍.
怎么解决呢,思路:
通过遍历枚举的方式,统一往spring容器中注入bean.废话不多说,上代码

一 使用场景说明
1.Direct
根据routekey精确匹配消费,只消费一次
2.Fanout
广播消息队列,同交换机内的所有消费者,都接收到消息
3.Topic
支持模糊匹配,可匹配到多个.配合AnonymousQueue队列可实现集群内多点同一业务集群消费.如:修改集群内所有应用内存中配置
4.TTL
延迟队列,实现消息延迟指定时间消费

二 关键代码

  1. 配置类:
import com.google.common.collect.Maps;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @author onlinever
 * @date 2018/09/06
 */
@Service
public class RabbitQueueBeanRegister implements BeanDefinitionRegistryPostProcessor, ApplicationContextAware {

    private ApplicationContext applicationContext;

    private BeanDefinitionRegistry beanDefinitionRegistry;

    private String adapterSuffix = "Adapter";

    private Map<RabbitQueueEnum, Queue> topicQueues = Maps.newHashMap();

    private List<TopicConsumer> topicConsumers;

    @Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry beanDefinitionRegistry) throws BeansException {
        this.beanDefinitionRegistry = beanDefinitionRegistry;
        //声明交换机
        declareExchange();
        //声明队列和绑定
        declareQueueAndBinding();
        //奇怪的执行顺序
        if (haveTopicQueue()) {
            declareTopicMessageListenerAdapter();
            declareTopicMessageListenerContainer();
        }
    }

    private boolean haveTopicQueue() {
        try {
            topicConsumers = new ArrayList<>(applicationContext.getBeansOfType(TopicConsumer.class).values());
            return !topicConsumers.isEmpty();
        } catch (Exception e) {
            System.out.println(e.getMessage());
            return false;
        }
    }

    /**
     * 声明交换机
     */
    private void declareExchange() {
        for (RabbitExchangeEnum rabbitExchangeEnum : RabbitExchangeEnum.values()) {
            switch (rabbitExchangeEnum.getRabbitExchangeTypeEnum()) {
                case FANOUT_QUEUE:
                    beanDefinitionRegistry.registerBeanDefinition(rabbitExchangeEnum.getBeanName(), BeanDefinitionBuilder.genericBeanDefinition(FanoutExchange.class, () -> (FanoutExchange) ExchangeBuilder
                            .fanoutExchange(rabbitExchangeEnum.getExchangeName())
                            .durable(true)
                            .build()).getBeanDefinition());
                    break;
                case TOPIC_QUEUE:
                    beanDefinitionRegistry.registerBeanDefinition(rabbitExchangeEnum.getBeanName(), BeanDefinitionBuilder.genericBeanDefinition(TopicExchange.class, () -> (TopicExchange) ExchangeBuilder
                            .topicExchange(rabbitExchangeEnum.getExchangeName())
                            .durable(true)
                            .build()).getBeanDefinition());
                    break;
                default:
                    beanDefinitionRegistry.registerBeanDefinition(rabbitExchangeEnum.getBeanName(), BeanDefinitionBuilder.genericBeanDefinition(DirectExchange.class, () -> (DirectExchange) ExchangeBuilder
                            .directExchange(rabbitExchangeEnum.getExchangeName())
                            .durable(true)
                            .build()).getBeanDefinition());
                    break;
            }
        }
    }

    /**
     * 声明队列和绑定
     */
    private void declareQueueAndBinding() {
        String bindingSuffix = "Binding";
        for (RabbitQueueEnum rabbitQueueEnum : RabbitQueueEnum.values()) {
            //注册所有队列
            beanDefinitionRegistry.registerBeanDefinition(rabbitQueueEnum.getBeanName(), BeanDefinitionBuilder.genericBeanDefinition(Queue.class, () -> {
                Queue queue;
                switch (rabbitQueueEnum.getExchangeEnum().getRabbitExchangeTypeEnum()) {
                    case TTL_QUEUE:
                        queue = QueueBuilder
                                .durable(rabbitQueueEnum.getRouteKey())
                                // 配置到期后转发的交换
                                .withArgument("x-dead-letter-exchange", rabbitQueueEnum.getRabbitQueueEnum().getExchangeName())
                                // 配置到期后转发的路由键
                                .withArgument("x-dead-letter-routing-key", rabbitQueueEnum.getRabbitQueueEnum().getRouteKey())
                                .build();
                        break;
                    case TOPIC_QUEUE:
                        queue = new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy(StringUtils.getTopicQueueNamePrefix(rabbitQueueEnum.getRouteKey())));
                        topicQueues.put(rabbitQueueEnum, queue);
                        break;
                    default:
                        queue = new Queue(rabbitQueueEnum.getRouteKey());
                        break;
                }
                return queue;
            }).getBeanDefinition());
            //注册队列与交换机的绑定
            switch (rabbitQueueEnum.getExchangeEnum().getRabbitExchangeTypeEnum()) {
                case FANOUT_QUEUE:
                    beanDefinitionRegistry.registerBeanDefinition(rabbitQueueEnum.getBeanName() + bindingSuffix, BeanDefinitionBuilder.genericBeanDefinition(Binding.class, () -> BindingBuilder
                            .bind(applicationContext.getBean(rabbitQueueEnum.getBeanName(), Queue.class))
                            .to(applicationContext.getBean(rabbitQueueEnum.getExchangeEnum().getBeanName(), FanoutExchange.class))).getBeanDefinition());

                    break;
                case NORMAL_QUEUE:
                case TTL_QUEUE:
                    beanDefinitionRegistry.registerBeanDefinition(rabbitQueueEnum.getBeanName() + bindingSuffix, BeanDefinitionBuilder.genericBeanDefinition(Binding.class, () -> BindingBuilder
                            .bind(applicationContext.getBean(rabbitQueueEnum.getBeanName(), Queue.class))
                            .to(applicationContext.getBean(rabbitQueueEnum.getExchangeEnum().getBeanName(), DirectExchange.class))
                            .with(rabbitQueueEnum.getRouteKey())).getBeanDefinition());
                    break;
                case TOPIC_QUEUE:
                    beanDefinitionRegistry.registerBeanDefinition(rabbitQueueEnum.getBeanName() + bindingSuffix, BeanDefinitionBuilder.genericBeanDefinition(Binding.class, () -> BindingBuilder
                            .bind(applicationContext.getBean(rabbitQueueEnum.getBeanName(), Queue.class))
                            .to(applicationContext.getBean(rabbitQueueEnum.getExchangeEnum().getBeanName(), TopicExchange.class))
                            .with(StringUtils.getTopicQueueRoute(rabbitQueueEnum.getRouteKey()))).getBeanDefinition());
                    break;
                default:
                    break;
            }
        }
    }

    /**
     * 声明Topic消息监听适配器
     */
    private void declareTopicMessageListenerAdapter() {
        topicConsumers.forEach(topicConsumer -> beanDefinitionRegistry.registerBeanDefinition(topicConsumer.getQueueEnum().getBeanName() + adapterSuffix,
                BeanDefinitionBuilder.genericBeanDefinition(MessageListenerAdapter.class, () -> new MessageListenerAdapter(topicConsumer)).getBeanDefinition()));
    }

    /**
     * 声明Topic消息监听容器
     */
    private void declareTopicMessageListenerContainer() {
        String containerSuffix = "Container";
        topicConsumers.forEach(topicConsumer -> beanDefinitionRegistry.registerBeanDefinition(topicConsumer.getQueueEnum().getBeanName() + containerSuffix,
                BeanDefinitionBuilder.genericBeanDefinition(SimpleMessageListenerContainer.class, () -> {
                    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
                    container.setQueues(topicQueues.get(topicConsumer.getQueueEnum()));
                    container.setConnectionFactory(applicationContext.getBean("rabbitConnectionFactory", ConnectionFactory.class));
                    container.setMessageListener(applicationContext.getBean(topicConsumer.getQueueEnum().getBeanName() + adapterSuffix));
                    container.setRabbitAdmin(applicationContext.getBean(RabbitAdmin.class));
                    return container;
                }).getBeanDefinition()));
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {

    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}
  1. 枚举类
    2.1 交换机类型枚举
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.TopicExchange;

/**
 * @author onlinever
 * @date 2018/09/06
 */
public enum RabbitExchangeTypeEnum {

    /**
     * 死信转发方式延迟队列
     */
    TTL_QUEUE(1, DirectExchange.class),
    /**
     * 正常队列
     */
    NORMAL_QUEUE(2, DirectExchange.class),
    /**
     * 广播队列
     */
    FANOUT_QUEUE(3, FanoutExchange.class),
    /**
     * topic队列
     */
    TOPIC_QUEUE(4, TopicExchange.class);


    /**
     * 队列routeKey
     */
    private int index;

    /**
     * 交换机class
     */
    private Class exchangeClazz;


    RabbitExchangeTypeEnum(int index, Class exchangeClazz) {
        this.index = index;
        this.exchangeClazz = exchangeClazz;
    }

    public int getIndex() {
        return index;
    }

    public Class getExchangeClazz() {
        return exchangeClazz;
    }
}

2.2 交换机枚举

/**
 * @author onlinever
 * @date 2018/09/06
 */
public enum RabbitExchangeEnum {


    /**
     * rabbit交换机名称
     * 默认一个应用设置一个交换机
     * exchange.{0}.{1}
     * 0: 交换机类型 direct、topic、fanout、headers
     * 1: 应用名称
     */
    DIRECT_EXCHANGE("directExchange", "exchange.direct.gateway", RabbitExchangeTypeEnum.NORMAL_QUEUE),
    FANOUT_EXCHANGE("fanoutExchange", "exchange.fanout.gateway", RabbitExchangeTypeEnum.FANOUT_QUEUE),
    TOPIC_EXCHANGE("topicExchange", "exchange.topic.gateway", RabbitExchangeTypeEnum.TOPIC_QUEUE),;

    /**
     * 交换机beanName
     */
    private String beanName;
    /**
     * 交换机key
     */
    private String exchangeName;
    /**
     * 交换机类型
     */
    private RabbitExchangeTypeEnum rabbitExchangeTypeEnum;

    RabbitExchangeEnum(String beanName, String exchangeName, RabbitExchangeTypeEnum rabbitExchangeTypeEnum) {
        this.beanName = beanName;
        this.exchangeName = exchangeName;
        this.rabbitExchangeTypeEnum = rabbitExchangeTypeEnum;
    }

    public String getExchangeName() {
        return exchangeName;
    }

    public String getBeanName() {
        return beanName;
    }

    public RabbitExchangeTypeEnum getRabbitExchangeTypeEnum() {
        return rabbitExchangeTypeEnum;
    }
}

2.3 队列枚举

/**
 * @author onlinever
 * @date 2018/09/06
 */
public enum RabbitQueueEnum {

    ;
    /**
     * 队列BeanName
     */
    private String beanName;
    /**
     * 队列routeKey
     */
    private String routeKey;
    /**
     * 交换机
     */
    private RabbitExchangeEnum exchangeEnum;

    /**
     * 死信转发到队列
     */
    private RabbitQueueEnum rabbitQueueEnum;


    RabbitQueueEnum(String beanName, String routeKey, RabbitExchangeEnum exchangeEnum, RabbitQueueEnum rabbitQueueEnum) {
        this.beanName = beanName;
        this.routeKey = routeKey;
        this.exchangeEnum = exchangeEnum;
        this.rabbitQueueEnum = rabbitQueueEnum;
    }

    public String getRouteKey() {
        return routeKey;
    }

    public RabbitExchangeEnum getExchangeEnum() {
        return exchangeEnum;
    }

    public String getExchangeName() {
        return exchangeEnum.getExchangeName();
    }

    public String getBeanName() {
        return beanName;
    }

    public RabbitQueueEnum getRabbitQueueEnum() {
        return rabbitQueueEnum;
    }
}
  1. Topic消费者接口
/**
 * topic队列消费者
 *
 * @author onlinever
 * @date 2018/8/17
 */
public interface TopicConsumer {
    /**
     * 消费的队列
     *
     * @return 队列
     */
    RabbitQueueEnum getQueueEnum();

    /**
     * 具体消费者的实现
     *
     * @param message 消息
     */
    void handleMessage(String message);
}
  1. 其他消费者使用@RabbitListener方式
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,546评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,224评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,911评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,737评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,753评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,598评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,338评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,249评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,696评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,888评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,013评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,731评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,348评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,929评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,048评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,203评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,960评论 2 355