RabbitMQ整合Spring家族

RabbitMQ整合Spring AMQP

AMQP 核心组件

  • RabbitAdmin
  • SpringAMQP声明
  • RabbitTemplate
  • SimpleMessageListenerContainer
  • MessageListenerAdapter
  • MessageConverter

RabbitAdmin

RabbitAdmin类可以很好的操作RabbitMQ,在Spring中直接进行注入即可。

@Configuration
public class RabbitMQConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("127.0.0.1:5672");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}

注意:

  • autoStartUp必须要设置为true,否则Spring容器不会加载RabbitAdmin类
  • RabbitAdmin底层实现就是从Spring容器中获取Exchange、Bingding、RoutingKey以及Queue的@Bean声明
  • 使用RabbitTemplate的execute方法执行对应的什么、修改、删除等一系列RabbitMQ基础功能操作
  • 例如:添加一个交换机、删除一个绑定、清空一个队列里的消息等等

RabbitAdmin的使用:

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTest {

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Test
    public void testAdmin(){
        rabbitAdmin.declareExchange(new DirectExchange("test.direct",false,false));
        rabbitAdmin.declareExchange(new TopicExchange("test.topic",false,false));
        rabbitAdmin.declareExchange(new FanoutExchange("test.fanout",false,false));

        rabbitAdmin.declareQueue(new Queue("test.direct.queue",false));
        rabbitAdmin.declareQueue(new Queue("test.topic.queue",false));
        rabbitAdmin.declareQueue(new Queue("test.fanout.queue",false));

        rabbitAdmin.declareBinding(new Binding("test.direct.queue",Binding.DestinationType.QUEUE,
                "test.direct","driect",new HashMap<>()));

        rabbitAdmin.declareBinding(
                BindingBuilder.bind(new Queue("test.topic.queue",false))    //直接创建队列
                .to(new TopicExchange("test.topic",false,false)) //直接创建交换机,建立关联关系
                .with("key.#"));    //指定路由key

        rabbitAdmin.declareBinding(
                BindingBuilder.bind(new Queue("test.fanout.queue",false))    //直接创建队列
                        .to(new FanoutExchange("test.fanout",false,false))); //直接创建交换机,建立关联关系

        //清空队列
        rabbitAdmin.purgeQueue("test.topic.queue",false);
    }
}
RabbitAdmin类的实现接口图

SpringAMQP声明

  • 在Rabbit基础API里面声明一个Exchange、声明一个绑定、一个队列
  • 使用SpringAMQP去声明,就需要使用SpringAMQP的如下模式,即声明@Bean方式
@Configuration
public class RabbitMQConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("127.0.0.1:5672");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

    /**
     * 针对消费者配置
     * 1、 设置交换机类型
     * 2、 将队列绑定到交换机
     * FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
     * HeadersExchange:通过添加属性key-value匹配
     * DirectExchange:按照routingkey分发到指定队列
     * TopicExchange:多关键字匹配
     *
     */
    @Bean
    public TopicExchange exchange001() {
        return new TopicExchange("topic001", true, false);
    }

    @Bean
    public Queue queue001() {
        return new Queue("queue001", true); //队列持久
    }

    @Bean
    public Binding binding001() {
        return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");
    }

    @Bean
    public TopicExchange exchange002() {
        return new TopicExchange("topic002", true, false);
    }

    @Bean
    public Queue queue002() {
        return new Queue("queue002", true); //队列持久
    }

    @Bean
    public Binding binding002() {
        return BindingBuilder.bind(queue002()).to(exchange002()).with("rabbit.*");
    }

    @Bean
    public Queue queue003() {
        return new Queue("queue003", true); //队列持久
    }

    @Bean
    public Binding binding003() {
        //同一个Exchange绑定了2个队列
        return BindingBuilder.bind(queue003()).to(exchange001()).with("mq.*");
    }
}

RabbitTemplate

RabbitTemplate,即消息模板

  • 我们在与SpringAMQP整合的时候进行发送消息的关键词
  • 该类提供了丰富的发送消息方法,包括可靠性投递消息方法、回调监听消息接口ConfirmCallback、返回值确认接口ReturnCallback等等。同样我们需要进行注入到Spring容器中,然后直接使用
  • 在与SPring整合时需要实例化,但是在与SpringBoot整合时,在配置文件里添加配置即可

在RabbitMQConfig类中写RabbitTemplate配置

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    return rabbitTemplate;
}

RabbitTemplate的使用:

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage(){
        //1、创建消息
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.getHeaders().put("desc","信息描述");
        messageProperties.getHeaders().put("type","自定义信息类型");
        Message message = new Message("hello rabbitmq".getBytes(),messageProperties);

        //2、发送消息
        rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
            //MessagePostProcessor 在消息发送完毕后再做一次转换进行再加工,匿名接口,需要重写方法
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                System.err.println("------添加额外的设置---------");
                message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
                message.getMessageProperties().getHeaders().put("attr", "额外新加的属性");
                return message;
            }
        });
    }

    @Test
    public void testSendMessage2(){
        //1 创建消息
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("text/plain");
        Message message = new Message("mq 消息1234".getBytes(), messageProperties);

        rabbitTemplate.send("topic001", "spring.abc", message);

        rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
        rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!");
    }
}

SimpleMessageListenerContainer

简单消息监听容器

  • 这个类非常的强大,我们可以对它进行很多设置,对于消费者的配置项,这个类都可以满足
  • 监听队列(多个队列)、自动启动、自动声明功能
  • 设置事务特性、事务管理器、事务属性、事务容器(并发)、是否开启事务、回滚消息等
  • 设置消费者数量、最小最大数量、批量消费
  • 设置消息确认和自动确认模式、是否重回队列、异常捕捉handler函数
  • 设置消费者标签生成策略、是否独占模式、消费者属性等
  • 设置具体的监听器、消息转换器等等

注意:

  • SimpleMessageListenerContainer可以进行动态设置,比如在运行中的应用可以动态的修改其消费者数量的大小、接收消息的模式等
  • 很多机于RabbitMQ的自制定话后端管控台在进行动态设置的时候,也是根据这一特性去实现的。所以可以看出SpringAMQP非常的强大

声明和设置SimpleMessageListenerContainer

@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    //添加多个队列进行监听
    container.setQueues(queue001(),queue002(),queue003(),queue_image(),queue_pdf());
    //当前消费者数量
    container.setConcurrentConsumers(1);
    //最大消费者数量
    container.setMaxConcurrentConsumers(5);
    //设置重回队列,一般设置false
    container.setDefaultRequeueRejected(false);
    //设置自动签收机制
    container.setAcknowledgeMode(AcknowledgeMode.AUTO);
    //设置listener外露
    container.setExposeListenerChannel(true);
    //消费端标签生成策略
    container.setConsumerTagStrategy(new ConsumerTagStrategy() {
        @Override
        public String createConsumerTag(String queue) {
            //每个消费端都有自己独立的标签
            return queue + "_" + UUID.randomUUID().toString();
        }
    });

    //消息监听
    container.setMessageListener(new ChannelAwareMessageListener() {
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            String msg = new String(message.getBody());
            System.err.println("----------消费者: " + msg);
        }
    });

    return container;
}

MessageListenerAdapter

MessageListenerAdapter 即消息监听适配器

适配器使用方式1
我们把之前的消息监听代码注释,可以不用直接加消息监听,而是采用MessageListenerAdapter的方式,通过适配器方式1,我们来学习下如何使用默认的handleMessage,自定义方法名,自定义转换器。
使用默认handleMessage

//消息监听
/*container.setMessageListener(new ChannelAwareMessageListener() {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        String msg = new String(message.getBody());
        System.err.println("----------消费者: " + msg);
    }
});*/

MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
container.setMessageListener(adapter);

MessageListenerAdapter 适配器类,熟悉适配器模式的朋友肯定了解适配器模式的话,可以通过适配器,适配自己的实现,这里我们适配自定义的MessageDelegate类。我们就可以不采用监听的方式,采用适配的方式。
自定义MessageDelegate

public class MessageDelegate {

    public void handleMessage(byte[] messageBody) {
        System.err.println("默认方法, 消息内容:" + new String(messageBody));
    }
}

MessageDelegate类中,方法名与参数handleMessage(byte[] messageBody)是固定的。为什么呢?
MessageListenerAdapter源码分析:

public class MessageListenerAdapter extends AbstractAdaptableMessageListener {
    private final Map<String, String> queueOrTagToMethodName;
    public static final String ORIGINAL_DEFAULT_LISTENER_METHOD = "handleMessage";
    private Object delegate;
    private String defaultListenerMethod;

    public MessageListenerAdapter() {
        this.queueOrTagToMethodName = new HashMap();
        //默认将handleMessage设置给defaultListenerMethod
        this.defaultListenerMethod = "handleMessage";
        this.delegate = this;
    }

    public MessageListenerAdapter(Object delegate) {
        this.queueOrTagToMethodName = new HashMap();
        //默认将handleMessage设置给defaultListenerMethod
        this.defaultListenerMethod = "handleMessage";
        this.doSetDelegate(delegate);
    }

    public MessageListenerAdapter(Object delegate, MessageConverter messageConverter) {
        this.queueOrTagToMethodName = new HashMap();
        //默认将handleMessage设置给defaultListenerMethod
        this.defaultListenerMethod = "handleMessage";
        this.doSetDelegate(delegate);
        super.setMessageConverter(messageConverter);
    }

    public MessageListenerAdapter(Object delegate, String defaultListenerMethod) {
        this(delegate);
        //这里支持自定义设置defaultListenerMethod
        this.defaultListenerMethod = defaultListenerMethod;
    }
}

默认方法名就是叫handleMessage。当然也可以自己去指定设置。通过messageListenerAdapter的代码我们可以看出如下核心属性:

  • defaultListenerMethod默认监听方法名称:用于设置监听方法名称
  • Delegate 委托对象:实际真实的委托对象,用于处理消息
  • queueOrTagToMethodName 队列标识与方法名称组成集合
  • 可以一一进行队列与方法名称的匹配
  • 队列和方法名称绑定,即指定队列里的消息会被绑定的方法所接受处理

自定义方法名:

MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
container.setMessageListener(adapter);

修改MessageDelegate()类:

public class MessageDelegate {
    
    public void consumeMessage(byte[] messageBody) {
        System.err.println("字节数组方法, 消息内容:" + new String(messageBody));
    }
}

自定义TextMessageConverter转换器:

public class TextMessageConverter implements MessageConverter {

    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        return new Message(object.toString().getBytes(), messageProperties);
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        String contentType = message.getMessageProperties().getContentType();
        if(null != contentType && contentType.contains("text")) {
            return new String(message.getBody());
        }
        return message.getBody();
    }

}

修改RabbitMQConfig类:

MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
adapter.setMessageConverter(new TextMessageConverter());
container.setMessageListener(adapter);

修改MessageDelegate类:

public class MessageDelegate {

    public void consumeMessage(String messageBody) {
        System.err.println("字符串方法, 消息内容:" + messageBody);
    }
}

适配器使用方式2

  • 适配器方式: 我们的队列名称和方法名称也可以进行一一的匹配
/**
 * 适配器方式: 我们的队列名称 和 方法名称 也可以进行一一的匹配
 */
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setMessageConverter(new TextMessageConverter());
Map<String, String> queueOrTagToMethodName = new HashMap<>();
queueOrTagToMethodName.put("queue001", "method1");
queueOrTagToMethodName.put("queue002", "method2");
adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
container.setMessageListener(adapter);
return container;
public class MessageDelegate {
    public void method1(String messageBody) {
        System.err.println("method1 收到消息内容:" + new String(messageBody));
    }
    
    public void method2(String messageBody) {
        System.err.println("method2 收到消息内容:" + new String(messageBody));
    }
    
}

MessageConverter消息转换器

我们在进行发送消息的时候,正常情况下消息体为二进制的数据方式进行传输,如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要用到MessageConverter

  • 自定义常用转换器:MessageConverter,一般来讲都需要实现这个接口
  • 重写下面两个方法:
    • toMessage:java对象转换为Message
    • fromMessage:Message对象转换为java对象
  • Json转换器:Jackson2JsonMessageConverter:可以进行Java对象的转换功能
  • DefaultJackson2JavaTypeMapper映射器:可以进行java对象的映射关系
  • 自定义二进制转换器:比如图片类型、PDF、PPT、流媒体

其实我们在介绍MessageListenerAdapter的时候,中间就介绍到了TextMessageConverter转换器,将二进制数据转换成字符串数据。

添加json格式的转换器,修改RabbitMQConfig类

// 支持json格式的转换器
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
//重点,加入json格式的转换器 json对应Map对象
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
adapter.setMessageConverter(jackson2JsonMessageConverter);

container.setMessageListener(adapter);

修改MessageDelegate

public class MessageDelegate {
    //json对应Map对象
    public void consumeMessage(Map messageBody) {
        System.err.println("map方法, 消息内容:" + messageBody);
    }
}

定义一个Order对象

public class Order {
    private String id;
    private String name;
    private String content;
    ...省略get/set等方法
}

定义测试方法:

@Test
public void testSendJsonMessage() throws Exception {

    Order order = new Order();
    order.setId("001");
    order.setName("消息订单");
    order.setContent("描述信息");
    ObjectMapper mapper = new ObjectMapper();
    String json = mapper.writeValueAsString(order);
    System.err.println("order 4 json: " + json);

    MessageProperties messageProperties = new MessageProperties();
    //这里注意一定要修改contentType为 application/json
    messageProperties.setContentType("application/json");
    Message message = new Message(json.getBytes(), messageProperties);

    rabbitTemplate.send("topic001", "spring.order", message);
}

添加支持Java对象转换,修改RabbitMQConfig类

MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");

Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();

//需要将javaTypeMapper放入到Jackson2JsonMessageConverter对象中
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);

adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);

修改MessageDelegate

public class MessageDelegate {
    public void consumeMessage(Order order) {
        System.err.println("order对象, 消息内容, id: " + order.getId() + 
                ", name: " + order.getName() + 
                ", content: "+ order.getContent());
    }
}

定义测试方法

@Test
public void testSendJavaMessage() throws Exception {

    Order order = new Order();
    order.setId("001");
    order.setName("订单消息");
    order.setContent("订单描述信息");
    ObjectMapper mapper = new ObjectMapper();
    String json = mapper.writeValueAsString(order);
    System.err.println("order 4 json: " + json);

    MessageProperties messageProperties = new MessageProperties();
    //这里注意一定要修改contentType为 application/json
    messageProperties.setContentType("application/json");
    //添加typeid 与类的全路径
    messageProperties.getHeaders().put("__TypeId__", "com.cp.spring.entity.Order");
    Message message = new Message(json.getBytes(), messageProperties);

    rabbitTemplate.send("topic001", "spring.order", message);
}

添加支持java对象多映射转换,修改RabbitMQConfig类

//1.3 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter 支持java对象多映射转换
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();

//key表示标签 对应一个类的具体全路径。类和标签绑定之后,标签是order,意思就是转换成order类
Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();
idClassMapping.put("order", com.cp.spring.entity.Order.class);
idClassMapping.put("packaged", com.cp.spring.entity.Packaged.class);

javaTypeMapper.setIdClassMapping(idClassMapping);
//一层套一层
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);

修改MessageDelegate

public class MessageDelegate {
    //json对应Map对象
    public void consumeMessage(Order order) {
        System.err.println("order对象, 消息内容, id: " + order.getId() + 
                ", name: " + order.getName() + 
                ", content: "+ order.getContent());
    }
     
    public void consumeMessage(Packaged pack) {
        System.err.println("package对象, 消息内容, id: " + pack.getId() + 
                ", name: " + pack.getName() + 
                ", content: "+ pack.getDescription());
    }
}

定义一个Packaged对象

public class Packaged {
    private String id;
    private String name;
    private String description;
    ...省略get/set等方法
}

定义测试方法

@Test
public void testSendMappingMessage() throws Exception {

    ObjectMapper mapper = new ObjectMapper();

    Order order = new Order();
    order.setId("001");
    order.setName("订单消息");
    order.setContent("订单描述信息");

    String json1 = mapper.writeValueAsString(order);
    System.err.println("order 4 json: " + json1);

    MessageProperties messageProperties1 = new MessageProperties();
    //这里注意一定要修改contentType为 application/json
    messageProperties1.setContentType("application/json");
    //设置的是标签,而不是全路径
    messageProperties1.getHeaders().put("__TypeId__", "order");
    Message message1 = new Message(json1.getBytes(), messageProperties1);
    rabbitTemplate.send("topic001", "spring.order", message1);

    Packaged pack = new Packaged();
    pack.setId("002");
    pack.setName("包裹消息");
    pack.setDescription("包裹描述信息");

    String json2 = mapper.writeValueAsString(pack);
    System.err.println("pack 4 json: " + json2);

    MessageProperties messageProperties2 = new MessageProperties();
    //这里注意一定要修改contentType为 application/json
    messageProperties2.setContentType("application/json");
    //设置的是标签,而不是全路径
    messageProperties2.getHeaders().put("__TypeId__", "packaged");
    Message message2 = new Message(json2.getBytes(), messageProperties2);
    rabbitTemplate.send("topic001", "spring.pack", message2);
}

在通过单元测试运行testSendMappingMessage()方法时会存在一个问题:委派对象MessageDelegate可能会收不到对象。
因为单元测试spring容器在运行完毕之后就停止,不会等到消费者消费完消息之后再停止,所以需要通过正常启动springboot项目,可以看到正常消费消息。

添加全局转换器,修改RabbitMQConfig类

@Bean  
public Queue queue_image() {  
    return new Queue("image_queue", true); //队列持久  
}

@Bean  
public Queue queue_pdf() {  
    return new Queue("pdf_queue", true); //队列持久  
}

//1.4 ext convert
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");

//全局的转换器:所有小的Converter都可以放到这个大的Converter中
ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();

TextMessageConverter textConvert = new TextMessageConverter();
//text走文本转换器
convert.addDelegate("text", textConvert);
convert.addDelegate("html/text", textConvert);
convert.addDelegate("xml/text", textConvert);
convert.addDelegate("text/plain", textConvert);
//json走json转换器
Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter();
convert.addDelegate("json", jsonConvert);
convert.addDelegate("application/json", jsonConvert);
//图片走图片转换器
ImageMessageConverter imageConverter = new ImageMessageConverter();
convert.addDelegate("image/png", imageConverter);
convert.addDelegate("image", imageConverter);
//pdf走pdf转换器
PDFMessageConverter pdfConverter = new PDFMessageConverter();
convert.addDelegate("application/pdf", pdfConverter);


adapter.setMessageConverter(convert);
container.setMessageListener(adapter);

修改MessageDelegate

public class MessageDelegate {

    public void handleMessage(byte[] messageBody) {
        System.err.println("默认方法, 消息内容:" + new String(messageBody));
    }
    
    public void consumeMessage(byte[] messageBody) {
        System.err.println("字节数组方法, 消息内容:" + new String(messageBody));
    }
    
    public void consumeMessage(String messageBody) {
        System.err.println("字符串方法, 消息内容:" + messageBody);
    }
    
    public void method1(String messageBody) {
        System.err.println("method1 收到消息内容:" + new String(messageBody));
    }
    
    public void method2(String messageBody) {
        System.err.println("method2 收到消息内容:" + new String(messageBody));
    }
    
    //json对应Map对象
    public void consumeMessage(Map messageBody) {
        System.err.println("map方法, 消息内容:" + messageBody);
    }
    public void consumeMessage(Order order) {
        System.err.println("order对象, 消息内容, id: " + order.getId() + 
                ", name: " + order.getName() + 
                ", content: "+ order.getContent());
    }
    public void consumeMessage(Packaged pack) {
        System.err.println("package对象, 消息内容, id: " + pack.getId() + 
                ", name: " + pack.getName() + 
                ", content: "+ pack.getDescription());
    }
    public void consumeMessage(File file) {
        System.err.println("文件对象 方法, 消息内容:" + file.getName());
    }
}

添加PDFMessageConverter

public class PDFMessageConverter implements MessageConverter {

    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        throw new MessageConversionException(" convert error ! ");
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        System.err.println("-----------PDF MessageConverter----------");
        
        byte[] body = message.getBody();
        String fileName = UUID.randomUUID().toString();
        String path = "d:/010_test/" + fileName + ".pdf";
        File f = new File(path);
        try {
            Files.copy(new ByteArrayInputStream(body), f.toPath());
        } catch (IOException e) {
            e.printStackTrace();
        }
        return f;
    }
}

添加ImageMessageConverter

public class ImageMessageConverter implements MessageConverter {

    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        throw new MessageConversionException(" convert error ! ");
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        System.err.println("-----------Image MessageConverter----------");
        
        Object _extName = message.getMessageProperties().getHeaders().get("extName");
        String extName = _extName == null ? "png" : _extName.toString();
        
        byte[] body = message.getBody();
        String fileName = UUID.randomUUID().toString();
        //将接受到的图片放到该位置
        String path = "d:/010_test/" + fileName + "." + extName;
        File f = new File(path);
        try {
            Files.copy(new ByteArrayInputStream(body), f.toPath());
        } catch (IOException e) {
            e.printStackTrace();
        }
        return f;
    }
}

定义测试方法:

@Test
public void testSendExtConverterMessage() throws Exception {
//      byte[] body = Files.readAllBytes(Paths.get("d:/002_books", "picture.png"));
//      MessageProperties messageProperties = new MessageProperties();
//      messageProperties.setContentType("image/png");
//      messageProperties.getHeaders().put("extName", "png");
//      Message message = new Message(body, messageProperties);
//      rabbitTemplate.send("", "image_queue", message);

        byte[] body = Files.readAllBytes(Paths.get("d:/002_books", "mysql.pdf"));
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("application/pdf");
        Message message = new Message(body, messageProperties);
        rabbitTemplate.send("", "pdf_queue", message);
}

RabbitMQ整合Spring Boot

生产端核心配置:

  • publisher-confirms,实现一个监听器用于监听Broker端给我们返回的确认请求:RabbitTemplate.ConfirmCallback

  • publisher-returns,保证消息对Broker端是可达的,如果出现路由键不可达的情况,则使用监听器对不可达的消息进行后续的处理,保证消息的路由成功:RabbitTemplate.ReturnCallback

注意:

  • 在发送消息的时候,对template进行配置mandatory=true,保证监听有效
  • 生产端还可以配置其他属性,如发送重试,超时时间、次数、间隔等。

生产者SpringBoot配置:

spring.rabbitmq.addresses=127.0.0.1:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

# 消息发送到交换机确认机制,是否确认回调
spring.rabbitmq.publisher-confirms=true
# 消息发送到交换机确认机制,是否返回回调
spring.rabbitmq.publisher-returns=true
# 需设置mandatory=true,否则不回return回调,消息就丢了
spring.rabbitmq.template.mandatory=true

生产者代码封装:

@Component
@Slf4j
public class RabbitSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //回调函数: confirm确认
    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (ack) {
                log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
                //消息可靠性投递,消息发送成功之后的处理
            } else {
                log.info("消息发送失败:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
                log.info("异常处理......");
                //消息可靠性投递,消息发送失败之后的处理
            }
        }
    };

    //回调函数: return返回
    final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode,
                                        String replyText, String exchange, String routingKey) {
            log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:({})", exchange, routingKey, replyCode, replyText, message);
            log.info("补偿操作,重试等");
            //exchange路由消息到queue失败,则回调return,消息丢失
            //消息可靠性投递,消息丢失之后的处理
        }
    };

    /**
     * 发送消息方法调用: 构建Message消息
     * @param message
     * @param properties
     */
    public void send(Object message,Map<String,Object> properties){
        MessageHeaders messageHeaders = new MessageHeaders(properties);
        Message msg = MessageBuilder.createMessage(message,messageHeaders);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        CorrelationData correlationData = new CorrelationData();
        //id + 时间戳 全局唯一,这里使用uuid演示
        correlationData.setId(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg,correlationData);
    }

    /**
     * 发送消息方法调用: 构建自定义对象消息
     * @param order
     */
    public void sendOrder(Order order){
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        CorrelationData correlationData = new CorrelationData();
        //id + 时间戳 全局唯一,这里使用uuid演示
        correlationData.setId(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", order,correlationData);
    }

}

消费端核心配置:

# manual:手动 ack(确认)
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 消费者的最小数量
spring.rabbitmq.listener.simple.concurrency=5
# 消费者的最大数量
spring.rabbitmq.listener.simple.max-concurrency=10

首先配置手工确认模式,用于ACK的手工处理,这样我们可以保证消息的可靠性送达,或者在消费端消费失败的时候可以做到重回队列、根据业务记录日志等处理。

可以设置消费端的监听个数和最大个数,用于控制消费端的并发情况。

@RabbitListener注解使用

  • 消费端监听@RabbitListener注解,这个对于在实际工作中非常的好用
  • @RabbitListener是一个组合注解,里面可以注解配置
  • @QueueBinding、@Queue、@Exchange直接通过这个组合注解一次性搞定消费端交换机、队列、绑定、路由、并配置监听功能等。
    代码示例:
public class Consumer {

    @RabbitListener(bindings = @QueueBinding(
            value=@Queue(value = "queue-1",durable = "true"),
            exchange = @Exchange(value = "exchange-1",
                    durable = "true",
                    type = "topic",
                    ignoreDeclarationExceptions = "true"),
            key = "springboot.*"
    ))
    @RabbitHandler
    public void onMessage(Message message, Channel channel){

    }
}

提示:由于 @RabbitListener配置写在代码里面非常不友好,所以强烈建议大家使用配置文件配置

消费者SpringBoot配置:

spring.rabbitmq.addresses=127.0.0.1:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

# manual:手动 ack(确认)
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 消费者的最小数量
spring.rabbitmq.listener.simple.concurrency=5
# 消费者的最大数量
spring.rabbitmq.listener.simple.max-concurrency=10

spring.rabbitmq.listener.order.queue.name=queue-2
spring.rabbitmq.listener.order.queue.durable=true
spring.rabbitmq.listener.order.exchange.name=exchange-2
spring.rabbitmq.listener.order.exchange.durable=true
spring.rabbitmq.listener.order.exchange.type=topic
spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
spring.rabbitmq.listener.order.key=springboot.*

消费者代码示例

@Component
@Slf4j
public class RabbitReceiver {

    @RabbitListener(bindings = @QueueBinding(
            value=@Queue(value = "queue-1",durable = "true"),
            exchange = @Exchange(value = "exchange-1",
                    durable = "true",
                    type = "topic",
                    ignoreDeclarationExceptions = "true"),
            key = "springboot.*"
    ))
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws IOException {
        log.info("消费端收到消息体,message({})",message.getPayload().toString());
        Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK
        channel.basicAck(deliveryTag,false);
    }

    @RabbitListener(bindings = @QueueBinding(
            value=@Queue(value = "${spring.rabbitmq.listener.order.queue.name}",
                    durable = "${spring.rabbitmq.listener.order.queue.durable}"),
            exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",
                    durable = "${spring.rabbitmq.listener.order.exchange.durable}",
                    type = "${}spring.rabbitmq.listener.order.exchange.type",
                    ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
            key = "${spring.rabbitmq.listener.order.key}"
    ))
    @RabbitHandler
    public void onOrderMessage(@Payload Order order, Channel channel, @Headers Map<String,Object> headers) throws IOException {
        log.info("消费端收到消息体,order({})",order);
        Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK
        channel.basicAck(deliveryTag,false);
    }
}

RabbitMQ整合Spring Cloud Stream

Spring Cloud,这个全家桶框架在整个中小型互联网公司异常火爆,因为Spring Cloud 有一套完善额微服务架构体系,那么相对应着Spring Cloud Stream就渐渐的被大家所重视起来,

Spring Cloud Stream整体架构图
Spring Cloud Stream整合RabbitMQ

Spring Cloud Stream通道接口定义:

  • Processor接口:Processor接口是定义作为后面类的参数,这一接口定义通道类型和通道名称,通道名称作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。
  • @Output:输出注解,用于定义发送消息接口
  • @Input:输入注解,用于定义消息的消费者接口
  • @StreamListener:用于定义监听方法的注解

注意:

  • 使用Spring Cloud Stream非常简单,只需要使用好上面三个注解即可,在实现高性能消息的生产和消费的场景非常适合,但是使用Spring Cloud Stream框架有一个非常大的问题就是不能实现可靠性消息的投递,也就是没有办法保证消息的100%可靠性,会存在少量消息的丢失。
  • 这个原因是因为Spring Cloud Stream框架为了和Kafka兼顾,所以在实际工作中使用它的目的就是针对高性能的消息通信的,这点就是当前Spring Cloud Stream的定位

代码演示:
引入maven依赖:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.10.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-autoconfigure</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <!--整合Spring Cloud-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Greenwich.SR3</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

Producer端代配置和代码:

server.port=8001

spring.application.name=producer
spring.cloud.stream.bindings.output_channel.destination=exchange-3
spring.cloud.stream.bindings.output_channel.group=queue-3
spring.cloud.stream.bindings.output_channel.binder=rabbit_cluster

spring.cloud.stream.binders.rabbit_cluster.type=rabbit
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.addresses=127.0.0.1:5672
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.username=guest
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.password=guest
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.virtual-host=/
/**
 * 这里的Processor接口是定义来作为后面类的参数,这一接口定义来通道类型和通道名称。
 * 通道名称是作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。
 */

@Component
public interface Processor {

    //String INPUT_CHANNEL = "input_channel";
    String OUTPUT_CHANNEL = "output_channel";

    //注解@Output声明了它是一个输出类型的通道,名字是output_channel。这一名字与app1中通道名一致,
    // 表明注入了一个名字为output_channel的通道,类型是output,发布的主题名为mydest。
    @Output(Processor.OUTPUT_CHANNEL)
    MessageChannel logoutput();
}

@EnableBinding(Processor.class)
@Service
@Slf4j
public class RabbitmqSender {

    @Autowired
    private Processor processor;

    public void sendMessage(Object message,Map<String, Object> properties){
        try {
            MessageHeaders messageHeaders = new MessageHeaders(properties);
            Message<Object> msg = MessageBuilder.createMessage(message, messageHeaders);
            boolean sendStatus = processor.logoutput().send(msg);
            log.info("--------------sending -------------------");
            log.info("发送数据:" + message + ",sendStatus: " + sendStatus);
        } catch (Exception e) {
            log.error("-------------error-------------,e({})",e);
            throw new RuntimeException(e.getMessage());
        }
    }
}

Consumer端代配置和代码:

server.port=8002

spring.application.name=consumer
spring.cloud.stream.bindings.input_channel.destination=exchange-3
spring.cloud.stream.bindings.input_channel.group=queue-3
spring.cloud.stream.bindings.input_channel.binder=rabbit_cluster
spring.cloud.stream.bindings.input_channel.consumer.concurrency=1
spring.cloud.stream.rabbit.bindings.input_channel.consumer.requeue-rejected=false
spring.cloud.stream.rabbit.bindings.input_channel.consumer.acknowledge-mode=MANUAL
spring.cloud.stream.rabbit.bindings.input_channel.consumer.recovery-interval=3000
spring.cloud.stream.rabbit.bindings.input_channel.consumer.durable-subscription=true
spring.cloud.stream.rabbit.bindings.input_channel.consumer.max-concurrency=5

spring.cloud.stream.binders.rabbit_cluster.type=rabbit
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.addresses=127.0.0.1:5672
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.username=guest
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.password=guest
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.virtual-host=/
/**
 * 这里的Processor接口是定义来作为后面类的参数,这一接口定义来通道类型和通道名称。
 * 通道名称是作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。
 */

@Component
public interface Processor {

    String INPUT_CHANNEL = "input_channel";

    //注解@Input声明了它是一个输入类型的通道,名字是Barista.INPUT_CHANNEL,也就是position3的input_channel。
    //这一名字与上述配置app2的配置文件中position1应该一致,表明注入了一个名字叫做input_channel的通道,
    //它的类型是input,订阅的主题是position2处声明的mydest这个主题
    @Input(Processor.INPUT_CHANNEL)
    SubscribableChannel loginput();
}

@EnableBinding(Processor.class)
@Service
@Slf4j
public class RabbitmqReceiver {

    @StreamListener(Processor.INPUT_CHANNEL)
    public void receiver(Message message) throws Exception {
        Channel channel = (com.rabbitmq.client.Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        System.out.println("Input Stream 1 接受数据:" + message);
        System.out.println("消费完毕------------");
        channel.basicAck(deliveryTag, false);
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,384评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,845评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,148评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,640评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,731评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,712评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,703评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,473评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,915评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,227评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,384评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,063评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,706评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,302评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,531评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,321评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,248评论 2 352

推荐阅读更多精彩内容