RabbitMQ整合Spring AMQP(二)

消息模板组件 - RabbitTemplate

  • 我们在与SpringAMQP整合的时候进行发送消息的关键类
  • 该类提供了丰富的发送消息方法,包括可靠性投递消息方法、回调监听消息接口ConfirmCallback、返回值确认接口 ReturnCallback等等。
  • 同样我们需要进行注入到Spring容器中,然后直接使用
  • 在与Spring整合时需要实例化,但是在与SrpingBoot整合时,在配置文件里添加配置即可

RabbitTemplate的使用

声明RabbitTemplate,在RabbitMQConfig类中声明

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }
使用RabbitTemplate发送消息

MessagePostProcessor类通常用来设置消息的Header以及消息的属性,它允许在消息被转换器处理后对其进行进一步的修改,可以通过它对发送的消息进行统一的设置。

    @Test
    public void testSendMessage() throws Exception {
        //1 创建消息
        MessageProperties messageProperties = new MessageProperties();
        //设置消息属性
        messageProperties.getHeaders().put("desc", "信息描述..");
        messageProperties.getHeaders().put("type", "自定义消息类型..");
        Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
        
        //发送消息
        rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                System.err.println("------添加额外的设置---------");
                message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
                message.getMessageProperties().getHeaders().put("attr", "额外新加的属性");
                return message;
            }
        });
    }

运行测试类,进入管控台的Queues菜单,点击queue001,然后下面有个Get messages,点击按钮就可以获取消息了。

使用RabbitTemplate发送消息2

可以调用send方法发送消息,也可以直接发送字符串消息。

    @Test
    public void testSendMessage2() throws Exception {
        //1 创建消息
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("text/plain");
        Message message = new Message("mq发送消息".getBytes(), messageProperties);
        
        //第三个参数必须是Message类型
        rabbitTemplate.send("topic001", "spring.abc", message);
        
        //直接发送字符串消息,会自动转换
        rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
        rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!");
    }

运行测试类,依然是进入管控台的Queues菜单,点击相应的队列,点击Get messages获取消息。

消息容器 - SimpleMessageListenerContainer

  • 简单消息监听容器,这个类非常的强大,我们可以对他进行很多设置,对于消费者的配置项,这个类都可以满足
  • 监听队列(多个队列)、自动启动、自动声明功能
  • 设置事务特性、事务管理器、事务属性、事务容量(并发)、是否开启事务、回滚消息等
  • 设置消费者数量、最小最大数量、批量消费
  • 设置消息确认和自动确认模式、是否重回队列、异常捕获handler函数
  • 设置消费者标签生成策略、是否独占模式、消费者属性等
  • 设置具体的监听器、消息转换器等等
特性说明
  • SimpleMessageListenerContainer可以进行动态设置, 比如在运行中的应用可以动态的修改其消费者数量的大小、接收消息的模式等
  • 很多基于RabbitMQ的自制定化后端管控台在进行动态设置的时候,也是根据这一特去实现的。所以可以看出SpringAMQP非常的强大
SimpleMessageListenerContainer使用

这里简单列举一些,还可以设置很多很多属性,包括后面要介绍的消息适配器和消息转换器。

    @Bean
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        //监控队列,可以同时监控多个
        container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
        //设置当前消费者数量
        container.setConcurrentConsumers(1);
        //设置消费者并发数量的上限,不能小于ConcurrentConsumers,默认为ConcurrentConsumers,消费者会按需添加
        //指的是消费者的最大并行度是多少
        container.setMaxConcurrentConsumers(5);
        //设置是否重回队列
        container.setDefaultRequeueRejected(false);
        //设置签收模式:AUTO(自动签收)、MANUAL(手工签收)、NONE(不签收,没有任何操作)
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        //设置消费端标签策略:就是在消费端生成自己的标签时可以指定一个生成策略
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + UUID.randomUUID().toString();
            }
        });
        
        //设置消息监听器
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                String msg = new String(message.getBody());
                System.err.println("----------消费者: " + msg);
            }
        });
        return container;
    }

运行Application类,选择Channel菜单,点击表格中的channel,可以看到队列对应的消费者情况,Consumer tag就是根据我们指定的规则生成的。

关闭Application,运行测试类进行消息发送

    @Test
    public void testSendMessage2() throws Exception {
        //1 创建消息
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("text/plain");
        Message message = new Message("mq发送消息".getBytes(), messageProperties);
        
        //第三个参数必须是Message类型
        rabbitTemplate.send("topic001", "spring.abc", message);
        
        //直接发送字符串消息,会自动转换
        rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
        rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!");
    }

监听器监听到了消息并打印了如下内容

----------消费者: mq发送消息
----------消费者: hello object message send!
----------消费者: hello object message send!
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,193评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,306评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,130评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,110评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,118评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,085评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,007评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,844评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,283评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,508评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,667评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,395评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,985评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,630评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,797评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,653评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,553评论 2 352

推荐阅读更多精彩内容