01_Spring集成RabbitMQ之声明式注解

@Author Jacky Wang
日常积累,转载请注明出处,//www.greatytc.com/p/b081f1fd1480
最近新开发的一个项目是由传统的SSM架构进行开发的,之前介绍了SpringBoot集成RabbitMQ的方式,这次特地对Spring集成RabbitMQ做一次记录及介绍。
如需较详细了解RabbitMQ的相关知识,可参考我的另一篇文章:03_SpringBoot集成RabbitMQ

由Spring集成RabbitMQ一般采用Xml配置或注解式两种方式来进行集成。由于个人不喜欢过多的xml文件,因此这里仅对注解方式进行记录。

1. 声明式注解集成RabbitMQ

1.1 步骤

  1. 引入pom依赖
  2. 创建配置文件,包含mq的基础配置
  3. 创建RabbitMQ监听器
  4. 声明RabbitMQ配置类
  5. 创建消息生产者
  6. 测试

1.2 集成

1.2.1 引入Pom依赖
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.6.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.1.5.RELEASE</version>
</dependency>
<!-- 解决冲突 -->
<dependency>
    <groupId>org.codehaus.jackson</groupId>
    <artifactId>jackson-mapper-asl</artifactId>
    <version>1.9.13</version>
</dependency>
1.2.2 创建配置文件 rabbitmq.properties
#rabbitmq.host=127.0.0.1
#rabbitmq服务器
rabbitmq.host=192.168.3.171
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual.host=/

#交换机
exchange.mes.com.add=mes.fanout.com.add
exchange.mes.com.del=mes.fanout.com.delete
exchange.mes.com.update=mes.fanout.com.update
exchange.mes.user.add=mes.fanout.user.add
exchange.mes.user.del=mes.fanout.user.delete
exchange.mes.user.update=mes.fanout.user.update

#队列
queue.mes.com.add=mes_company_add
queue.mes.com.del=mes_company_delete
queue.mes.com.update=mes_company_update
queue.mes.user.add=mes_user_add
queue.mes.user.del=mes_user_delete
queue.mes.user.update=mes_user_update
1.2.3 创建RabbitMQ监听器
/**
 * 消费监听类
 */
@Component
@Transactional
public class QueueListener {

    public static final Logger logger = LoggerFactory.getLogger(QueueListener.class);

    @Autowired
    private AppInterfaceService appInterfaceService;
    @Autowired
    private RabbitMQConfig rabbitMQConfig;
    @Autowired
    private OfficeMapper officeMapper;
    @Autowired
    private SystemService systemService;
    @Autowired
    private UserMapper userMapper;

    /**
    * 如果监听队列指定的方法不存在则执行默认方法
    */
    public void onMessage(byte[] msg) {
        try {
            logger.info("onMessage : [{}]", new String(msg, "UTF-8"));
        } catch (Exception e) {
            logger.error("Error : [{}]", e);
        }
    }

    /**
     * 公司信息同步
     *
     * @param message
     */
    public void addCompany(byte[] message) {
        logger.info("RabbitMQ Method addCompany Get Msg : [{}]", new String(message));
    }

    public void updateCompany(byte[] message) {
        logger.info("RabbitMQ Method updateCompany Get Msg : [{}]", new String(message));
    }

    /**
     * 用户信息同步
     *
     * @param message
     */
    public void addUser(byte[] message) {
        logger.info("RabbitMQ Method updateCompany Get Msg : [{}]", new String(message));
    }

    public void delUser(byte[] message) {
        logger.info("RabbitMQ Method delUser Get Msg : [{}]", new String(message));
    }

    public void updateUser(byte[] message) {
        logger.info("RabbitMQ Method updateUser Get Msg : [{}]", new String(message));
    }
}
1.2.4 创建RabbitMQ声明配置类
  • 此次使用消费者容器进行消息消费,可支持单类多方法消费不同队列
  • 若无须使用消费者容器,可取消下面 声明消费者监听执行类的注释即可
  • 示例总共创建了六个交换机与六个队列,具体根据实际情况创建即可
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;

import java.util.HashMap;
import java.util.Map;

@Configuration
@PropertySource(value = "classpath:/properties/rabbitmq.properties")
public class RabbitMQConfig {
    @Autowired
    private QueueListener queueListener;

    @Value("${rabbitmq.host}")
    private String host;
    @Value("${rabbitmq.port}")
    private int port;
    @Value("${rabbitmq.username}")
    private String username;
    @Value("${rabbitmq.password}")
    private String password;
    @Value("${rabbitmq.virtual.host}")
    private String vhost;

    @Value("${exchange.mes.com.add}")
    private String companyAddExchangeName;
    @Value("${exchange.mes.com.del}")
    private String companyDelExchangeName;
    @Value("${exchange.mes.com.update}")
    private String companyUpdateExchangeName;
    @Value("${exchange.mes.user.add}")
    private String userAddExchangeName;
    @Value("${exchange.mes.user.del}")
    private String userDelExchangeName;
    @Value("${exchange.mes.user.update}")
    private String userUpdateExchangeName;

    @Value("${queue.mes.com.add}")
    private String companyAddQueueName;
    @Value("${queue.mes.com.del}")
    private String companyDelQueueName;
    @Value("${queue.mes.com.update}")
    private String companyUpdateQueueName;
    @Value("${queue.mes.user.add}")
    private String userAddQueueName;
    @Value("${queue.mes.user.del}")
    private String userDelQueueName;
    @Value("${queue.mes.user.update}")
    private String userUpdateQueueName;

    /**
     * rabbitmq连接配置
     *
     * @return
     */
    @Bean
    public ConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(vhost);
        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory());
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitAdmin rabbitAdmin() {
        RabbitAdmin admin = new RabbitAdmin(rabbitConnectionFactory());
        admin.setIgnoreDeclarationExceptions(true); //即使有关rabbitmq的bean初始化失败整个web应用还能正常启动
        return admin;
    }

    /**
     * 声明交换机Exchange
     *
     * @return
     */
    @Bean
    public FanoutExchange companyAddExchange() {
        return new FanoutExchange(companyAddExchangeName, true, false);
    }

    @Bean
    public FanoutExchange companyDelExchange() {
        return new FanoutExchange(companyDelExchangeName, true, false);
    }

    @Bean
    public FanoutExchange companyUpdateExchange() {
        return new FanoutExchange(companyUpdateExchangeName, true, false);
    }

    @Bean
    public FanoutExchange userAddExchange() {
        return new FanoutExchange(userAddExchangeName, true, false);
    }

    @Bean
    public FanoutExchange userDelExchange() {
        return new FanoutExchange(userDelExchangeName, true, false);
    }

    @Bean
    public FanoutExchange userUpdateExchange() {
        return new FanoutExchange(userUpdateExchangeName, true, false);
    }

    /**
     * 声明队列Queue
     *
     * @return
     */
    @Bean
    public Queue companyAddQueue() {
        return new Queue(companyAddQueueName, true, false, false);
    }

    @Bean
    public Queue companyDelQueue() {
        return new Queue(companyDelQueueName, true, false, false);
    }

    @Bean
    public Queue companyUpdateQueue() {
        return new Queue(companyUpdateQueueName, true, false, false);
    }

    @Bean
    public Queue userAddQueue() {
        return new Queue(userAddQueueName, true, false, false);
    }

    @Bean
    public Queue userDelQueue() {
        return new Queue(userDelQueueName, true, false, false);
    }

    @Bean
    public Queue userUpdateQueue() {
        return new Queue(userUpdateQueueName, true, false, false);
    }

    /**
     * 将队列绑定到指定的交换机
     *
     * @param companyAddQueue
     * @param companyAddExchange
     * @return
     */
    @Bean
    public Binding companyAddBinding(Queue companyAddQueue, FanoutExchange companyAddExchange) {
        return BindingBuilder.bind(companyAddQueue).to(companyAddExchange);
    }

    @Bean
    public Binding companyDelBinding(Queue companyDelQueue, FanoutExchange companyDelExchange) {
        return BindingBuilder.bind(companyDelQueue).to(companyDelExchange);
    }

    @Bean
    public Binding companyUpdateBinding(Queue companyUpdateQueue, FanoutExchange companyUpdateExchange) {
        return BindingBuilder.bind(companyUpdateQueue).to(companyUpdateExchange);
    }

    @Bean
    public Binding userAddBinding(Queue userAddQueue, FanoutExchange userAddExchange) {
        return BindingBuilder.bind(userAddQueue).to(userAddExchange);
    }

    @Bean
    public Binding userDelBinding(Queue userDelQueue, FanoutExchange userDelExchange) {
        return BindingBuilder.bind(userDelQueue).to(userDelExchange);
    }

    @Bean
    public Binding userUpdateBinding(Queue userUpdateQueue, FanoutExchange userUpdateExchange) {
        return BindingBuilder.bind(userUpdateQueue).to(userUpdateExchange);
    }

    /**
     * 声明消费者监听执行类
     * @param receiver
     * @return
     */
    /*@Bean
    public MessageListenerAdapter listenerAdapter(QueueListener receiver) {
        MessageListenerAdapter m = new MessageListenerAdapter(receiver, "process");
        m.setMessageConverter(jsonMessageConverter());
        return m;
    }*/

    /**
     * 消费者容器
     * 为不同队列指定不同的执行方法
     * @param rabbitConnectionFactory
     * @return
     */
    @Bean
    SimpleMessageListenerContainer listenerContainer(ConnectionFactory rabbitConnectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory);
        //container.setMessageConverter(jsonMessageConverter());
        //container.setConcurrentConsumers(1);
        //container.setMaxConcurrentConsumers(5);
        //container.setDefaultRequeueRejected(false);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setConsumerTagStrategy(q -> projectKey + "_" + q);
        //container.setQueues(companyAddQueue(), companyDelQueue(), companyUpdateQueue(), userAddQueue(), userDelQueue(), userUpdateQueue());
        container.setQueueNames(companyAddQueueName, companyDelQueueName, companyUpdateQueueName, userAddQueueName, userDelQueueName, userUpdateQueueName);

        MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(queueListener);
        listenerAdapter.setDefaultListenerMethod("onMessage");
        listenerAdapter.setMessageConverter(jsonMessageConverter());
        Map<String, String> queueOrTagToMethodName = new HashMap<>();
        queueOrTagToMethodName.put(companyAddQueueName, "addCompany");
        queueOrTagToMethodName.put(companyDelQueueName, "delCompany");
        queueOrTagToMethodName.put(companyUpdateQueueName, "updateCompany");
        queueOrTagToMethodName.put(userAddQueueName, "addUser");
        queueOrTagToMethodName.put(userDelQueueName, "delUser");
        queueOrTagToMethodName.put(userUpdateQueueName, "updateUser");
        listenerAdapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
        container.setMessageListener(listenerAdapter);
        return container;
    }
}
1.2.5 创建消息生产者
@Service
public class RabbitMQSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private final static Logger logger = LoggerFactory.getLogger(RabbitMQSender.class);

    public void sendDataToQueue(String exchange, String routingKey, Object object) {
        try {
            rabbitTemplate.setMessagePropertiesConverter(new MessagePropertiesConverter() {
                @Override
                public MessageProperties toMessageProperties(AMQP.BasicProperties source, Envelope envelope, String charset) {
                    MessageProperties messageProperties = new MessageProperties();
                    messageProperties.setContentType("application/json");
                    messageProperties.setContentEncoding("UTF-8");
                    return messageProperties;
                }

                @Override
                public AMQP.BasicProperties fromMessageProperties(MessageProperties source, String charset) {
                    return null;
                }
            });

            rabbitTemplate.convertAndSend(exchange, routingKey, object);
        } catch (Exception e) {
            logger.error("发送mq消息异常,Cause:[]", e);
        }

    }
}
1.2.6 测试
自定义Test或自定义Controller测试调用生产者发送消息,查看消费者是否消费即可。

eg:

@Controller
@RequestMapping("${adminPath}/rabbitmq")
public class RabbitMQController {

    @Autowired
    private RabbitMQSender rabbitMQSender;

    @RequestMapping("/sendToCompanyAdd")
    @ResponseBody
    public String sendToCompanyAdd(String id) {
        HashMap<String, String> map = new HashMap<>();
        map.put("id", id);
        rabbitMQSender.sendDataToQueue("mes.fanout.com.add", null, map);
        return "SUCCESS";
    }
}

3 注意事项

  • 若配置失败,请检查Spring的注解扫描是否能扫描到配置类。
  • 检查RabbitMQ基础配置信息是否有错误

2. Xml方式集成配置文件参考

spring-rabbitmq.xml:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <description>rabbitmq 连接服务配置</description>

    <!-- 加载配置属性文件 -->
    <context:property-placeholder ignore-unresolvable="true" location="classpath:/properties/rabbitmq.properties"/>

    <!-- 连接配置 -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" username="${rabbitmq.username}"
                               password="${rabbitmq.password}" port="${rabbitmq.port}"
                               virtual-host="${rabbitmq.vhost}"/>
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- spring template声明-->
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
                     message-converter="jsonMessageConverter"/>

    <!-- 消息对象json转换类 -->
    <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>

    <!-- 申明一个消息队列Queue -->
    <rabbit:queue id="testQueueId" name="${rabbitmq.queue}" durable="false" auto-delete="false" exclusive="false"/>

    <!-- 定义交换机 -->
    <rabbit:direct-exchange id="testExchangeId" name="${rabbitmq.exchange}" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="testQueueId" key="${rabbitmq.routingKey}"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- MQ监听配置 -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
        <rabbit:listener queues="testQueueId" ref="queueListener" method="process"/>
    </rabbit:listener-container>
</beans>
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,393评论 5 467
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,790评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,391评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,703评论 1 270
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,613评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,003评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,507评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,158评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,300评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,256评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,274评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,984评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,569评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,662评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,899评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,268评论 2 345
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,840评论 2 339

推荐阅读更多精彩内容