RabbitMQ

RabbitMQ

存在的问题:比如客户端调用商品服务添加一条数据到数据库,数据库添加成功后商品服务还必须要等待搜索服务、缓存服务都完成后才能给客户端响应结果。RabbitMQ就是为了解决这种问题。商品服务完成添加数据到数据库之后可以将其余的消息交给RabbitMQ,然后商品服务就可以先向客户端响应结果,RabbitMQ再通知搜索服务和缓存服务添加数据。

一、RabbitMQ安装

用docker-compose安装即可

version: "3.1"

services: 
  rabbitmq:
    image: daocloud.io/library/rabbitmq:management
    restart: always
    container_name: rabbitmq
    ports:
      - 5672:5672
      - 15672:15672  # 这个端口是RabbitMQ自带的图形监控界面
    volumes:
      - ./mydata/rabbitmq:/var/lib/rabbitmq
      

二、RabbitMQ架构

2.1 架构图

由生产者发送消息到一个指定Exchange中,然后指定Exchange和某个Queue的路由关系,Exchange就会将消息发送到Queue中,消费者和Virtual Host建立连接,和Queue建立Channel后,就能拿到最新的消息。

注意:一个Queue中的消息只会被一个消费者消费一次(即消费完之后消息直接出队),另一个消费者消费不到同一个Queue中的同一条消息

2.2 查看图形化界面

直接访问http://ip:15672,默认用户/密码都是guest,是一个管理员用户。

在界面的Connections可以查看客户端与Virtual Host建立的连接;在Channels可以查看建立连接后建立的管道;在Exchange可以查看已有的Exchange,如果发送消息的时候没有指定Exchange,会默认用 “/”D的Exchange;在Queues可以查看队列,队列需要我们手动创建;在Admin,可以添加其他用户,可以添加Virtual Host,可以指定哪个用户可以管理哪几个Virtual Host。

三、RabbitMQ的使用

3.1 RabbitMQ的路由策略

  1. HelloWorld
  2. WorkQueues
  3. Publish/Subscribe
  4. Routing
  5. Topics
  6. Publisher confirms

3.2 Java连接RabbitMQ

Step1:创建Maven工程,导入RabbitMQ依赖

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.6.0</version>
</dependency>

Step2:创建一个用于建立Connection的工具类

public class RabbitMQUtils {
    public static Connection getConnection {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.199.109");  // 指定ip
        factory.setPort(5672);  // 指定端口
        factory.setUsername("test");  // 指定用户名密码
        factory.setPassword("test");
        factory.setVirtualHost("/test");  // 指定当前用户管理的Virtual Host
        
        Connection connection = null;
        connection = factory.newConnection();
        
        return connection;
    }
}

Step3:测试

@Test
public void testConnection() {
    Connection connection = RabbitMQUtils.getConnection();
    connection.close();  // 在这里打个断点, 就可以在图形化界面查看到建立的连接
}

3.3 HelloWorld路由

一个生产者,创建一个Channel,一个默认的Exchange(不用手动指定),一个Queue,一个消费者。

Step1:创建生产者,创建一个Channel,发布消息到Exchange,指定路由规则

public class Productor {
    @Test
    public void publish() {
        // 1. 获取Connection
        Connection connection = RabbitMQUtils.getConnection();
        // 2. 创建Channel
        Channel channel = connection.createChannel();
        // 3. 发布消息到Exchange, 同时指定路由规则
        String msg = "一条消息";
        /**
         * 四个参数: 
            (1) 指定Exchange, 如果要使用默认的就给一个空串
            (2) 指定路由规则, 使用具体的队列名称(当使用默认Exchange的时候, 这个参数同时也是队列名)
                                          (当使用自定义Exchange的时候, 这个参数就写Exchange名)
            (3) 指定传递消息所携带的属性, 没有给就null
            (4) 指定发布的具体消息, byte[]类型
         */
        channel.basicPublish("", "HelloWorld", null, msg.getBytes());
        
        System.in.read();
        channel.close();
        connection.close();
    }
}

Step2:创建消费者,创建一个Channel,创建一个Queue,并且去消费当前队列

public class Costumer {
    @Test
    public void consume() {
        // 1. 获取Connection
        Connection connection = RabbitMQUtils.getConnection();
        // 2. 创建Channel
        Channel channel = connection.createChannel();
        // 3. 声明队列
        
        /**
         * 五个参数: 
            (1) 指定队列名称
            (2) 当前队列是否持久化(只有在Queue中才能持久化, Exchange中不行)
            (3) 是否排外, 即当前队列是否只能被一个消费者消费
            (4) 如果这个队列没有消费者在消费, 则队列自动删除
            (5) 指定当前队列的其他信息
         */
        channel.queueDeclare("HelloWorld", true, false, false, null);
        
        // 4. 开启监听Queue
        // 4.1 消息回调方法
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelopo, 
                          AMQP.BasicProperties properties, byte[] body) throws IOException{
                System.out.println("接受到消息" + new String(body, "UTF-8"));
                channel.basicAck(envelope.getDeliveryTag(), false); // 手动ACK
            }
        }
        // 4.2 开始监听
        /**
         * 三个参数: 
            (1) 指定消费哪个队列
            (2) 指定是否自动ACK(即接收到消息以后会立即告诉RabbitMQ该消息已经被消费掉)
            (3) 指定消息回调犯法(即消费者具体要做的事情)
         */
        channel.basicConsume("HelloWorld", false, consumer);
        
        System.in.read();
        channel.close();
        connection.close();
    }
}

Step3:测试。注意:先启动消费者再启动生产者,不然无法发送消息到Exchange,因为发送消息到Exchange需要Queue

3.4 WorkQueues路由

一个生产者,一个默认的Exchange,一个Queue,多个消费者

Step1:创建生产者,同上

Step2:创建消费者,同上。不同的是要创建两个消费者

/**
 * 二号消费者
 */
public class Costumer2 {
    @Test
    public void consume() {
        // 1. 获取Connection
        Connection connection = RabbitMQUtils.getConnection();
        // 2. 创建Channel
        Channel channel = connection.createChannel();
        // 3. 声明队列
        channel.queueDeclare("WorkQueues", true, false, false, null);
        // 3.1 可以指定当前消费者一次能消费多少条消息, 如果不指定就是两个消费者均摊
        channel.basicQos(2);
        // 4. 开启监听Queue
        // 4.1 消息回调方法
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelopo, 
                          AMQP.BasicProperties properties, byte[] body) throws IOException{
                System.out.println("消费者二号接受到消息" + new String(body, "UTF-8"));
                channel.basicAck(envelope.getDeliveryTag(), false); // 手动ACK
            }
        }
        // 4.2 开始监听
        channel.basicConsume("Work", false, consumer);
        
        System.in.read();
        channel.close();
        connection.close();
    }
}

3.5 Publish/Subscribe路由

一个生产者,一个自定义的Exchange,多个Queue,多个消费者。此时两个Queue中的消息是一模一样的。

Step1:创建生产者,需要手动创建一个Exchange

public class Productor {
    @Test
    public void publish() {
        // 1. 获取Connection
        Connection connection = RabbitMQUtils.getConnection();
        // 2. 创建Channel
        Channel channel = connection.createChannel();
        // 3. 创建Exchange
        /**
         * 参数1: 指定Exchange的名称; 参数2: 指定Exchange的类型
           FANOUT - 对应Publish/Subscribe路由
           DIRECT - 对应Routing路由
           TOPIC - 对应Topics路由
         */
        channel.exchangeDeclare("pubsub_exchange", BuiltinExchangeType.FANOUT);
        
        // 4. 将自定义的Exchange和Queue绑定起来(也可以在消费者中绑定)
        //  参数3是RouteKey, Publish/Subscribe路由的RouteKey给空串即可
        channel.queueBind("pubsub_queue1", "pubsub_exchange", "");
        channel.queueBind("pubsub_queue2", "pubsub_exchange", "");
        
        // 5. 发布消息到Exchange, 同时指定路由规则
        for (int i = 0; i < 10; i++) {
            String msg = "一条消息" + i;
            channel.basicPublish("pubsub_exchange", "Work", null, msg.getBytes());
        }
        
        System.in.read();
        channel.close();
        connection.close();
    }
}

Step2:创建消费者,同时指定当前消费者消费的是哪个队列

/**
 * 二号消费者
 */
public class Costumer2 {
    @Test
    public void consume() {
        // 1. 获取Connection
        Connection connection = RabbitMQUtils.getConnection();
        // 2. 创建Channel
        Channel channel = connection.createChannel();
        // 3. 声明队列
        channel.queueDeclare("pubsub_queue2", true, false, false, null);

        // 4. 开启监听Queue
        // 4.1 消息回调方法
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelopo, 
                          AMQP.BasicProperties properties, byte[] body) throws IOException{
                System.out.println("消费者二号接受到消息" + new String(body, "UTF-8"));
                channel.basicAck(envelope.getDeliveryTag(), false); // 手动ACK
            }
        }
        // 4.2 开始监听
        channel.basicConsume("pubsub_queue2", false, consumer);
        
        System.in.read();
        channel.close();
        connection.close();
    }
}

3.6 Routing路由

一个生产者,一自定义的Exchange,多个Queue,多个消费者,Exchange和Queue之间不是直接绑定,而是通过RouteKey绑定,就是指定哪些类型的消息发送到哪个Queue中,此时两个Queue中的消息不一定相同。

Step1:创建生产者,绑定时指定Queue对应的RouteKey

public class Productor {
    @Test
    public void publish() {
        // 1. 获取Connection
        Connection connection = RabbitMQUtils.getConnection();
        // 2. 创建Channel
        Channel channel = connection.createChannel();
        // 3. 创建Exchange
        /**
         * 参数1: 指定Exchange的名称; 参数2: 指定Exchange的类型
           FANOUT - 对应Publish/Subscribe路由
           DIRECT - 对应Routing路由
           TOPIC - 对应Topics路由
         */
        channel.exchangeDeclare("routing_exchange", BuiltinExchangeType.DIRECT);
        
        // 4. 将自定义的Exchange和Queue绑定起来(也可以在消费者中绑定)
        //  参数3指定每个队列对应的RouteKey
        channel.queueBind("routing_queue1", "routing_exchange", "ERROR");
        channel.queueBind("routing_queue2", "routing_exchange", "INFO");
        
        // 5. 发布消息到Exchange, 同时指定RouteKey, Exchange会根据RouteKey把消息发送到对应的Queue
        channel.basicPublish("pubsub_exchange", "ERROR", null, "error msg1".getBytes());
        channel.basicPublish("pubsub_exchange", "INFO", null, "info msg1".getBytes());
        channel.basicPublish("pubsub_exchange", "INFO", null, "info msg2".getBytes());
        channel.basicPublish("pubsub_exchange", "INFO", null, "info msg3".getBytes());
        
        System.in.read();
        channel.close();
        connection.close();
    }
}

Step2:创建消费者,同3.5

3.7 Topics路由

同Routing路由方式,只是RouteKey的形式不同

Step1:创建生产者

public class Productor {
    @Test
    public void publish() {
        // 1. 获取Connection
        Connection connection = RabbitMQUtils.getConnection();
        // 2. 创建Channel
        Channel channel = connection.createChannel();
        // 3. 创建Exchange
        /**
         * 参数1: 指定Exchange的名称; 参数2: 指定Exchange的类型
           FANOUT - 对应Publish/Subscribe路由
           DIRECT - 对应Routing路由
           TOPIC - 对应Topics路由
         */
        channel.exchangeDeclare("topics_exchange", BuiltinExchangeType.TOPIC);
        
        // 4. 将自定义的Exchange和Queue绑定起来(也可以在消费者中绑定)
        //  参数3指定每个队列对应的RouteKey
        /**
         * 关于此时的RouteKey:
         *  "*.red.*": 表示有三个条件, 只要中间的条件符合就可以将消息发送到该队列, *是占位符
         *  "fast.#": #是通配符, 效果等同于"fast.*.*"
         */
        channel.queueBind("topics_queue1", "topics_exchange", "*.red.*");
        channel.queueBind("topics_queue2", "topics_exchange", "fast.#");
        
        // 5. 发布消息到Exchange, 同时指定RouteKey, Exchange会根据RouteKey把消息发送到对应的Queue
        channel.basicPublish("topics_exchange", "fast.red.monkey", null, "快红猴".getBytes());
        channel.basicPublish("topics_exchange", "fast.white.dog", null, "快白狗".getBytes());
        channel.basicPublish("topics_exchange", "slow.red.cat", null, "快红猴".getBytes());
        // 此时第1条、第3条消息会发送给topics_queue1
        // 第1条、第2条消息会发送给topics_queue2
        
        System.in.read();
        channel.close();
        connection.close();
    }
}

四、SpringBoot整合RabbitMQ

4.1 快速入门

Step1:创建SpringBoot工程,导入SpringBoot整合RabbitMQ的依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Step2:编写配置

spring:
  rabbtimq:
    host: 192.168.199.109
    port: 5672
    username: test
    password: test
    virtual-host: /test

Step3:编写配置类,声明Exchange和Queue,并且绑定

@Configuration
public class RabbitMQConfig {
    // 创建Topic路由的Exchange
    @Bean("TOPIC_EXCHANGE")
    public TopicExchange getTopicExchange() {
        /**
         * 参数1: 指定Exchange的名称
         * 参数2: 是否持久化
         * 参数3: 是否自动删除
         */
        return new TopicExchange("boot_topic_exchange", true, false);
    }
    
    // 创建Queue1
    @Bean("TOPIC_QUEUE1")
    public Queue getQueue1() {
        /**
         * 参数1: 指定Queue名称
         * 参数2: 是否持久化
         * 参数3: 是否排外
         * 参数4: 是否自动删除
         * 参数5: 携带的参数
         */
        return new Queue("boot_topic_queue1", true, false, false, null);
    }
    
    // 创建Queue2
    @Bean("TOPIC_QUEUE2")
    public Queue getQueue2() {
        return new Queue("boot_topic_queue2", true, false, false, null);
    }
    
    // 绑定
    @Bean
    public Binding getBindingRed(@Qualifier("TOPIC_QUEUE1") Queue queue,
                              @Qualifier("TOPIC_EXCHANGE") TopicExchange topicExchange) {
        // 指定Queue, 指定Exchange, 指定RouteKey
        return BindingBuilder.bind(queue).to(topicExchange).with("*.red.*");
    }
    
    // 绑定
    @Bean
    public Binding getBindingSlow(@Qualifier("TOPIC_QUEUE2") Queue queue,
                              @Qualifier("TOPIC_EXCHANGE") TopicExchange topicExchange) {
        // 指定Queue, 指定Exchange, 指定RouteKey
        return BindingBuilder.bind(queue).to(topicExchange).with("slow.*.*");
    }
}

Step4:准备生产者,发送消息

public class Productor {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Test
    public void publish() {
        /**
         * 参数1: 指定发送到的Exchange名称
         * 参数2: 指定此次发送的RouteKey
         * 参数3: 具体消息
         */
        rabbitTemplate.convertAndSend("boot_topic_exchange", "fast.red.dog", "快红狗");
        rabbitTemplate.convertAndSend("boot_topic_exchange", "slow.white.pig", "慢白猪");
        System.in.read();
    }
}

Step5:准备消费者

@Component
public class Costumer {
    // 指定当前消费者监听的队列, SpringBoot工程启动后就会一直监听
    @RabbitListener(queues = "boot_topic_queue1")
    public void consumer1(Object msg) {
        // 该方法内是消费者接收到消息之后的具体动作
        System.out.println("消费者1号接收到消息" + msg);
    }
    
    @RabbitListener(queues = "boot_topic_queue2")
    public void consumer2(Object msg) {
        System.out.println("消费者2号接收到消息" + msg);
    }
}

Step6:测试。启动SpringBoot工程,可以在打印中查看消费者打印的消息,也可以在监控界面中查看消息的发送和被消费的情况

4.2 在SpringBoot中手动ACK

Step1:修改配置

spring:
  rabbtimq:
    host: 192.168.199.109
    port: 5672
    username: test
    password: test
    virtual-host: /test
    listener:
      simple:
        ackonwledge-mode: manual  # 手动ACK配置

Step2:修改消费者的接收行为

@RabbitListener(queues = "boot_topic_queue1")
public void consumer1(String msg, Channel channel, Message message) throws IOException{
    System.out.println("消费者1号接收到消息" + msg);
    // 手动ACK
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

五、RabbitMQ的其他操作

5.1 消息的可靠性

  1. Q:如果消息已经到达了RabbitMQ,但是RabbitMQ宕机了,消息会丢失吗?

    A:不会,RabbitMQ的Queue有持久化机制

  2. Q:消费者在消费消息时,如果执行到一半,消费者宕机了怎么办?

    A:消费者中最好使用手动ACK来避免消息没消息完却宕机的情况,这样消息就还会存在队列中

  3. Q:生产者发送消息时,由于网络问题,导致消息没发送到RabbitMQ怎么办?

    A:RabbitMQ提供了事务操作和Confirm机制,可以保证生产者把消息发送到Exchange(但是事务操作效率太低,主要用Confirm机制)

5.1.1 RabbitMQ的Confirm机制

可以保证生产者把消息发送到Exchange

5.1.1.1 普通Confirm方式

public class Productor {
    @Test
    public void publish() throws Exception{
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("topics_exchange", BuiltinExchangeType.TOPIC);
        
        channel.queueBind("topics_queue1", "topics_exchange", "*.red.*");
        // Step1:在消息发送之前, 开启Confirm
        channel.confirmSelect();
        
        channel.basicPublish("topics_exchange", "fast.red.monkey", null, "快红猴".getBytes());
        
        // Step2:在消息发送之后确认消息是否发送成功
        if (channel.waitForConfirms()) {
            /**
             * 消息发送成功时的处理
             */
        } else {
            /**
             * 消息发送失败时的处理
             */
        }
        
        System.in.read();
        channel.close();
        connection.close();
    }
}

5.1.1.2 批量Confirm方式

主要用于发送多条消息时,只要有一条发送失败,则全部发送失败,抛出异常。

public class Productor {
    @Test
    public void publish() throws Exception{
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("topics_exchange", BuiltinExchangeType.TOPIC);
        
        channel.queueBind("topics_queue1", "topics_exchange", "*.red.*");
        // Step1:在消息发送之前, 开启Confirm
        channel.confirmSelect();
        for (int i = 0 ; i < 100; i++) {
            String msg = "快红猴" + i;
            channel.basicPublish("topics_exchange", "fast.red.monkey", null, msg.getBytes());
        }
        
        // Step2:在消息发送之后确认消息是否发送成功
        channel.waitForConfirmsOrDie();  // 该方法不会返回布尔值, 如果有一条发送失败就会直接抛异常
            
        System.in.read();
        channel.close();
        connection.close();
    }
}

5.1.1.3 异步Confirm方式

因为是异步的,所以不管生产者的消息是否发送成功,都会接着执行生产者后面的代码,单独有会有一个线程出来进行Confirm判断。效率最高,最常用。

public class Productor {
    @Test
    public void publish() throws Exception{
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("topics_exchange", BuiltinExchangeType.TOPIC);
        
        channel.queueBind("topics_queue1", "topics_exchange", "*.red.*");
        // Step1:在消息发送之前, 开启Confirm
        channel.confirmSelect();
        for (int i = 0 ; i < 100; i++) {
            String msg = "快红猴" + i;
            channel.basicPublish("topics_exchange", "fast.red.monkey", null, msg.getBytes());
        }
        
        // Step2:在消息发送之后确认消息是否发送成功
        channel.addConfirmListener(new ConfirmListener {
            // 消息发送成功时的回调方法
            @Override
            public void handleAck(long deliveryTag, boolean mutiple) throws IOException {
                System.out.println("消息发送成功。 标识为:" + deliveryTag + "是否是批量:" + mutiple);
            }
            
            // 消息发送失败时的回调方法
            @Override
            public void handleNack(long deliveryTag, boolean mutiple) throws IOException {
                System.out.println("消息发送失败。 标识为:" + deliveryTag + "是否是批量:" + mutiple);
            }
        });
            
        System.in.read();
        channel.close();
        connection.close();
    }
}

5.1.2 RabbitMQ的Return机制

Confirm机制只能保证消息到达Exchange,无法保证消息可以被Exchange分发到Queue。而Exchange是不能持久化消息的,Queue才可以持久化消息。所以可以使用Return机制保证Exchange把消息送达到指定的Queue

public class Productor {
    @Test
    public void publish() throws Exception{
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("topics_exchange", BuiltinExchangeType.TOPIC);
        
        channel.queueBind("topics_queue1", "topics_exchange", "*.red.*");
        
        // Step1:在开启Confirm之前先开启Return
        channel.addReturnListener(new ReturnListener() {
            // 消息没有到达Queue时执行的回调方法
            @Override
            public void handleReturn(int replyCode, String exchange, 
                                     String routingKey, AMQP.BasicProperties properties,
                                     byte[] msg) throws Exception {
                System.out.println(new String(msg, "UTF-8") + "没有送到到Queue中");
            }
        })
        
        // Step2:在消息发送之前, 开启Confirm
        channel.confirmSelect();
        for (int i = 0 ; i < 100; i++) {
            String msg = "快红猴" + i;
            // 在发送消息时, 在RouteKey参数后面追加一个参数, 置为true表示开启Return
            channel.basicPublish("topics_exchange", "fast.red.monkey", true, null, msg.getBytes());
        }
        
        // Step3:在消息发送之后确认消息是否发送成功
        channel.addConfirmListener(new ConfirmListener {
            @Override
            public void handleAck(long deliveryTag, boolean mutiple) throws IOException {
                System.out.println("消息发送成功。 标识为:" + deliveryTag + "是否是批量:" + mutiple);
            }

            @Override
            public void handleNack(long deliveryTag, boolean mutiple) throws IOException {
                System.out.println("消息发送失败。 标识为:" + deliveryTag + "是否是批量:" + mutiple);
            }
        });
            
        System.in.read();
        channel.close();
        connection.close();
    }
}

5.1.3 在SpringBoot中实现Confirm和Return

Step1:修改配置文件

spring:
  rabbtimq:
    host: 192.168.199.109
    port: 5672
    username: test
    password: test
    virtual-host: /test
    listener:
      simple:
        ackonwledge-mode: manual  # 手动ACK配置
    publisher-confirm-type: simple
    publisher-returns: true

Step2:编写配置类,指定RabbitTemplate对象,开启Confirm和Return,并编写回调方法

@Component  // 这里不能用Configuration, 因为要实现接口
public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTempalte;
    
    // 定义初始化方法, 这个方法在构建对象时会执行
    @PostConstruct
    public void initMethod() {
        // 给RabbitTemplate指定Confirm和Return的回调方法
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }
    
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            /**
             * 消息发送成功时的处理
             */
        } else {
            /**
             * 消息发送失败时的处理
             */
        }
    }
    
    @Override
    public void returnMessage(Message message, int replyCode, String replyText,
                              String exchange, String routingKey) {
        /**
         * Exchange中的消息没有送达到Queue时的处理
         */
    }
    
}

Step3:生产者和消费者不变

5.2 消息重复消费

重复消费同一个Queue的消息,会对非幂等行操作(增删)造成问题;重复消费消息的原因是,消费者没有给RabbitMQ一个Ack。

为了解决重复消费消息的问题,可以采用Redis,在消费者消费消息之前,先将消息的id作为key放到Redis中(用setnx方法,key不存在就创建key,key存在则获取key的value),并把对应的value置0,表示正在执行任务,等任务执行完毕之后可以把value置1。如果消费者一号ACK失败,在RabbitMQ将消息交给消费者二号时,会先执行setnx方法判断key是否存在,如果key存在则获取key的value,如果value是0,则消费者二号就什么都不做,如果value是1,则表示消费者一号已经执行完了任务,但是ACK失败,消费者二号直接执行ACK帮消费者一号ACK即可。

极端情况:消费者一号出现了死锁,则会一直卡在key存在且value=0的情况。解决方法是:在setnx设置key的时候,给key指定上一个生存时间即可。

5.2.1 实现避免消息重复消费

Step1:在Docker中启动Redis,然后在项目中导入Redis的依赖

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.1.0</version>
</dependency>

Step2:生产者在发送消息前指定上消息的ID

public class Productor {
    @Test
    public void publish() throws Exception{
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("topics_exchange", BuiltinExchangeType.TOPIC);
        
        channel.queueBind("topics_queue1", "topics_exchange", "*.red.*");
        
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String exchange, 
                                     String routingKey, AMQP.BasicProperties properties,
                                     byte[] msg) throws Exception {
                System.out.println(new String(msg, "UTF-8") + "没有送到到Queue中");
            }
        })

        channel.confirmSelect();
        for (int i = 0 ; i < 100; i++) {
            // 创建一个properties属性
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                                            .deliveryMode(1)  // 指定消息是否需要持久化, 1表示需要持久化, 2不需要
                                            .messageId(UUID.randomUUID().toString()) // 给每一条消息随机分配了一个UUID
                                            .build();
            String msg = "快红猴" + i;
            // 将properties属性放到第4个参数就可以随着消息一起发出
            channel.basicPublish("topics_exchange", "fast.red.monkey", true, properties, msg.getBytes());
        }

        channel.addConfirmListener(new ConfirmListener {
            @Override
            public void handleAck(long deliveryTag, boolean mutiple) throws IOException {
                System.out.println("消息发送成功。 标识为:" + deliveryTag + "是否是批量:" + mutiple);
            }

            @Override
            public void handleNack(long deliveryTag, boolean mutiple) throws IOException {
                System.out.println("消息发送失败。 标识为:" + deliveryTag + "是否是批量:" + mutiple);
            }
        });
            
        System.in.read();
        channel.close();
        connection.close();
    }
}

Step3:消费者调用Redis进行操作

public class Costumer1 {
    @Test
    public void consume() {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("WorkQueues", true, false, false, null);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelopo, 
                          AMQP.BasicProperties properties, byte[] body) throws IOException{
                Jedis jedis = new Jedis("192.168.199.109", 6379);  // 连接Redis
                String messageId = properties.getMessageId();  // 取出properties中的messageId
                // 1. setnx到Redis中, 默认指定Value为0, 指定生存时间为10秒
                String result = jedis.set(messageId, "0", "NX", "EX", 10);
                if (result != null && result.equalsIgnoreCase("OK")) {
                    System.out.println("消费者一号接受到消息" + new String(body, "UTF-8"));
                    // 2. 消费成功, set messageId的value为1
                    jedis.set(messageId, "1");
                    channel.basicAck(envelope.getDeliveryTag(), false); // 手动ACK
                } else {
                    // 3. 如果1中的setnx失败, 获取key对应的value
                    //    如果是0, 则直接return, 如果是1则先ACK再return
                    String value = jedis.get(messageId);
                    if ("1".equalsIgnoreCase(s)) {
                        channel.basicAck(envelope.getDeliveryTag(), false); // 手动ACK
                    }
                }
            }
        }

        channel.basicConsume("topics_queue1", false, consumer);
        
        System.in.read();
        channel.close();
        connection.close();
    }
}

Step4:测试,启动生产者和消费者,然后到Redis中查看是否有新的key

5.2.2 在SpringBoot实现避免消息重复消费

Step1:导入SpringBoot整合Redis的依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

Step2:在配置文件中连接Redis

spring:
  redis:
    host: 192.168.199.109
    port: 6379

Step3:生产者发送消息之前指定消息的ID

public class Productor {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Test
    public void publish() {
        // 给消息的属性分配一个随机的UUID
        CorrelationData messageId = new CorrelationData(UUID.randomUUID.toString());
        // 把messageId放入第4个参数
        rabbitTemplate.convertAndSend("boot_topic_exchange", "fast.red.dog", "快红狗", messageId);
        System.in.read();
    }
}

Step4:消费者调用Redis进行操作

@Component
public class Costumer {
    // 用StringRedisTemplate可以直接用String存入Redis(可以不用转为字节数组)
    @Autowired
    private StringRedisTemplate redisTemplate;  
    
    @RabbitListener(queues = "boot_topic_queue1")
    public void consumer1(String msg, Channel channel, Message message) throws IOException{
        // 先取出messageId
        String messageId = message.getMessageProperties().getHeader("spring_returned_message_correlation");
        // 1. 设置key到Redis
        if (redisTemplate.opsForValue().setIfAbsent(messageId, "0", 10, TimeUnit.SECONDS)) {
            System.out.println("消费者1号接收到消息" + msg);
            // 2. 消费成功, set messageId的value为1
            redisTemplate.opsForValue().set(messageId, "1");
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);  // 手动ACK
        } else {
            // 3. 如果1中的setnx失败, 获取key对应的value
            //    如果是0, 则直接return, 如果是1则先ACK再return
            if ("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))) {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);  // 手动ACK
            }
        }
    }
}

Step5:测试,启动生产者和消费者,然后到Redis中查看是否有新的key

六、RabbitMQ的简单应用

例子:添加一条数据时,客户模块在MySQL添后会调用搜索模块在ES中也添加

客户模块:

Step1:在客户模块的SpringBoot工程中导入SpringBoot整合RabbitMQ的依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Step2:编写配置文件连接RabbitMQ

spring:
  rabbitmq:
    host: 192.168.199.109
    port: 5672
    username: test
    password: test
    virtual-host: /test
    listener:
      simple:
        ackonwledge-mode: manual  # 手动ACK配置

Step3:编写配置类

@Configuration
public class RabbitMQConfig {
    // 创建Topic路由的Exchange
    @Bean("TOPIC_EXCHANGE")
    public TopicExchange getTopicExchange() {
        return new TopicExchange("openapi_customer_exchange", true, false);
    }
    
    // 创建Queue1
    @Bean("TOPIC_QUEUE1")
    public Queue getQueue1() {
        return new Queue("openapi_customer_queue", true, false, false, null);
    }
    
    // 绑定
    @Bean
    public Binding getBindingRed(@Qualifier("TOPIC_QUEUE1") Queue queue,
                              @Qualifier("TOPIC_EXCHANGE") TopicExchange topicExchange) {
        // 指定Queue, 指定Exchange, 指定RouteKey
        return BindingBuilder.bind(queue).to(topicExchange).with("openapi.customer.*");
    }
}

Step4:在service中直接把消息发送给Exchange(生产者)(要先注入RabbitTemplate)

// 原来的写法: 直接把消息发送到
/**
String json = JSON.toJSON(customer);
HttpHeaders headers = new HttpHeaders();
headers.setContenType(MediaType.parseMediaType("application/json;charset=utf-8"));
HttpEntity<String> entity = new HttpEntity<>(json.headers);
restTemplate.postForObject("http://localhost:8080/search/customer/add", entity, String.class);
*/

// 新的写法: 将消息发送到Exchange
rabbitTemplate.converAndSend("openapi_customer_exchange", "openapi.customer.add", JSON.toJSON(customer));

搜索模块:

Step1:在客户模块的SpringBoot工程中导入SpringBoot整合RabbitMQ的依赖

<!-- RabbitMQ -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Step2:编写配置文件,连接RabbitMQ和Redis

spring:
  rabbtimq:
    host: 192.168.199.109
    port: 5672
    username: test
    password: test
    virtual-host: /test
    listener:
      simple:
        ackonwledge-mode: manual  # 手动ACK配置

Step3:编写配置类

@Configuration
public class RabbitMQConfig {
    // 创建Topic路由的Exchange
    @Bean("TOPIC_EXCHANGE")
    public TopicExchange getTopicExchange() {
        return new TopicExchange("openapi_customer_exchange", true, false);
    }
    
    // 创建Queue1
    @Bean("TOPIC_QUEUE1")
    public Queue getQueue1() {
        return new Queue("openapi_customer_queue", true, false, false, null);
    }
    
    // 绑定
    @Bean
    public Binding getBindingRed(@Qualifier("TOPIC_QUEUE1") Queue queue,
                              @Qualifier("TOPIC_EXCHANGE") TopicExchange topicExchange) {
        // 指定Queue, 指定Exchange, 指定RouteKey
        return BindingBuilder.bind(queue).to(topicExchange).with("openapi.customer.*");
    }
}

Step4:准备消费者

@Component
public class CostumerListener {
    @Autowired
    private CustomerService customerService;
    // 指定当前消费者监听的队列, SpringBoot工程启动后就会一直监听
    @RabbitListener(queues = "openapi_customer_queue")
    public void consumer1(String msg, Channel channel, Message message) throws IOException{
        // 1. 获取RoutingKey
        String receivedRoutingKey = message.getMessageProperties().getReceivedRoutingKey();
        // 2. 根据RoutingKey来选择调用的方法
        switch (receivedRoutingKey) {
            case "openapi.customer.add":
                // 3. 调用Service完成添加数据到ES(先把json字符串转为Customer对象再存)
                customerService.saveCustomer(JSON.parseJSON(msg, Customer.class));
                // 4. 手动ACK
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                break;
            /**
             * ......
             * 其他RoutingKey对应的处理
             */
        }
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,607评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,239评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,960评论 0 355
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,750评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,764评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,604评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,347评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,253评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,702评论 1 315
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,893评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,015评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,734评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,352评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,934评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,052评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,216评论 3 371
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,969评论 2 355

推荐阅读更多精彩内容