MQ(二) - RabbitMQ消息模型

1.消息模型

根据官方文档得知,RabbitMQ有七种消息模型:


image-20200420162349911.png

image-20200420162457226.png

image-20200420162430281.png

1.1 Hello World消息模型

1.1.1 介绍

image-20200420162810992.png

翻译成中文如下:

RabbitMQ是一个消息代理:它接受和转发消息。你可以把它想象成一个邮局:当你把你想寄出的邮件放进一个邮箱里时,你可以确信邮件的收件人最终会收到邮件。在这个类比中,RabbitMQ是一个邮箱、一个邮局和一个邮递员。

RabbitMQ与邮局的主要区别在于,它不处理纸张,而是接受、存储和转发二进制的数据信息块。

1.1.2 模型图

image-20200420162548840.png
  • P (Produce) 生产者,主要是生产消息,以及发送消息。说白了就是一个发送消息的应用程序


    image-20200420163700439.png
  • 队列 (queue) 队列是RabbitMQ中的邮箱的名称。尽管消息流经RabbitMQ和您的应用程序,但它们只能存储在队列中。队列只受主机的内存和磁盘限制的约束,它本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,而许多消费者可以尝试从一个队列接收数据
image-20200420163826815.png
  • C (consumer) 消费和接受有着相似的含义。消费者者是一个主要等待接收消息的程序


    image-20200420163923433.png

    注意:生产者、消费者和代理不必驻留在同一主机上;事实上,在大多数应用程序中,它们不必驻留在同一主机上。应用程序也可以同时是生产者和消费者。

1.1.3 代码实现

接下来我们采用java语言编写生产者程序,以及消费者程序来感受一下其魅力。

生者者:Send

消费者:Consumer

  • 生产者代码实现:

    • 模型图:


      image-20200420164345046.png
由图中可知,生产者不仅要生产消息,还要将消息发送到指定队列:
  • 创建 springboot项目(wangzh-rabbitmq)


    image-20200420164715769.png
  • 导入amqp依赖

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
  • 编写工具类,用来获取连接

    package com.mq.rabbit.util;
    
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class ConnectionUtil {
        public static Connection getConnection() throws Exception {
            // 1. 创建连接工厂,用来获取连接
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 2. 设置基本信息
            // 设置rabbitmq所在地址
            connectionFactory.setHost("192.168.169.130");
            // 设置用户名,我们先前创建了一个wangzh的用户
            connectionFactory.setUsername("wangzh");
            // 设置密码
            connectionFactory.setPassword("wangzh");
            // 设置端口 这个端口是amqp协议的端口
            connectionFactory.setPort(5672);
            return connectionFactory.newConnection();
        }
    }
    
    
  • 编写发送端程序

    package com.mq.rabbit.helloworld;
    
    
    import com.mq.rabbit.util.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    public class Send {
        /**
         * 队列名字
         */
        private static final String QUEUE_NAME="hello_word";
    
        public static void main(String[] args) {
            try {
                // 1. 获取连接
                Connection connection = ConnectionUtil.getConnection();
                /*
                 *   2. 创建通道
                 *      生产者发送消息到队列中需要借助通道
                 */
                Channel channel = connection.createChannel();
                /**
                 *  3. (创建)声明队列
                 *    如果名字所对应的队列存在,那么就不存创建队列,而是去时使用现成对的队列
                 *    如果名字对应的对应不存在,那么就去创建队列
                 *    第一个参数: 队列的名字
                 *    第二个参数: 是否声明一个持久化队列,true表示会将消息持久化
                 *    第三个参数: 是否声明一个独占队列(创建者可以使用的私有队列,断开后自动删除), true表示声明成独占队列
                 *    第四个参数: 是否声明一个删除队列(消费者客户端连接断开时是否自动删除队列),true表示声明成删除队列
                 *    第五个参数: 队列其他参数
                 */
                channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
                // 消息
                String msg = "I am OK";
                /**
                 * 将消息存入队列中
                 *  第一个参数:使交换机的名字 我们后面再将交换机
                 *  第二个参数:队列映射的路由key,我们后面再讲
                 *  第三个参数: 队列消息其他属性
                 *  第四个参数: 发送消息的主体
                */
                channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
    
                System.out.println("发送成功");
                // 关闭资源
                channel.close();
                connection.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    }
    
  • 执行程序


    image-20200421093625708.png
  • 查看管理页面


    image-20200421093724551.png
image-20200421093801454.png
image-20200421094139819.png

通过上图我们可以看到当生产者发送消息到队列中时,管理界面就能看到这个队列,以及队列里面的消息数。

注意:我们只是在控制台看到消息,并不会去消费这个消息。

  • 编写消费者

    package com.mq.rabbit.helloworld;
    
    import com.mq.rabbit.util.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class Consumer {
        private static final String QUEUE_NAME="hello_word";
    
        public static void main(String[] args) throws Exception {
            // 1.获取连接
            Connection connection = ConnectionUtil.getConnection();
            // 2.创建通道,消费者从队列中获取消息也是借助通道
            Channel channel = connection.createChannel();
            /*
             *   3.声明队列
             *      如果队列不存在就会创建队列
             *      由于我们在生产者者那边已经创建好了队列
             *      那么消费者这边就不会创建队列
             */
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
            /*
             *  4. 监听队列,如果队列中有消息,就直接拿过来
             *      第一个参数:队列名字
             *      第二个参数:是否进行消息自动确认,后面我们讲ack参数时再说
             *      第三个参数:回调对象,从队列中主动获取消息
             */
            channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
                /*
                 * consumerTag:消费者标签与消费者相关
                 * envelope:消息的打包数据
                 * properties:消息的头部数据
                 * body:消息主体
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println(consumerTag);
                    System.out.println(envelope);
                    System.out.println(properties);
                    System.out.println("消费的消息:" + new String(body));
                }
            });
        }
    }
    
  • 执行结果


    image-20200421101003782.png

    image-20200421101051070.png

由上图可知:当消息被消费后,队列里面就没有这一条消息了。同时消费者的应用程序并没有停止,而是一致在运行着,一致在监听队列。

自此我们一个简单的Helloword消息模型就写完了

1.1.4 ACK 机制

我们来思考一下有没有上述的例子什么问题???

1.消费者当消费消息后,MQ就会把队列中的消息删除,那么MQ怎么就知道消息被消费了呢?

2.当消费者领取消息后,还没有消费就挂掉了,或者是发生异常,那么MQ就无法得知消息有没有被消费掉。

为了解决上述问题,RabbitMQ提供了一个消息确认机制(ACK机制),当消费者把队列中的消息消费以后,会向Rabbi发送一个ACK,告诉MQ消息已经被消费了,你可以把消息删除了。

不过这种发送ACK有两种方式:

  • 自动发送ACK:消息一旦被接收,自动向MQ发送ACK

    • 代码实现:

      image-20200421104826462.png

      如图,当设置为true时,就会当消费者消费完消息,自动的向发送ACK

    • 缺陷:

      为了演示我先向MQ中发送一条消息:

      image-20200421105032433.png

      接下来修改我们消费者的代码:


      image-20200421105412559.png
    @Override
   public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        int a = 10 / 0;
        System.out.println("消费的消息:" + new String(body));
   }

运行结果:


image-20200421105523908.png
image-20200421105543290.png

我们发现在消费消息之前,抛出了异常,也就是说我们消息还没有被消费,此时MQ就把队列中的消息给删除了。说明消息丢失了。

  • 手动ACK:消息接收后,不会自动发送ACK,需要手动发送

    • 准备工作

      为了演示,我们向MQ中发送一条消息


      image-20200421110251192.png
    • 修改消费者代码

      package com.mq.rabbit.helloworld;
      
      import com.mq.rabbit.util.ConnectionUtil;
      import com.rabbitmq.client.*;
      
      import java.io.IOException;
      
      public class Consumer {
          private static final String QUEUE_NAME = "hello_word";
      
          public static void main(String[] args) throws Exception {
              // 1.获取连接
              Connection connection = ConnectionUtil.getConnection();
              // 2.创建通道,消费者从队列中获取消息也是借助通道
              Channel channel = connection.createChannel();
              /*
               *   3.声明队列
               *      如果队列不存在就会创建队列
               *      由于我们在生产者者那边已经创建好了队列
               *      那么消费者这边就不会创建队列
               */
              channel.queueDeclare(QUEUE_NAME, false, false, false, null);
      
              /*
               *  4. 监听队列,如果队列中有消息,就直接拿过来
               *      第一个参数:队列名字
               *      第二个参数:是否进行消息自动确认,false代表不再向MQ发送ACK
               *      第三个参数:回调对象,从队列中主动获取消息
               */
              channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
                  /*
                   * consumerTag:消费者标签与消费者相关
                   * envelope:消息的打包数据
                   * properties:消息的头部数据
                   * body:消息主体
                   */
      
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                      System.out.println("消费的消息:" + new String(body));
                      /*
                       * 1. 第一个参数是 传输的标签
                       * 2. 是否要确认所有的消息 true:确认所有信息,包括提供的传输标签
                       *                         false: 仅确认提供的传输标签
                       */
                      channel.basicAck(envelope.getDeliveryTag(),false);
                  }
              });
          }
      }
      

      这样就实现了手动发送ACK

  • 对比

    上述可知,两种发送ACK的方式。那么我们到底用哪种方式:

    • 如果消息不是特别重要,即使丢失了对系统没有什么影响,那么采用ACK比较方便
    • 如果消息非常重要,不允许丢失,那么最好选择手动发送ACK。

2.work模型

work模型称为:工作队列模式

2.1介绍

2.1.1 模型图

image-20200421141641967.png

2.1.2 官方介绍

image-20200421141813471.png

大概意思如下:

在第一个教程中,我们编写了从命名队列发送和接收消息的程序。在本例中,我们将创建一个工作队列,用于在多个工人之间分发耗时的任务。

工作队列(也称为任务队列)背后的主要思想是避免立即执行资源密集型任务,而必须等待它完成。相反,我们把任务安排在以后完成。我们将任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当您运行许多工作人员时,任务将在他们之间共享。

这个概念在web应用程序中特别有用,因为在短的HTTP请求窗口中无法处理复杂的任务。

接下来我们用java代码去模拟这个过程:

P 生产者: 发布任务(生产消息)

image-20200421142725181.png

C1 消费者1: 获取任务并完成任务

C2 消费者2: 获取任务并完成任务

2.2 编码实现

2.2.1 生产者

  • 代码

    package com.mq.rabbit.work;
    
    import com.mq.rabbit.util.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    public class Producer {
        private static final String QUEUE_NAME = "hello_work";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            /*
            *  生产者发布20个任务
            */
            for (int i = 1; i <= 20; i++) {
                String msg = "hello work: " + i;
                channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
            }
            channel.close();
            connection.close();
        }
    }
    
    

2.2.2 消费者1

  • 代码

    package com.mq.rabbit.work;
    
    import com.mq.rabbit.util.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class Consumer1 {
    
        private static final String QUEUE_NAME = "hello_work";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
            channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    try {
                        System.out.println("消费者1:" + new String(body));
                        // 耗时操作
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            });
        }
    }
    
    

2.2.3 消费者2

  • 代码

    package com.mq.rabbit.work;
    
    import com.mq.rabbit.util.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class Consumer2 {
    
        private static final String QUEUE_NAME = "hello_work";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
            channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        System.out.println("消费者2:" + new String(body));
                        channel.basicAck(envelope.getDeliveryTag(),false);
                }
            });
        }
    }
    
    

2.2.4 结果分析

  • 先执行消费者1和消费者2,然后再执行生产者


    image-20200421152445061.png
image-20200421152501650.png

生产者总共发布了20条消息,其中消费者1和消费者2分别消费了10条。这就是工作队列机制,将消息数平分给不同的消费者去消费。

2.2.5 存在的问题

通过上述例子发现以下几个问题:

  • 消费者1去处理消息比较耗时,消费者2处理的消息比较快。但是他们处理的消息量是一样。

  • 当消费者2处理完成以后,一直处于空闲状态,而消息1却一直在忙碌

这明显是不合理的。按照正确的做法应该是消费者2处理消息快,多分配一些消息去处理。消费者1处理消息慢就少分配一些消息,能者多劳。那么该怎么去实现呢?

RabbitMQ中提供了一个basicQos方法以及 prefetchCount=1设置。其功能就是告诉MQ一次不要向消费者发送多条消息,等消息者把消息处理并确认完成。才会再次发送下一条消息。相反,如果消费者还是处于忙率中,那么MQ就会把消息分派给不是很忙碌的消费者。

2.2.6 改造消费者

  • 代码


    image-20200421154754149.png
package com.mq.rabbit.work;

import com.mq.rabbit.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer1 {

    private static final String QUEUE_NAME = "hello_work";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        channel.basicQos(1);
        
        channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    System.out.println("消费者1:" + new String(body));
                    // 耗时操作
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

  • 启动测试


    image-20200421154924090.png
image-20200421154958331.png

这样我们就实现了能者多劳

3.发布/订阅模型

3.1 思考

通过上述模型我们可以指导,同一条消息只能发送给一个消费者,但如果说我想要把一个消息发给多个消费者,这又该怎么做呢?

3.2 介绍

3.2.1 官方介绍

image-20200421161335546.png

大体意思如下:

在之前的模式中,我们创建了一个工作队列。工作队列背后的假设是:每个任务都被精确地传递给一个工人。在这一部分中,我们将做一些完全不同的事情——我们将向多个消费者传递一条消息。这种模式称为“发布/订阅”。

3.2.2 模型图

image-20200421161949840.png

生产者把消息发送给交换机X(图中蓝紫色部分),交换机X将消息转发到不同的队列中。

  • 1个生产者,多个消费者
  • 每一个消费者都有自己的队列
  • 生产者是将消息发送到交换机,交换机把消息转发到了队列
  • 每一个队列都需要绑定交换机
  • 一条消息被多个消费者消费

3.2.3 交换机

  • 介绍


    image-20200421162256433.png

    大体意思如下:

    交换机

    在之前的模型中,我们直接向队列发送和接收消息。现在是时候在Rabbit中引入完整的消息传递模型了。

    • 生产者是发送消息的用户应用程序。

    • 队列是存储消息的缓冲区。

    • 消费者是接收消息的用户应用程序。

    RabbitMQ消息传递模型的核心思想是,生产者从不将任何消息直接发送到队列。实际上,生产者常常根本不知道消息是否会被传递到任何队列。

    相反,生产者只能向交换机发送消息。一方面交换机接收来自生产者的消息,另一方面它将它们推送到队列中。

3.2.4 交换机类型

  • Fanout 广播,将消息转发到所有绑定交换机的队列上
  • Direct 定向,将消息转发到符合指定routing key的队列上
  • Topic 通配符, 把 消息转发符合routing pattern(路由模式)的队列

注意:Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

3.2.5 发布/订阅模型-Fanout

  • 介绍

    Fanout类型也称为广播类型,这种类型有以下特点:

    • 每个队列都要绑定到交换机,且生产者发送的消息只能发送到交换机,由交换机决定将消息发送到哪个队列。生产者无法决定,甚至生产者都不知道消息被转发到了哪个队列上


      image-20200421163949519.png
    • 每一个消费者都需要有自己的队列,可以有多个消费者


      image-20200421164114761.png
    • 交换机会把所有消息转发到每一个绑定到交换机上的队列


      image-20200421164147200.png
  • 编码实现

    • 生产者

      1. 生产者跟队列没有关系,只跟交换机有关系
      2. 发送消息发送到交换机,不是队列上
      package com.mq.rabbit.fanout;
      
      import com.mq.rabbit.util.ConnectionUtil;
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.Connection;
      import org.springframework.amqp.core.ExchangeTypes;
      
      public class Producer {
          private static final String EXCHANGE_NAME = "amq.fanout";
          public static void main(String[] args) throws Exception {
              // 1.获取连接
              Connection connection = ConnectionUtil.getConnection();
              // 2. 创建通道
              Channel channel = connection.createChannel();
              /*
               *   3.声明交换机
               *     第一个参数:交换机名字
               *      第二个参数: 交换机类型
               */
              channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.FANOUT);
              String msg = "hello exchange";
              channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
              channel.close();
              connection.close();
          }
      }
      
      
    • 消费者1

      1.消费者需要绑定到队列上,每一个消费者有自己的队列

      package com.mq.rabbit.fanout;
      
      import com.mq.rabbit.util.ConnectionUtil;
      import com.rabbitmq.client.*;
      
      import java.io.IOException;
      
      public class Consumer1 {
          private static final String QUEUE_NAME = "consumer_queue_1";
          private static final String EXCHANGE_NAME = "amq.fanout";
      
          public static void main(String[] args) throws Exception {
              // 1.获取连接
              Connection connection = ConnectionUtil.getConnection();
              // 2.创建通道
              Channel channel = connection.createChannel();
              // 3.声明队列
              channel.queueDeclare(QUEUE_NAME,false,false,false,null);
              /*
               * 4.将队列绑定到交换机
               *     第一个参数 队列名字
               *     第二个参数 交换机名字
               *     第三个参数 路由key 后面再说这个
               */
              channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
      
              // 5. 监听队列获取消息
              channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel) {
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                      System.out.println("消费者1:" + new String(body));
                      channel.basicAck(envelope.getDeliveryTag(),false);
                  }
              });
          }
      }
      
      
    • 消费者2

      package com.mq.rabbit.fanout;
      
      import com.mq.rabbit.util.ConnectionUtil;
      import com.rabbitmq.client.*;
      
      import java.io.IOException;
      
      public class Consumer2 {
          private static final String QUEUE_NAME = "consumer_queue_2";
          private static final String EXCHANGE_NAME = "amq.fanout";
      
          public static void main(String[] args) throws Exception {
              // 1.获取连接
              Connection connection = ConnectionUtil.getConnection();
              // 2.创建通道
              Channel channel = connection.createChannel();
              // 3.声明队列
              channel.queueDeclare(QUEUE_NAME,false,false,false,null);
              /*
               * 4.将队列绑定到交换机
               *     第一个参数 队列名字
               *     第二个参数 交换机名字
               *     第三个参数 路由key 后面再说这个
               */
              channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
      
              // 5. 监听队列获取消息
              channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel) {
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                      System.out.println("消费者2:" + new String(body));
                      channel.basicAck(envelope.getDeliveryTag(),false);
                  }
              });
          }
      }
      
      
    • 启动测试:

      • 如果先启动生产者,那么就会创建一个交换机,并且给交换机发送消息,但是我们此时还没有启动消费者,所以交换机里面的消息也会丢失

      • 如果先启动消费者,那么队列绑定的交换机并不存在,所以也没法绑定,从而抛出异常


        image-20200422092203108.png
        Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'fanout2_exchange' in vhost '/', class-id=50, method-id=20)
          at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:522)
          at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346)
          at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
          at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
          at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:672)
          at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)
          at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:599)
          at java.lang.Thread.run(Thread.java:745)
        
        
      • 解决办法

        先启动一次生产者,创建交换机,创建交换机,由于交换机不能存储消息。所以消息就会丢失


        image-20200422092430797.png
  再启动消费者
image-20200422092556522.png

我们可以看到消费者并没有消费消息,因为交换机里面已经没有消息了。


image-20200422092643891.png

交换机也也绑定了队列。此时我们再启动一次生产者,由于交换机已经存在,所以就会往交换机里发送消息


image-20200422092758096.png

image-20200422092829295.png

当然如果不想这么麻烦,也可以使用MQ提供的交换机。如下:
image-20200422092917259.png

4.Routing模型

4.1 介绍

Routing模型(路由模型)其实也是属于发布/订阅模型。只不过是交换机类型不一样,这里我们将学习Direct交换机模型。这个类型于Fanout类型不同的是,Fanout类型是给每一个绑定到交换机上的队列发消息,而Direct则是可以向指定的队列发送消息,通过RoutingKey(路由key)

官方介绍如下:


image-20200422094942316.png

大体意思如下:

在Fanout类型中,生产者发布消息,所有消费者都可以获取所有消息。

在路由模型中,我们将添加一个功能 - 我们将只能订阅一部分消息。 例如,我们只能将重要的错误消息引导到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。

但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。在Direct模型下,队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key),生产者在向Exchange发送消息时,也必须指定消息的routing key。

简而言之就是生产者需要告诉交换机要将消息发送到指定的队列中,怎么告诉就是通过RoutingKey(路由key)

4.2 模型图

image-20200422095256784.png

从图中我们可以看出:

  • P (生产者) 向 X(交换机)发送消息时会指定 路由key,
  • 由于交换机类型为direct,该交换机就根据不同的路由key将orange消息转发到了Q1队列,将消息black,green消息转发到了Q2队列,然后被彼此绑定的消费者所消费。

接下来我们将使用Java代码模拟图中过程。

4.3 代码

  • 生产者

    package com.mq.rabbit.routting;
    
    import com.mq.rabbit.util.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import org.springframework.amqp.core.ExchangeTypes;
    
    public class Producer {
        public static final String EXCHANGE_NAME = "hello_exchange_direct";
    
        public static void main(String[] args) throws Exception {
            // 1.创建连接
            Connection connection = ConnectionUtil.getConnection();
            // 2.创建通道
            Channel channel = connection.createChannel();
            //3. 创建交换机
            channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.DIRECT);
            // 4. 发送消息
            String orange = "hello orange";
            /*
             *   5.发送消息
             *      第一个参数:交换机名字
             *      第二个参数:路由key
             *      第三个参数:消息其他参数
             *      第四个参数: 消息
             */
            channel.basicPublish(EXCHANGE_NAME,"q1",null,orange.getBytes());
    
            String black = "hello black";
            channel.basicPublish(EXCHANGE_NAME,"q2",null,black.getBytes());
    
            String green = "hello green";
            channel.basicPublish(EXCHANGE_NAME,"q2",null,green.getBytes());
    
            channel.close();
            connection.close();
        }
    }
    
    

    上述代码可知:

    • 生产者发送了三条消息hello orange,hello black,hello green
    • orange消息的路由key为q1,到时候发送到q1队列上,并消费者1消费
    • black,green消息的路由可以为q2,到时候发送到q2队列上,并被消费者2消费
  • 消费者1

    package com.mq.rabbit.routting;
    
    import com.mq.rabbit.util.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class Consumer1 {
        public static final  String QUEUE_NAME = "consumer1";
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            /*
            *   绑定交换机
            *       第一个参数为队列名字
            *       第二个参数为交换机名字
            *       第三个参数为路由key
             */
            channel.queueBind(QUEUE_NAME,Producer.EXCHANGE_NAME,"q1");
    
            channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel)  {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println(new String(body));
                }
            });
        }
    }
    
    
  • 消费者2

    package com.mq.rabbit.routting;
    
    import com.mq.rabbit.util.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class Consumer2 {
        public static final  String QUEUE_NAME = "consumer2";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            channel.queueBind(QUEUE_NAME,Producer.EXCHANGE_NAME,"q1");
            channel.queueBind(QUEUE_NAME,Producer.EXCHANGE_NAME,"q2");
            channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel)  {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println(new String(body));
                }
            });
        }
    }
    
    

4.4 测试

  • 生产者测试


    image-20200422104639726.png
  • 消费者测试


    image-20200422104948958.png

    image-20200422105006683.png

    效果满足我们想要的,这就是MQ中的路由模型。

5.Topics模型

MQ中的Top模型其实也是属于发布/订阅中模型的一种,只不过交换机模型换成了 Topic

5.1 介绍

image-20200422111140615.png

大体意思如下:

  • 路由key由一个或者多个参数组成,如果是多个单词必须以 . 号隔开 例如:category.update

  • Topic类型的交换机与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符

    • * 只能匹配一个单词
    • # 匹配一个或者多个单词
    • 例如:
      • product.* product.insert 能够匹配到,product.insert.dd就匹配不到
      • product.# product.insert ,product.insert.dd都能匹配到

5.2 模型图

image-20200422112203396.png

我们将发送所有描述动物的消息。消息将用路由key发送,路由key由三个字(两个点)组成。路由key中的第一个词将描述一种快速性、第二种颜色和第三种a物种:“<celerity><colour><species>”。

我们创建了三个绑定:Q1用绑定键*.orange.*绑定,Q2用*.rabbitlazy.#绑定。

这些绑定可以概括为:

  • Q1匹配的橙色动物

  • Q2匹配兔子和懒惰动物

例如:

quick.orange.rabbit 就会 被 Q2队列匹配到

lazy.orange.elephant就会被 Q1 Q2队列匹配到

lazy.pink.rabbit 就会被Q2队列匹配到

quick.brown.fox 不会被任何队列匹配到

5.3 代码实现

  • 生产者

    使用topic 类型交换机,路由key为:lazy.pink.rabbit lazy.orange.elephantquick.orange.rabbit``

    package com.mq.rabbit.topic;
    
    import com.mq.rabbit.util.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import org.springframework.amqp.core.ExchangeTypes;
    
    public class Producer {
        public static final String EXCHNAGE_NAME = "hello_exchange_topic";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(EXCHNAGE_NAME, ExchangeTypes.TOPIC);
            String msg = "hello lazy.pink.rabbit";
            channel.basicPublish(EXCHNAGE_NAME,"lazy.pink.rabbit",null,msg.getBytes());
    
            msg = "hello lazy.orange.elephant";
            channel.basicPublish(EXCHNAGE_NAME,"lazy.orange.elephant",null,msg.getBytes());
    
            msg = "hello quick.orange.rabbit";
            channel.basicPublish(EXCHNAGE_NAME,"quick.orange.rabbit",null,msg.getBytes());
    
            channel.close();
            connection.close();
        }
    
    }
    
    
  • 消费者1

    package com.mq.rabbit.topic;
    
    import com.mq.rabbit.util.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class Consumer1 {
        public static final String  QUEUE_NAME = "Q1";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            channel.queueBind(QUEUE_NAME,Producer.EXCHNAGE_NAME,"*.orange.*");
            channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1:" + new String(body));
                }
            });
        }
    }
    
    
  • 消费者2

    package com.mq.rabbit.topic;
    
    import com.mq.rabbit.util.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class Consumer2 {
        public static final String  QUEUE_NAME = "Q2";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            channel.queueBind(QUEUE_NAME,Producer.EXCHNAGE_NAME,"*.*.rabbit");
            channel.queueBind(QUEUE_NAME,Producer.EXCHNAGE_NAME,"lazy.#");
            channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者2:" + new String(body));
                }
            });
        }
    }
    
    
  • 测试

    • 测试生产者


      image-20200422153000256.png
    • 测试消费者


      image-20200422153240392.png

      image-20200422153256104.png

      以上就是我们的topic类型

6.消息堆积&丢失问题

6.1 堆积

如何避免消息对接问题:

  • 在消费者一方启用多线程去消费
  • 使用work模型去分担消息,注意,发布/订阅模型可以和work模型结合使用

6.2 丢失

如何避免消息丢失

  • 消费端使用手动ACK机制(如何消费者在消费消息之前,MQ就挂掉,那么这个操作无用)
  • 将消息持久化

消息要想持久化,那么前提条件就是 交换机,队列都需要持久化

6.2.1 交换机持久化

image-20200422154334785.png

6.2.2 队列持久化

image-20200422154438511.png

7. RPC模型

rpc 模型其实是属于远程调用,不属于消息模型,所以这里不说明,如果对rpc感兴趣,可以去了解一下dubbo

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
禁止转载,如需转载请通过简信或评论联系作者。

推荐阅读更多精彩内容