RabbitMQ笔记七:Spring AMQP API(生产和消费消息)

RabbitTemplate类是简化RabbitMQ访问的工具类(发送和接收消息)

总结:
1.使用RabbitTemplate进行消息的发送。
2.使用SimpleMessageListenerContainer类监听队列,进行消息的消费。

使用RabbitTemplate进行消息发送

加入spring-amqp依赖:

    <dependencies>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.7.3.RELEASE</version>
        </dependency>
    </dependencies>

配置类:
ConnectionFactoryRabbitTemplate纳入到spring容器中

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MQConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
       //设置Exchange默认操作的exchange和routingkey
        rabbitTemplate.setExchange("zhihao.direct.exchange");
        rabbitTemplate.setRoutingKey("zhihao.debug");
        return rabbitTemplate;
    }
}

测试类:
RabbitTemplate#send方法发送消息,

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

@ComponentScan
public class Application {

    public static void main(String[] args) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);

        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
        System.out.println(rabbitTemplate);

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.getHeaders().put("desc","消息发送");
        messageProperties.getHeaders().put("type",10);

        Message message = new Message("hello".getBytes(),messageProperties);

        /**
         * 调用rabbitTemplate的send方法发送消息,如果没有指定exchange,Routing,则使用声明Exchange指定的
         * exchange,Routing
         * 如果RabbitTemplate没有设置,则默认的exchange 是DEFAULT_EXCHANGE为"",
         * 默认的routkey是DEFAULT_ROUTING_KEY为""
         */
        //1.
        //rabbitTemplate.send(message);

        //2.指定Routingkey,而exchange是Rabbitmq默认指定的
        //rabbitTemplate.send("zhihao.error",message);

        //3.即指定exchange,又指定了routing_key
        //rabbitTemplate.send("zhihao.login","ulogin",message);

        /**
         * 4.使用默认的defaultExchange进行投递消息,route key就是队列名,指定correlation_id属性,correlation_id属性是rabbitmq 进行异步rpc进行标识每次请求的唯一
         * id,下面会讲到
         */
        rabbitTemplate.send("","zhihao.order.queue",message,new CorrelationData("spring.amqp"));

        context.close();
    }
}

查看web管控台发现消息都发送成功了。

使用RabbitTemplate#convertAndSend方法发送消息,

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

@ComponentScan
public class Application2 {

    public static void main(String[] args) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);

        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
        System.out.println(rabbitTemplate);

        /**
         * 使用convertAndSend方法,接收的参数是Object对象,其实是将接收的对象转换成Message对象,不指定exchange和routing key,那么就
         * 使用RabbitTemplate中设置的exchange和routing key
         */
        //rabbitTemplate.convertAndSend("this is my message");

        //指定exchange或者指定exchage和routing key
        //rabbitTemplate.convertAndSend("zhihao.error","this is my message order111");
        //rabbitTemplate.convertAndSend("","zhihao.user.queue","this is my message order222");

        //发送消息的后置处理器,MessagePostProcessor类的postProcessMessage方法得到的Message就是将参数Object内容转换成Message对象
        /*
        rabbitTemplate.convertAndSend("", "zhihao.user.queue", "this is my message processor", new MessagePostProcessor() {
            //在后置处理器上加上order和count属性
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                System.out.println("-------处理前message-------------");
                System.out.println(message);
                message.getMessageProperties().getHeaders().put("order",10);
                message.getMessageProperties().getHeaders().put("count",1);
                return message;
            }
        });
        */


        rabbitTemplate.convertAndSend("", "zhihao.user.queue", "message before", message1 -> {
            //使用lamdba的语法
            MessageProperties properties = new MessageProperties();
            properties.getHeaders().put("desc","消息发送");
            properties.getHeaders().put("type",10);

            Message messageafter = new Message("message after".getBytes(),properties);
            return messageafter;
        });

        context.close();
    }
}

消息的消费

使用容器的方式进行消费
认识一个接口org.springframework.amqp.rabbit.listener.MessageListenerContainer
其默认实现类org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer

MessageListenerContainer接口及其实现

MessageListenerContainer#setMessageListener方法,接收的参数类型
org.springframework.amqp.core.MessageListener或者org.springframework.amqp.rabbit.core.ChannelAwareMessageListener接口

代码:
ConnectionFactoryRabbitTemplateSimpleMessageListenerContainer实例纳入到spring容器中进行管理。

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MQConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }

    /*
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //监听队列zhihao.user.queue,监听队列可以多个,参数类型是String[]
        container.setQueueNames("zhihao.user.queue");
        container.setMessageListener(new MessageListener() {
            //具体的消费逻辑
            @Override
            public void onMessage(Message message) {
                System.out.println("====接收到消息=====");
                System.out.println(message.getMessageProperties());
                System.out.println(new String(message.getBody()));
            }
        });
        return container;
    }
    */

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //队列可以是多个,参数是String的数组
        container.setQueueNames("zhihao.user.queue");
        container.setMessageListener(new ChannelAwareMessageListener(){
            @Override
            //得到了Channel参数,具体使用会在下面的博客详细讲解
            public void onMessage(Message message, Channel channel) throws Exception {
                System.out.println("====接收到消息=====");
                System.out.println(message.getMessageProperties());
                System.out.println(new String(message.getBody()));
            }
        });
        return container;
    }
}

关于setMessageListener接收的类型参数,

setMessageListener方法

当接收的参数不是MessageListener或者ChannelAwareMessageListener类型,则会抛出异常,具体的逻辑在checkMessageListener(messageListener);方法,

checkMessageListener方法

应用启动类,向zhihao.user.queue对象发送消息,并启动了spring容器,发现监听到队列并且消费了。

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import java.util.concurrent.TimeUnit;

@ComponentScan
public class Application {

    public static void main(String[] args) throws Exception{
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);

        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
        System.out.println(rabbitTemplate);

        rabbitTemplate.convertAndSend("","zhihao.user.queue","hello spring amqp");

        TimeUnit.SECONDS.sleep(30);

        context.close();
    }
}

原理分析

稍微分析一下原理
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer接口,它继承AbstractMessageListenerContainer类,实现SmartLifecycle接口然后继承Lifecycle接口,意味着一旦SimpleMessageListenerContainer实例被spring容器管理,其生命周期就托管与spring容器来管理了,意味着当spring容器运行起来的时候,SimpleMessageListenerContainer容器启动,spring容器关闭的时候,SimpleMessageListenerContainer容器也关闭了。

设置在spring容器初始化的时候设置SimpleMessageListenerContainer不启动,(container.setAutoStartup(false);

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //队列可以是多个,参数是String的数组
        container.setQueueNames("zhihao.miao.order");
        //设置autoStartUp为false表示SimpleMessageListenerContainer没有启动
        container.setAutoStartup(false);
        container.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                System.out.println("====接收到消息=====");
                System.out.println(message.getMessageProperties());
                System.out.println(new String(message.getBody()));
            }
        });
        return container;
    }
}

此时不能消费消息,也可以在应用启动类启动SimpleMessageListenerContainer容器,在应用启动类中启动

import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import java.util.concurrent.TimeUnit;

@ComponentScan
public class Application {

    public static void main(String[] args) throws Exception{
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);

        //在spring容器中启动SimpleMessageListenerContainer
        context.getBean(SimpleMessageListenerContainer.class).start();
        TimeUnit.SECONDS.sleep(30);

        context.close();
    }
}

以上说明只有org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer启动了,才会消费消息。

总结
SimpleMessageListenerContainer可以托管到spring容器中,由spring容器进行SimpleMessageListenerContainer的生命周期管理,默认情况下spring容器启动的时候,启动SimpleMessageListenerContainer,spring容器关闭,会stop掉SimpleMessageListenerContainer,也可以设置SimpleMessageListenerContainer手动启动(context.getBean(SimpleMessageListenerContainer.class).start();)。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,423评论 6 491
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,147评论 2 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,019评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,443评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,535评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,798评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,941评论 3 407
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,704评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,152评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,494评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,629评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,295评论 4 329
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,901评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,742评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,978评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,333评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,499评论 2 348

推荐阅读更多精彩内容