SpringBoot RabbitMQ

RabbitMQ 介绍

RabbitMQ的流程是:生产者将消息发送到对应交换机上,交换机再将消息转发到绑定的队列上,消费者从绑定的队列获取消息进行消费。


image.png

交换机的主要作用是接收相应的消息并且绑定到指定的队列.交换机有四种类型,分别为Direct,topic,headers,Fanout.

SpringBoot整合RabbitMQ

添加依赖:

<!-- 添加springboot对amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

在application.yml文件中添加配置

spring:
  application:
    name: async-task
  rabbitmq:
    host: 192.168.255.255
    port: 5672
    username: xxx
    password: 123456

Direct模式

Direct是RabbitMQ默认的交换机模式,也是最简单的模式.即创建消息队列的时候,指定一个路由键(RoutingKey).当发送者发送消息的时候,指定对应的Key.当Key和消息队列的RoutingKey一致的时候,消息将会被发送到该消息队列中.

@Configuration
public class RabbitMQConfig{
    // 交换机有四种类型,分别为Direct,topic,headers,Fanout.

    // Direct 模式创建队列
    // 创建队列
    @Bean
    public Queue testQueue() {
        return new Queue("queueName");
    }

    // 创建一个交换机
    @Bean
    public DirectExchange testExchange() {
        return new DirectExchange("exchangeName");
    }

    // 把队列和交换机绑定在一起
    @Bean
    public Binding testBinding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("routingKey");
    }
}

// 消息生产者
@Component
public class HelloSender {
    @Autowired
    private RabbitTemplate template;
    
    public void send() {
    template.convertAndSend("exchangeName", "routingKey", "hello,rabbit~");
    }
}

// 定义消费者
@Component
public class TestConsumer {

    @RabbitListener(queues = "queueName")
    public void process(String data){
        log.info("------------data: {}",data);
    }
}

AmqpTemplate 发送的消息数据还可以是对象,但对象必须序列化

// 消息生产者
@Component
public class HelloSender {
    @Autowired
    private RabbitTemplate template;
    
    public void send() {
    User user = new User("name","password");
    template.convertAndSend("exchangeName", "routingKey",user);
    }
}


// 定义消费者
@Component
public class TestConsumer {

    @RabbitListener(queues = "queueName")
    public void process(User user){
        log.info("------------user: {}",user);
    }

@RabbitListener 可以作用在类上,需要和 @RabbitHandler 配合使用

@Component
@RabbitListener(queues = "queueName")
public class TestConsumer {

    @RabbitHandler
    public void process(String data){
        log.info("------------data: {}",data);
    }

    @RabbitHandler
    public void process(UserInfo userInfo){
        log.info("------------userInfo: {}",userInfo);
    }
    
    public void test(){
        log.info("------------");
    }
}

上面的TestConsumer 消费者接收所有路由键为 routingKey 的消息,队列中的消息会转发到被@RabbitHandler修饰的方法然后被消费,不同的消息类型被转发到对应的方法中。test()方法不会消费消息
RabbitMq 服务启动后会创建一个默认的DirectExchange,这个交换机只接收 路由键routingKey 和 队列名称相同的消息,所以direct模式可以简化:

@Configuration
public class RabbitMQConfig{
    // Direct 模式创建队列
    // 创建队列
    @Bean
    public Queue testQueue() {
        return new Queue("routingKey");  // 队列名和routingKey相同
    }
}

// 消息生产者
@Component
public class HelloSender {
    @Autowired
    private RabbitTemplate template;
    
    public void send() {
    template.convertAndSend("routingKey", "hello,rabbit~"); // 没有交换机名称,消息会被发送到默认交换机,然后被转发到 名称和routingKey相同的队列上
    }
}

// 定义消费者
@Component
public class TestConsumer {

    @RabbitListener(queues = "routingKey")
    public void process(String data){
        log.info("------------data: {}",data);
    }
}

Topic模式

topic转发信息主要是依据通配符,队列和交换机的绑定主要是依据一种模式(通配符+字符串),而当发送消息的时候,只有指定的Key和该模式相匹配的时候,消息才会被发送到该消息队列中.
通配符:* 表示一个词,# 表示零个或多个词
注意: 通配符是针对交换机的!!!也就是说消息进入交换机时才进行通配符匹配,匹配完了以后才进入固定的队列

@Configuration
public class RabbitMQConfig{

   // 交换机有四种类型,分别为Direct,topic,headers,Fanout.

   // topit 模式

   @Bean(name="queueName1")
   public Queue queueMessage1() {
       return new Queue("queueName1");      // 定义第一个队列,名称为 queueName1
   }
   @Bean(name="queueName2")
   public Queue queueMessage2() {
       return new Queue("queueName2");     // 定义第二个队列,名称为 queueName2
   }
   @Bean
   public TopicExchange exchange() {
       return new TopicExchange("exchangeName");  // 定义交换机
   }
   // 定义绑定关系,通过交换机 将名称为queueName1 的队列绑定到交换机上, routingKey 为 topic.key1
   @Bean
   public Binding bindingExchangeMessage(@Qualifier("queueName1") Queue queue, TopicExchange exchange) {
       return BindingBuilder.bind(queue).to(exchange).with("topic.key1");
   }
   // 定义另一个绑定关系,通过交换机 将名称为queueName2 的队列绑定到交换机上 ,routingKey 是符合 通配符topic.#  的路由键
   // 如:topic.xx、topic.yy 等
   @Bean
   public Binding bindingExchangeMessages(@Qualifier("queueName2") Queue queue, TopicExchange exchange) {
       return BindingBuilder.bind(queue).to(exchange).with("topic.#");//*表示一个词,#表示零个或多个词
   }

消费者

    @RabbitListener(queues="queueName1")    //监听器监听指定的Queue
    public void process1(String str) {    
        System.out.println("message:"+str);
    }
    @RabbitListener(queues="queueName2")    //监听器监听指定的Queue
    public void process2(String str) {
        System.out.println("messages:"+str);
    }

然后发送消息

// 2个消费者都会收到消息
template.convertAndSend("exchangeName", "topic.key1", "data info");
// 只有第2个消费者收到消息
template.convertAndSend("exchangeName", "topic.key2", "data info");
// 只有第2个消费者收到消息
template.convertAndSend("exchangeName", "topic.key3", "data info");

Fanout模式

fanout是路由广播的形式,将会把消息发给绑定它的全部队列,即便设置了key,也会被忽略.
因此我们发送到交换机的消息会使得绑定到该交换机的每一个Queue接收到消息,这个时候就算指定了路由键(routingKey),或者规则(即上文中convertAndSend方法的参数2),也会被忽略!

// fanout模式
    @Bean(name="Amessage")
    public Queue AMessage() {
        return new Queue("fanout.A");
    }


    @Bean(name="Bmessage")
    public Queue BMessage() {
        return new Queue("fanout.B");
    }

    @Bean(name="Cmessage")
    public Queue CMessage() {
        return new Queue("fanout.C");
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");//配置广播路由器
    }

    @Bean
    Binding bindingExchangeA(@Qualifier("Amessage") Queue AMessage,FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeB(@Qualifier("Bmessage") Queue BMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(BMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeC(@Qualifier("Cmessage") Queue CMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(CMessage).to(fanoutExchange);
    }

然后发送消息

template.convertAndSend("fanoutExchange", "", "data info"); // 第二个参数会被忽略

消费者

    @RabbitListener(queues="fanout.A")
    public void processA(String str1) {
        System.out.println("ReceiveA:"+str1);
    }
    @RabbitListener(queues="fanout.B")
    public void processB(String str) {
        System.out.println("ReceiveB:"+str);
    }
    @RabbitListener(queues="fanout.C")
    public void processC(String str) {
        System.out.println("ReceiveC:"+str);
    }

结果三个都收到消息


image.png

RabbitMQ 实现延迟队列

rabbitMQ可以通过死信机制来实现延迟队列的功能,一些概念:
1、TTL ,即 Time-To-Live,存活时间,消息和队列都可以设置存活时间
2、Dead Letter,即死信,若给消息设置了存活时间,当超过存活时间后消息还没有被消费,则该消息变成了死信
3、Dead Letter Exchanges(DLX),即死信交换机
4、Dead Letter Routing Key (DLK),死信路由键
直接上代码:

@Configuration
public class RabbitMQConfig { 

    // 创建一个立即消费队列
    @Bean(QueueName.ImmediateQueue)
    public Queue immediateQueue() {
        return new Queue(QueueName.ImmediateQueue);
    }

    @Bean(ExchangeName.IMMEDIATE)
    public DirectExchange immediateExchange() {
        return new DirectExchange(ExchangeName.IMMEDIATE);
    }

    // 把 立即消费的队列 和 立即消费的exchange 绑定在一起
    @Bean
    public Binding immediateBinding(@Qualifier(QueueName.ImmediateQueue) Queue queue, @Qualifier(ExchangeName.IMMEDIATE) DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(RoutingKey.IMMEDIATE_ROUTING_KEY);
    }


    // 创建一个延时队列
    @Bean(QueueName.DelayQueue)
    public Queue delayQueue() {
        Map<String, Object> params = new HashMap<>();

        // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
        params.put("x-dead-letter-exchange", ExchangeName.IMMEDIATE);

        // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key
        params.put("x-dead-letter-routing-key", RoutingKey.IMMEDIATE_ROUTING_KEY);

        // 设置队列中消息的过期时间,单位 毫秒
        params.put("x-message-ttl", 5 * 1000);

        return new Queue(QueueName.DelayQueue, true, false, false, params);
    }

    @Bean(ExchangeName.DELAY)
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(ExchangeName.DEAD_LETTER);
    }

    // 把 延迟消费的队列 和 延迟消费的exchange 绑定在一起
    @Bean
    public Binding delayBinding(@Qualifier(QueueName.DelayQueue) Queue queue, @Qualifier(ExchangeName.DELAY) DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(RoutingKey.DELAY_KEY);
    }
}

@Component
public class TestConsumer {
    @RabbitListener(queues = QueueName.ImmediateQueue)
    public void process2(UserInfo userInfo){
        log.info("------------userInfo: {}",userInfo);
    }
}

// 常量
public interface ExchangeName {
    String IMMEDIATE = "immediate";
    String DELAY = "delay";
}
public interface QueueName {
    String ImmediateQueue = "ImmediateQueue";
    String DelayQueue = "DelayQueue";
}
public interface RoutingKey {
    String IMMEDIATE_ROUTING_KEY = "immediate.key";
    String DELAY_KEY = "delay.key";
}
image.png

过程:
1、先创建一个普通队列,即上面的 ImmediateQueue,创建一个普通交换机 immediateExchange,绑定两者。
2、创建一个延迟队列,即创建时设置了参数:x-dead-letter-exchange,x-dead-letter-routing-key,x-message-ttl,该队列就相当于是一个延迟队列了
3、创建延迟交换机(其实也是普通交换机),和延迟队列绑定
4、给ImmediateQueue创建监听消费者,注意,延迟队列不要设置监听消费者,不然延迟队列就变成普通队列了,不起作用
到此延迟队列已完成,直接发送消息到延迟交换机即可

        UserInfo userInfo = new UserInfo();
        userInfo.setPhone("15800000000");
        userInfo.setUserName("aaaaaaa");

        log.info("开始发送消息");
        template.convertAndSend(ExchangeName.DELAY, RoutingKey.DELAY_KEY, userInfo);

原理:发送消息到延迟交换机,延迟交换机将消息转发到延迟队列,因为延迟队列没有监听消费者,所以消息不会被消费,直到消息超过存活时间(即 延迟)变成死信,这时延迟队列会将死信转发到死信交换机,即上面的immediateExchange(因为延迟队列绑定的死信交换机x-dead-letter-exchange指向了immediateExchange),immediateExchange将消息转发给ImmediateQueue,然后被监听消费者消费

测试结果:


image.png

可以看出过了5秒才消费消息

注意: 一个延迟队列只能设置一个存活时间,即该延迟队列里面的所有消息的存活时间都必须一致,如果需要设置不一样的存活时间,只能再创建一个延迟队列。原因是延迟队列并不会去扫描队列里面所有消息的存活时间,只会判断队列头的第一个消息是否过期,若过期了就转发消息,否则一直等待,即使队列后面已经有消息先过期,也只能等前面的消息被转发后,该消息才被转发。

消息确认机制

消息确认分为两部分: 生产确认 和 消费确认。
生产确认: 生产者生产消息后,将消息发送到交换机,触发确认回调;交换机将消息转发到绑定队列,若失败则触发返回回调。
消费确认: 默认情况下消息被消费者从队列中获取后即发送确认,不管消费者处理消息时是否失败,不需要额外代码,但是不能保证消息被正确消费。我们增加手动确认,则需要代码中明确进行消息确认。
在配置文件中添加:

spring:
 application:
   name: async-task
 rabbitmq:
   host: 192.168.0.0
   port: 5672
   username: username
   password: password
   publisher-confirms: true  # 开启发送确认
   publisher-returns: true   # 开启发送失败退回
   template:
     mandatory: true
   listener:
     type: simple
     simple:
       acknowledge-mode: manual # 开启消息消费手动确认

在RabbitMQConfig 中添加如下配置

    @Bean
    public RabbitTemplate getTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate template = new RabbitTemplate(connectionFactory);

        // 消息发送到交换器Exchange后触发回调
        template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                //  可以进行消息入库操作
                log.info("消息唯一标识 correlationData = {}", correlationData);
                log.info("确认结果 ack = {}", ack);
                log.info("失败原因 cause = {}", cause);
            }
        });

        // 配置这个,下面的ReturnCallback 才会起作用
        template.setMandatory(true);
        // 如果消息从交换器发送到对应队列失败时触发(比如 根据发送消息时指定的routingKey找不到队列时会触发)
        template.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                //  可以进行消息入库操作
                log.info("消息主体 message = {}", message);
                log.info("回复码 replyCode = {}", replyCode);
                log.info("回复描述 replyText = {}", replyText);
                log.info("交换机名字 exchange = {}", exchange);
                log.info("路由键 routingKey = {}", routingKey);
            }
        });

        return template;
    }

成功确认:
void basicAck(long deliveryTag, boolean multiple) throws IOException;
deliveryTag:该消息的index
multiple:是否批量. true:将一次性ack所有小于deliveryTag的消息。
消费者成功处理后,调用channel.basicAck(message.getMessageProperties().getDeliveryTag(), false)方法对消息进行确认。

失败确认:
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
deliveryTag:该消息的index。
multiple:是否批量. true:将一次性拒绝所有小于deliveryTag的消息。
requeue:是否重新入队列。

拒绝
void basicReject(long deliveryTag, boolean requeue) throws IOException;
deliveryTag:该消息的index。
requeue:被拒绝的是否重新入队列。

channel.basicNack 与 channel.basicReject 的区别在于basicNack可以批量拒绝多条消息,而basicReject一次只能拒绝一条消息

消费者消息确认代码:

@Component
public class OrderConsumer {

    @Autowired
    private TaskOrderFeign orderFeign;

    @RabbitListener(queues = QueueName.OrderCancelQueue)
    public void process(OrderDTO orderDTO, Message message, Channel channel){

        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            log.info("---消费消息---------deliveryTag = {} ,  orderDTO: {}",deliveryTag ,orderDTO);

            // 取消订单
            orderDTO.setState(OrderStateEnum.DELAY_CANCEL.code);
            Response response = orderFeign.cancelOrder(orderDTO);

            //TODO 判断结果,是否需要重试

            // 成功确认消息
            channel.basicAck(deliveryTag, true);

        } catch (IOException e) {
            log.error("确认消息时抛出异常 , e = {}", PrintUtil.print(e));
            // 重新确认
            // 成功确认消息
            try {
                Thread.sleep(50);
                channel.basicAck(deliveryTag, true);
            } catch (IOException | InterruptedException e1) {
                log.error("确认消息时抛出异常 , e = {}", PrintUtil.print(e));
                // 可以考虑入库
            }

        } catch (Exception e) {

            log.error("取消订单失败 , e = {}", PrintUtil.print(e));

            try {
                // 失败确认
                channel.basicNack(deliveryTag, false, false);
            } catch (IOException e1) {
                log.error("消息失败确认失败 , e1 = {}", PrintUtil.print(e1));
            }
        }
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,816评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,729评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,300评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,780评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,890评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,084评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,151评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,912评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,355评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,666评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,809评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,504评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,150评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,121评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,628评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,724评论 2 351

推荐阅读更多精彩内容