第四章-高级整合应用- AMQP

1.RabbitMQ整合Spring AMQP

  • RabbitAdmin
  • SpringAMQP声明
  • RabbitTemplate
  • SimpleMessageListenerContainer 简单消息监听容器
  • MessageListenerAdapter 消息监听适配器
  • MessageConverter 转换器(进行消息序列化,反序列化)
image.png

注意

  • autoStartup必须设置为true ,否则Spring容器不会加载RabbitAdmin类
  • RabbitAdmin底层实现就是从Spring容器中获取Exchange,Bingding,RoutingKey以及Queue的@Bean声明
  • 然后使用RabbitTemplate的execute方法执行对应的声明、修改、删除等一系列的RabbitMQ基础功能操作。
  • 例如: 添加一个交换机,删除一个绑定,清除一个队列中的消息等。
image.png

image.png

消息模板 RabbitTemplate

RabbitTemplate 即消息模板

  • 我们在与springAMQP整合的时候进行发送消息的关键类。

  • 该类提供了丰富的发送消息方法,包括可靠性投递方法,回调监听消息接口ConfirmCallback,返回值确认接口ReturnCallback等等。同样我们需要进行注入到spring容器中。然后进行使用。

  • 在与spring整合时需要实例化。但是在与springBoot整合时。在配置文件中添加配置即可

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }
    

简单消息监听容器:SimpleMessageListenerContainer

  • 这个类非常强大。 我们可以对他进行很多设置。对于消费者的配置项。这个类都可以满足
  • 监听队列(多个队列) 、自动启动、自动声明功能
  • 设置事务特性、事务管理器、事务属性、事务容量(并发)、 是否开启事务、回滚消息等
  • 设置消费者数量、最大最小数量。批量消费
  • 设置消息的签收模式、是否重回队列,异常捕获handler函数。
  • 设置消费者标签生成策略,是否独占模式,消费者属性等
  • 设置具体的鉴定器,消息转换器等等。

注意:

  • SimpleMessageListenerContainer可以进行动态设置,比如在运行中的应用可以动态的修改其消费者的大小,接收消息的模式等
  • 很多基于RabbitMQ的定制化后端管控台在进行动态设置的时候,也是根据这一特性实现的。所以可以看出SpringAMQP非常的强大

SimpleMessageListenerContainer为什么可以动态感知配置变更?

配置代码


    /**
     * 简单消息监听容器
     * 配置完成后。可以在管控台看到消息者信息。 以及消费者标签信息
     *
     * @param connectionFactory 链接工厂
     * @return SimpleMessageListenerContainer
     */
    @Bean
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
        //设置要监听的队列
        simpleMessageListenerContainer.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
        //初始化消费者数量
        simpleMessageListenerContainer.setConcurrentConsumers(1);
        //最大消费者数量
        simpleMessageListenerContainer.setMaxConcurrentConsumers(5);
        //设置是否重回队列[一般为false]
        simpleMessageListenerContainer.setDefaultRequeueRejected(false);
        //设置自动ack
        simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
        //设置channel 是否外露
         simpleMessageListenerContainer.setExposeListenerChannel(true);
        //设置消费端标签的策略
        simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queueName) {
                return queueName + "_" + UUID.randomUUID().toString();
            }
        });
        //设置消息监听 ChannelAwareMessageListener
        simpleMessageListenerContainer.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                String msg = new String(message.getBody());
                System.out.println("----------消费者: " + msg);
            }
        });

        return simpleMessageListenerContainer;


    }

连接信息

image.png

消费者信息和相关配置

image.png

消息监听适配器:MessageListenerAdapter


/**
 * 通过`simpleMessageListenerContainer` 配置消息监听适配器。 指向这个类
 *
 * @author yangHX
 * createTime  2019/4/6 12:16
 */
public class MessageDelegate {


    /**
     * MessageListenerAdapter 默认指定接收消息的方法的名字就是 handleMessage .当然也可以手动设置
     *
     * @param messageBody message信息
     */
    public void handleMessage(byte[] messageBody) {
        System.err.println("默认方法,消息内容: " + new String(messageBody));
    }

    public void consumeMessage(byte[] messageBody) {
        System.err.println("字节数组方法, 消息内容:" + new String(messageBody));
    }

    public void consumeMessage(String messageBody) {
        System.err.println("字符串方法, 消息内容:" + messageBody);
    }

}





/**
 * spring amqp 消息转换器
 *
 * @author yangHX
 * createTime  2019/4/6 12:28
 */
public class TextMessageConverter implements MessageConverter {


    /**
     * 将数据转化为 message 类
     *
     * @param o                 要发送的数据
     * @param messageProperties 消息头
     * @return Message
     * @throws MessageConversionException ex
     */
    @Override
    public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
        return new Message(o.toString().getBytes(), messageProperties);
    }

    /**
     * 将message转换为想要的数据类型
     *
     * @param message message
     * @return Object
     * @throws MessageConversionException ex
     */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {

        String contentType = message.getMessageProperties().getContentType();
        if (null != contentType && contentType.contains("text")) {
            return new String(message.getBody());
        }
        return message.getBody();
    }
}






        ///消息监听适配器 只截取了一小段
        /*
         * 适配器方式。 默认是有自己的方法名字。 handleMessage
         *  可以自己指定一个方法的名称。 consumerMessage
         *  也可以添加一个转换器: 从字节数组转换为String
         */
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(new MessageDelegate());
        messageListenerAdapter.setDefaultListenerMethod("consumeMessage");
        messageListenerAdapter.setMessageConverter(new TextMessageConverter());
        simpleMessageListenerContainer.setMessageListener(messageListenerAdapter);


        return simpleMessageListenerContainer;





 /**
     * 发送消息。测试转换器和适配器
     * <p>
     * 转换器判断contentType 将字节数组转化为字符串
     * 适配器将数据交给 MessageDelegate 的 consumeMessage 方法进行处理
     */
    @Test
    public void testMessage4Text() {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("text/plan");
        Message message = new Message("mq 消息1234".getBytes(), messageProperties);
        rabbitTemplate.send("topic001", "spring.abc", message);
        rabbitTemplate.send("topic002", "rabbit.abc", message);
    }



image.png

MessageListenerAdapter 消息监听适配器总结

  • 通过messageListenerAdapter的代码我们可以看出如下核心属性
  • defaultListenerMethod :默认监听方法名称,用于设置监听方法名称
  • Delegate 委托对象:实际真实的委托对象,用于处理消息
  • queueOrTagToMethodName 队列名称与方法名称组成的集合
  • 可以一一进行队列和方法名称的匹配
  • 队列和方法名称绑定,即指定队列里的消息会被绑定的方法所接受处理

MessageConverter 消息转换器

  • 我们在消息传输的时候,正常情况下消息体为二进制的数据方式进行传输。如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要用到MessageConverter

  • 自定义常用转换器 MessageConverter 一般来讲都需要实现这个接口

  • 重写下面两个方法

    • toMessage : java 对象转换为Message
    • fromMessage : Message对象转换为java对象

转换器类型

  • json转换器: jackson2JsonMessageConverter: 可以进行java对象的转换功能
  • DefaultJackson2JavaTypeMapper映射器: 可以进行java对象的映射关系
  • 自定义二进制转换器: 比如图片类型、PDF,PPT, 流媒体

SpringBoot 整合RabbitMQ

  • publisher-confirms, 实现了一个监听器,用于监听Borker端给我们返回的确认消息: RabbitTemplate.ConfirmCallback

    /**
     * 回调函数 confirm确认模式
     */
    final ConfirmCallback confirmCallback = new ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {

            System.err.println("correlationData: " + correlationData);
            System.err.println("ack: " + ack);
            if (!ack) {
                System.out.println("-----异常处理");
            }
        }
    };
  • publisher-returns, 保证消息对Broker端是可达的,如果出现路由键不可达的情况,则使用监听器对不可达的消息进行后续处理,保证消息的路由成功 RabbitTemplate.ReturnCallback

    final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            System.err.println("return exchange : " + exchange + " , routingKey : " + routingKey + " , replyCode: " + replyCode + " , replyText: " + replyText);
        }
    };

  • 注意一点 在发送消息的时候,对template进行配置mandatory=true 保证监听有效

    • mandatory
      当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返回给生产者(Basic.Return + Content-Header + Content-Body);当mandatory设置为false时,出现上述情形broker会直接将消息扔掉。
  • 生产端还可以配置其他属性,比如发送重试,超时时间,次数,间隔等。




SpringBoot 整合RabbitMQ 消费端

  • 首先配置手动确认模式,用于ack的手工处理,这样我们可以保证消息的可靠性投递,或者在消费端消费失败的时候可以做到重回队列,根据业务记录日志等处理

  • 可以设置消费端的监听个数和最大个数,用于控制消费端并发情况
    -spring.rabbitmq.listener.simple.concurrency=5
    -spring.rabbitmq.listener.simple.max-concurrency=10
    -spring.rabbitmq.listener.simple.acknowledge-mode=manual 手工签收

@RabbitListener注解使用

  • 消费端监听 @RabbitMQLIstener 注解。 这个注解在实际工作中非常好用
    • @RabbitListener 是一个组合注解 , 里面可以进行注解配置
    • @QueueBinding ,@Queue, @Exchange 直接通过这个组合注解一次性搞定消费端交换机,队列,绑定,路由,并且配置监听功能等。
image.png

SpringCloud Stream 整合

  • Spring Clould. 这个全家桶框架在整个中小型互联网公司异常的火爆,那么相对应着,Spring Cloud Stream 就渐渐的被大家所重视起来。

  • 生产者和消费者可以是不同的消息中间件

image.png
image.png
image.png
  • Barista接口: Barista接口是用来定义后面类的参数。这一接口定义通道类型和通道名称,通道名称是作为配置用。通道类型则决定了app会使用这个通道进行发送消息,还是从中接收消息

  • @Output: 输出注解 。用于定义发送消息接口

  • @Input: 输入注解, 用于定义消息的消费信息接口

  • @StreamListener: 用于定义监听方法的注解

  • 使用spring Cloud Stream 非常简单。 只需要使用好这三个注解即可。在实现高性能消息的生产和消费的场景非常适合,但是使用 SpringCloudStream框架有一个非常大的问题,就是不能实现可靠性的投递,也就是没法保证消息的100%可靠性,会存在少量消息丢失的问题

-这个原因是因为SpringCloudStream框架为了和kafka兼顾所以在实际工作中使用它的目的就是针对高性能的消息通信的。 这点就是当前版本的Spring Cloud Stream 的定位

image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,454评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,553评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,921评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,648评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,770评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,950评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,090评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,817评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,275评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,592评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,724评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,409评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,052评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,815评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,043评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,503评论 2 361
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,627评论 2 350

推荐阅读更多精彩内容