RabbitMQ学习笔记 - RabbitMQ消息监听容器

RabbitMQ消息监听容器

    你好!欢迎来到Java成长笔记,主要是用于相互交流,相互学习,也希望分享能帮到大家,如有错误之处,希望指正,谢谢!

    通过配置消息监听容器,可以对队列或者多个队列进行监听,开发者可以进行相应的业务处理,实现接收和发送数据的自定义处理。

SimpleMessageListenerContainer

1、SimpleMessageListenerContainer进行很多设置,消费者的配置项进行设置,可以满足监听队列(多个队列)、自动启动、自动声明功能。
2、简单消息监听容器
    1)设置事务特性、事务管理器、事务属性、事务容量(并发)、是否开启事务、回滚消息等。
    2)设置消费者数量、最小最大数量、批量消费。
3、设置消息确认和自动确认模式、是否重回队列、异常捕获handler函数。
4、设置消费者标签生成策略、是否独占模式、消费者属性等。
5、设置具体的监听器、消息转換器等等。
6、SimpleMessageListenerContainer可以进行动态设置,比如在运行中的应用可以动态的修改其消费者数量的大小、接收消息的模式等。

基础设置加载,代码展示如下:

/**
* 基础设置加载
*/
public void init (final SimpleMessageListenerContainer container) {
       // 同时监听多个队列
       container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
       // 设置当前的消费者数量和最大消费者数量
       container.setConcurrentConsumers(1);
       container.setMaxConcurrentConsumers(5);
       // 设置是否重回队列
       container.setDefaultRequeueRejected(false);
       // 设置是否自动签收
       container.setAcknowledgeMode(AcknowledgeMode.AUTO);
       // 设置是否监听
       container.setExposeListenerChannel(true);
       // 设置消费端标签策略
       container.setConsumerTagStrategy(new ConsumerTagStrategy() {
           @Override
           public String createConsumerTag(String s) {
               return s + "_" + UUID.randomUUID().toString();
           }
       });
}

消息监听端配置,代码展示如下:

@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
      
       final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);

       this.init(container);
        
       // 设置消息监听
       container.setMessageListener(new ChannelAwareMessageListener() {
           @Override
           public void onMessage(Message message, Channel channel) throws Exception {
               final String msg = new String(message.getBody());
               log.error(" 消费者 --->>> " + msg);
           }
       });
       return container;
}

MessageListenerAdapter

适配器方法(一)

指定消息转换器,将字节数组转换为String

代码展示如下:

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import java.util.Optional;

@Slf4j
public class TextMessageConverter implements MessageConverter {

    /**
     * @Description: Object转为Message
     */
    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        return new Message(object.toString().getBytes(), messageProperties);
    }

    /**
     * @Description: Message转为Java对象
     */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        final String contentType = message.getMessageProperties().getContentType();
        Optional.ofNullable(contentType)
                .filter(StringUtils::isNotBlank)
                .filter(x->contentType.contains("test"))
                .map(z->{
                    return new String(message.getBody());
                });
        return message.getBody();
    }
}

// 监听类设置配置
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
     final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);

     this.init(container);
       
     final MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
     // 方法委托 自定义方法
     adapter.setDefaultListenerMethod("consumeMessage");
     adapter.setMessageConverter(new TextMessageConverter());
     container.setMessageListener(adapter);
     return container;         
}

public void consumeMessage(byte[] messageBody) {
    log.error("字节数组方法,消息内容:{}", new String(messageBody));
}

// 测试类
@Test
public void testSendMessage4Text() throws Exception {
    //1 创建消息
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setContentType("text/plain");
    Message message = new Message("TextMessageConverter配置mq消息Message".getBytes(), messageProperties);
    // 发送消息
    rabbitTemplate.send("topic001", "spring.abc", message);
    rabbitTemplate.send("topic002", "rabbit.abc", message);
}

如果不配置setDefaultListenerMethod方法,下图为监听类默认方法

MessageListenerAdapter默认方法handleMessage

适配器方法(二)

将队列名称和消息转换方法进行一对一匹配

代码展示如下:

@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
   
   final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);

   this.init(container);
       
   final MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
   adapter.setMessageConverter(new TextMessageConverter());
   Map<String, String> queueOrTagToMethodName = Maps.newHashMap();
   queueOrTagToMethodName.put("queue001", "method1");
   queueOrTagToMethodName.put("queue002", "method2");
   adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
   container.setMessageListener(adapter);
   return container;
}

// 监听方法
public void method1(String messageBody) {
        log.error("method1收到消息内容:{} ", messageBody);
}

public void method2(String messageBody) {
    log.error("method2收到消息内容:{} ", messageBody);
}

// 测试类
@Test
public void testSendMessage4Text() throws Exception {
    //1 创建消息
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setContentType("text/plain");
    Message message = new Message("TextMessageConverter配置mq消息Message".getBytes(), messageProperties);
    // 发送消息
    rabbitTemplate.send("topic001", "spring.abc", message);
    rabbitTemplate.send("topic002", "rabbit.abc", message);
}

MessageConverter转换器

JSON格式转换器

设置json格式转换器

代码展示如下:

// 支持json格式转换器
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {

     final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);

     this.init(container);
     
     final MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
     adapter.setDefaultListenerMethod("consumeMessage");
     Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
     adapter.setMessageConverter(jackson2JsonMessageConverter);
     container.setMessageListener(adapter);
     return container;
}

// 测试类
@Test
public void testSendJsonMessage() throws Exception {
    final Order order =
            new Order(UUID.randomUUID().toString().replaceAll("-",""),
                    "订单详情", "订单描述信息");
    final String json = new ObjectMapper().writeValueAsString(order);
    log.error("testSendJsonMessage json: " + json);
    final MessageProperties messageProperties = new MessageProperties();
    // 需要修改contentType为 application/json
    messageProperties.setContentType("application/json");
    final Message message = new Message(json.getBytes(), messageProperties);
    rabbitTemplate.send("topic001", "spring.order", message);
}

// 监听类
public void consumeMessage(Map messageBody) {
    log.error("consumeMessage方法,消息内容:{} ", messageBody);
}

Java对象转换

设置java对象转换

代码展示如下:

@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {

    final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);

    this.init(container);
     
    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    adapter.setDefaultListenerMethod("consumeMessage");
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
    jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
    adapter.setMessageConverter(jackson2JsonMessageConverter);
    container.setMessageListener(adapter);
    return container;
}

// 监听类
public void consumeMessage(Order order) {
        log.error("consumeMessage order对象,消息内容,id:{} ", order.getId(),",name:{} ", order.getName(), ", content:{} ", order.getContent());
}

// 测试类
@Test
public void testSendJavaMessage() throws Exception {
    final Order order =
            new Order(UUID.randomUUID().toString().replaceAll("-",""),
                    "订单详情", "订单描述信息");
    final String json = new ObjectMapper().writeValueAsString(order);
    log.error("testSendJavaMessage json: " + json);
    final MessageProperties messageProperties = new MessageProperties();
    //这里注意一定要修改contentType为 application/json
    messageProperties.setContentType("application/json");
    messageProperties.getHeaders().put("__TypeId__", "com.show.model.Order");
    final Message message = new Message(json.getBytes(), messageProperties);
    rabbitTemplate.send("topic001", "spring.order", message);
}

支持java对象多映射转换

设置java对象多映射转换

代码展示如下:

@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {

    final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);

    this.init(container);
     
    final MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    adapter.setDefaultListenerMethod("consumeMessage");
    final Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    final DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
    final Map<String, Class<?>> idClassMapping = Maps.newHashMap();
    idClassMapping.put("order", com.show.model.Order.class);
    idClassMapping.put("packaged", com.show.model.Packaged.class);
    javaTypeMapper.setIdClassMapping(idClassMapping);
    jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
    adapter.setMessageConverter(jackson2JsonMessageConverter);
    container.setMessageListener(adapter);
    return container;
}

// 监听类 
public void consumeMessage(Order order) {
     log.error("consumeMessage order对象,id{},name{},content{}", order.getId(), order.getName(), order.getContent());
}

public void consumeMessage(Packaged pack) {
     log.error("consumeMessage package对象,id{},name{},content{}", pack.getId(), pack.getName(), pack.getDescription());
}

// 测试类
@Test
public void testSendMappingMessage() throws Exception {
    final ObjectMapper mapper = new ObjectMapper();
    // Order类处理
    final Order order =
            new Order(UUID.randomUUID().toString().replaceAll("-",""),
                    "订单详情", "订单描述信息");
    final String json1 = mapper.writeValueAsString(order);
    log.error("testSendMappingMessage json1: " + json1);
    final MessageProperties messageProperties1 = new MessageProperties();
    messageProperties1.setContentType("application/json");
    messageProperties1.getHeaders().put("__TypeId__", "order");
    final Message message1 = new Message(json1.getBytes(), messageProperties1);
    rabbitTemplate.send("topic001", "spring.order", message1);
    // Packaged类处理
    final Packaged pack =
            new Packaged(UUID.randomUUID().toString().replaceAll("-",""), "包裹详情", "包裹描述信息");
    final String json2 = mapper.writeValueAsString(pack);
    log.error("testSendMappingMessage json2: " + json2);
    final MessageProperties messageProperties2 = new MessageProperties();
    messageProperties2.setContentType("application/json");
    messageProperties2.getHeaders().put("__TypeId__", "packaged");
    final Message message2 = new Message(json2.getBytes(), messageProperties2);
    rabbitTemplate.send("topic001", "spring.pack", message2);
}

Java多对象映射转换

设置java对象多映射转换

代码展示如下:

@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {

    final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);

    this.init(container);
     
    final MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    adapter.setDefaultListenerMethod("consumeMessage");
    final Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    final DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
    final Map<String, Class<?>> idClassMapping = Maps.newHashMap();
    idClassMapping.put("order", com.show.model.Order.class);
    idClassMapping.put("packaged", com.show.model.Packaged.class);
    javaTypeMapper.setIdClassMapping(idClassMapping);
    jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
    adapter.setMessageConverter(jackson2JsonMessageConverter);
    container.setMessageListener(adapter);
    return container;
}

// 监听类 
public void consumeMessage(Order order) {
     log.error("consumeMessage order对象,id{},name{},content{}", order.getId(), order.getName(), order.getContent());
}

public void consumeMessage(Packaged pack) {
     log.error("consumeMessage package对象,id{},name{},content{}", pack.getId(), pack.getName(), pack.getDescription());
}

// 测试类
@Test
public void testSendMappingMessage() throws Exception {
    final ObjectMapper mapper = new ObjectMapper();

    final Order order =
            new Order(UUID.randomUUID().toString().replaceAll("-",""),
                    "订单详情", "订单描述信息");
    final String json1 = mapper.writeValueAsString(order);
    log.error("testSendMappingMessage json1: " + json1);
    final MessageProperties messageProperties1 = new MessageProperties();
    messageProperties1.setContentType("application/json");
    messageProperties1.getHeaders().put("__TypeId__", "order");
    final Message message1 = new Message(json1.getBytes(), messageProperties1);
    rabbitTemplate.send("topic001", "spring.order", message1);

    final Packaged pack =
            new Packaged(UUID.randomUUID().toString().replaceAll("-",""),
                    "包裹详情", "包裹描述信息");
    final String json2 = mapper.writeValueAsString(pack);
    log.error("testSendMappingMessage json2: " + json2);
    final MessageProperties messageProperties2 = new MessageProperties();
    messageProperties2.setContentType("application/json");
    messageProperties2.getHeaders().put("__TypeId__", "packaged");
    final Message message2 = new Message(json2.getBytes(), messageProperties2);
    rabbitTemplate.send("topic001", "spring.pack", message2);
}

多对象转换包含image和pdf转换

设置多对象转换包含image和pdf转换

代码展示如下:

@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {

    final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);

    this.init(container);
     
    final MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    adapter.setDefaultListenerMethod("consumeMessage");
    // 全局的转换器
    ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();
    final TextMessageConverter textConvert = new TextMessageConverter();
    convert.addDelegate("text", textConvert);
    convert.addDelegate("html/text", textConvert);
    convert.addDelegate("xml/text", textConvert);
    convert.addDelegate("text/plain", textConvert);
    final Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter();
    convert.addDelegate("json", jsonConvert);
    convert.addDelegate("application/json", jsonConvert);
    final ImageMessageConverter imageConverter = new ImageMessageConverter();
    convert.addDelegate("image/jpg", imageConverter);
    convert.addDelegate("image", imageConverter);
    final PDFMessageConverter pdfConverter = new PDFMessageConverter();
    convert.addDelegate("application/pdf", pdfConverter);
    adapter.setMessageConverter(convert);
    container.setMessageListener(adapter);
    return container;
}

// PDF监听类
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.UUID;

@Slf4j
public class PDFMessageConverter implements MessageConverter {

    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        throw new MessageConversionException(" convert error ! ");
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        log.error("PDF MessageConverter:{}", System.currentTimeMillis());
        final byte[] body = message.getBody();
        final String fileName = UUID.randomUUID().toString().replaceAll("-","");
        final String path = "H:/rebbitmq/" + fileName + ".pdf";
        final File f = new File(path);
        try {
            Files.copy(new ByteArrayInputStream(body), f.toPath());
        } catch (IOException e) {
            e.printStackTrace();
        }
        return f;
    }
}

// Image监听类
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.UUID;

@Slf4j
public class ImageMessageConverter implements MessageConverter {

    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        throw new MessageConversionException(" convert error ! ");
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        log.error("Image MessageConverter:{}", System.currentTimeMillis());
        final Object _extName = message.getMessageProperties().getHeaders().get("extName");
        final String extName = _extName == null ? "jpg" : _extName.toString();
        final byte[] body = message.getBody();
        final String fileName = UUID.randomUUID().toString().replaceAll("-","");
        final String path = "H:/rebbitmq/" + fileName + "." + extName;
        final File f = new File(path);
        try {
            Files.copy(new ByteArrayInputStream(body), f.toPath());
        } catch (IOException e) {
            e.printStackTrace();
        }
        return f;
        }
}

// Image测试类
@Test
public void testSendImageExtConverterMessage() throws Exception {
    byte[] body = Files.readAllBytes(Paths.get("H:/rebbitmq/", "aw.jpg"));
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setContentType("image/jpg");
    messageProperties.getHeaders().put("extName", "jpg");
    Message message = new Message(body, messageProperties);
    rabbitTemplate.send("", "image_queue", message);
}

// PDF测试类
@Test
public void testSendPdfExtConverterMessage() throws Exception {
    byte[] body = Files.readAllBytes(Paths.get("H:/rebbitmq/", "dell.pdf"));
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setContentType("application/pdf");
    Message message = new Message(body, messageProperties);
    rabbitTemplate.send("", "pdf_queue", message);
}

本章完结,后续还会持续更新,分享Java成长笔记,希望我们能一起成长。如果你觉得我的分享有用,记得点赞和关注哦!这对我是最好的鼓励。谢谢!

PS:转载请注明出处!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,294评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,780评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,001评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,593评论 1 289
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,687评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,679评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,667评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,426评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,872评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,180评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,346评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,019评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,658评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,268评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,495评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,275评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,207评论 2 352

推荐阅读更多精彩内容