1.消息模型
根据官方文档得知,RabbitMQ有七种消息模型:
1.1 Hello World消息模型
1.1.1 介绍
翻译成中文如下:
RabbitMQ是一个消息代理:它接受和转发消息。你可以把它想象成一个邮局:当你把你想寄出的邮件放进一个邮箱里时,你可以确信邮件的收件人最终会收到邮件。在这个类比中,RabbitMQ是一个邮箱、一个邮局和一个邮递员。
RabbitMQ与邮局的主要区别在于,它不处理纸张,而是接受、存储和转发二进制的数据信息块。
1.1.2 模型图
-
P (Produce) 生产者,主要是生产消息,以及发送消息。说白了就是一个发送消息的应用程序
image-20200420163700439.png
- 队列 (queue) 队列是RabbitMQ中的邮箱的名称。尽管消息流经RabbitMQ和您的应用程序,但它们只能存储在队列中。队列只受主机的内存和磁盘限制的约束,它本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,而许多消费者可以尝试从一个队列接收数据
-
C (consumer) 消费和接受有着相似的含义。消费者者是一个主要等待接收消息的程序
image-20200420163923433.png
注意:生产者、消费者和代理不必驻留在同一主机上;事实上,在大多数应用程序中,它们不必驻留在同一主机上。应用程序也可以同时是生产者和消费者。
1.1.3 代码实现
接下来我们采用java语言编写生产者程序,以及消费者程序来感受一下其魅力。
生者者:Send
消费者:Consumer
-
生产者代码实现:
-
模型图:
image-20200420164345046.png
-
由图中可知,生产者不仅要生产消息,还要将消息发送到指定队列:
-
创建 springboot项目(wangzh-rabbitmq)
image-20200420164715769.png -
导入amqp依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
-
编写工具类,用来获取连接
package com.mq.rabbit.util; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class ConnectionUtil { public static Connection getConnection() throws Exception { // 1. 创建连接工厂,用来获取连接 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2. 设置基本信息 // 设置rabbitmq所在地址 connectionFactory.setHost("192.168.169.130"); // 设置用户名,我们先前创建了一个wangzh的用户 connectionFactory.setUsername("wangzh"); // 设置密码 connectionFactory.setPassword("wangzh"); // 设置端口 这个端口是amqp协议的端口 connectionFactory.setPort(5672); return connectionFactory.newConnection(); } }
-
编写发送端程序
package com.mq.rabbit.helloworld; import com.mq.rabbit.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Send { /** * 队列名字 */ private static final String QUEUE_NAME="hello_word"; public static void main(String[] args) { try { // 1. 获取连接 Connection connection = ConnectionUtil.getConnection(); /* * 2. 创建通道 * 生产者发送消息到队列中需要借助通道 */ Channel channel = connection.createChannel(); /** * 3. (创建)声明队列 * 如果名字所对应的队列存在,那么就不存创建队列,而是去时使用现成对的队列 * 如果名字对应的对应不存在,那么就去创建队列 * 第一个参数: 队列的名字 * 第二个参数: 是否声明一个持久化队列,true表示会将消息持久化 * 第三个参数: 是否声明一个独占队列(创建者可以使用的私有队列,断开后自动删除), true表示声明成独占队列 * 第四个参数: 是否声明一个删除队列(消费者客户端连接断开时是否自动删除队列),true表示声明成删除队列 * 第五个参数: 队列其他参数 */ channel.queueDeclare(QUEUE_NAME,false,false,false,null); // 消息 String msg = "I am OK"; /** * 将消息存入队列中 * 第一个参数:使交换机的名字 我们后面再将交换机 * 第二个参数:队列映射的路由key,我们后面再讲 * 第三个参数: 队列消息其他属性 * 第四个参数: 发送消息的主体 */ channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); System.out.println("发送成功"); // 关闭资源 channel.close(); connection.close(); } catch (Exception e) { e.printStackTrace(); } } }
-
执行程序
image-20200421093625708.png -
查看管理页面
image-20200421093724551.png
通过上图我们可以看到当生产者发送消息到队列中时,管理界面就能看到这个队列,以及队列里面的消息数。
注意:我们只是在控制台看到消息,并不会去消费这个消息。
-
编写消费者
package com.mq.rabbit.helloworld; import com.mq.rabbit.util.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer { private static final String QUEUE_NAME="hello_word"; public static void main(String[] args) throws Exception { // 1.获取连接 Connection connection = ConnectionUtil.getConnection(); // 2.创建通道,消费者从队列中获取消息也是借助通道 Channel channel = connection.createChannel(); /* * 3.声明队列 * 如果队列不存在就会创建队列 * 由于我们在生产者者那边已经创建好了队列 * 那么消费者这边就不会创建队列 */ channel.queueDeclare(QUEUE_NAME,false,false,false,null); /* * 4. 监听队列,如果队列中有消息,就直接拿过来 * 第一个参数:队列名字 * 第二个参数:是否进行消息自动确认,后面我们讲ack参数时再说 * 第三个参数:回调对象,从队列中主动获取消息 */ channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){ /* * consumerTag:消费者标签与消费者相关 * envelope:消息的打包数据 * properties:消息的头部数据 * body:消息主体 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(consumerTag); System.out.println(envelope); System.out.println(properties); System.out.println("消费的消息:" + new String(body)); } }); } }
-
执行结果
image-20200421101003782.png
image-20200421101051070.png
由上图可知:当消息被消费后,队列里面就没有这一条消息了。同时消费者的应用程序并没有停止,而是一致在运行着,一致在监听队列。
自此我们一个简单的Helloword消息模型就写完了
1.1.4 ACK 机制
我们来思考一下有没有上述的例子什么问题???
1.消费者当消费消息后,MQ就会把队列中的消息删除,那么MQ怎么就知道消息被消费了呢?
2.当消费者领取消息后,还没有消费就挂掉了,或者是发生异常,那么MQ就无法得知消息有没有被消费掉。
为了解决上述问题,RabbitMQ提供了一个消息确认机制(ACK机制),当消费者把队列中的消息消费以后,会向Rabbi发送一个ACK
,告诉MQ消息已经被消费了,你可以把消息删除了。
不过这种发送ACK
有两种方式:
-
自动发送ACK:消息一旦被接收,自动向MQ发送ACK
-
代码实现:
image-20200421104826462.png
如图,当设置为true
时,就会当消费者消费完消息,自动的向发送ACK -
缺陷:
为了演示我先向MQ中发送一条消息:
image-20200421105032433.png接下来修改我们消费者的代码:
image-20200421105412559.png
-
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
int a = 10 / 0;
System.out.println("消费的消息:" + new String(body));
}
运行结果:
我们发现在消费消息之前,抛出了异常,也就是说我们消息还没有被消费,此时MQ就把队列中的消息给删除了。说明消息丢失了。
-
手动ACK:消息接收后,不会自动发送ACK,需要手动发送
-
准备工作
为了演示,我们向MQ中发送一条消息
image-20200421110251192.png -
修改消费者代码
package com.mq.rabbit.helloworld; import com.mq.rabbit.util.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer { private static final String QUEUE_NAME = "hello_word"; public static void main(String[] args) throws Exception { // 1.获取连接 Connection connection = ConnectionUtil.getConnection(); // 2.创建通道,消费者从队列中获取消息也是借助通道 Channel channel = connection.createChannel(); /* * 3.声明队列 * 如果队列不存在就会创建队列 * 由于我们在生产者者那边已经创建好了队列 * 那么消费者这边就不会创建队列 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); /* * 4. 监听队列,如果队列中有消息,就直接拿过来 * 第一个参数:队列名字 * 第二个参数:是否进行消息自动确认,false代表不再向MQ发送ACK * 第三个参数:回调对象,从队列中主动获取消息 */ channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) { /* * consumerTag:消费者标签与消费者相关 * envelope:消息的打包数据 * properties:消息的头部数据 * body:消息主体 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费的消息:" + new String(body)); /* * 1. 第一个参数是 传输的标签 * 2. 是否要确认所有的消息 true:确认所有信息,包括提供的传输标签 * false: 仅确认提供的传输标签 */ channel.basicAck(envelope.getDeliveryTag(),false); } }); } }
这样就实现了手动发送
ACK
-
-
对比
上述可知,两种发送ACK的方式。那么我们到底用哪种方式:
- 如果消息不是特别重要,即使丢失了对系统没有什么影响,那么采用ACK比较方便
- 如果消息非常重要,不允许丢失,那么最好选择手动发送ACK。
2.work模型
work模型称为:工作队列模式
2.1介绍
2.1.1 模型图
2.1.2 官方介绍
大概意思如下:
在第一个教程中,我们编写了从命名队列发送和接收消息的程序。在本例中,我们将创建一个工作队列,用于在多个工人之间分发耗时的任务。
工作队列(也称为任务队列)背后的主要思想是避免立即执行资源密集型任务,而必须等待它完成。相反,我们把任务安排在以后完成。我们将任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当您运行许多工作人员时,任务将在他们之间共享。
这个概念在web应用程序中特别有用,因为在短的HTTP请求窗口中无法处理复杂的任务。
接下来我们用java代码去模拟这个过程:
P 生产者: 发布任务(生产消息)
C1 消费者1: 获取任务并完成任务
C2 消费者2: 获取任务并完成任务
2.2 编码实现
2.2.1 生产者
-
代码
package com.mq.rabbit.work; import com.mq.rabbit.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Producer { private static final String QUEUE_NAME = "hello_work"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); /* * 生产者发布20个任务 */ for (int i = 1; i <= 20; i++) { String msg = "hello work: " + i; channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); } channel.close(); connection.close(); } }
2.2.2 消费者1
-
代码
package com.mq.rabbit.work; import com.mq.rabbit.util.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer1 { private static final String QUEUE_NAME = "hello_work"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { System.out.println("消费者1:" + new String(body)); // 耗时操作 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag(),false); } }); } }
2.2.3 消费者2
-
代码
package com.mq.rabbit.work; import com.mq.rabbit.util.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer2 { private static final String QUEUE_NAME = "hello_work"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2:" + new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } }); } }
2.2.4 结果分析
-
先执行消费者1和消费者2,然后再执行生产者
image-20200421152445061.png
生产者总共发布了20条消息,其中消费者1和消费者2分别消费了10条。这就是工作队列机制,将消息数平分给不同的消费者去消费。
2.2.5 存在的问题
通过上述例子发现以下几个问题:
消费者1去处理消息比较耗时,消费者2处理的消息比较快。但是他们处理的消息量是一样。
当消费者2处理完成以后,一直处于空闲状态,而消息1却一直在忙碌
这明显是不合理的。按照正确的做法应该是消费者2处理消息快,多分配一些消息去处理。消费者1处理消息慢就少分配一些消息,能者多劳。那么该怎么去实现呢?
RabbitMQ中提供了一个basicQos
方法以及 prefetchCount=1
设置。其功能就是告诉MQ一次不要向消费者发送多条消息,等消息者把消息处理并确认完成。才会再次发送下一条消息。相反,如果消费者还是处于忙率中,那么MQ就会把消息分派给不是很忙碌的消费者。
2.2.6 改造消费者
-
代码
image-20200421154754149.png
package com.mq.rabbit.work;
import com.mq.rabbit.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {
private static final String QUEUE_NAME = "hello_work";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.basicQos(1);
channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
System.out.println("消费者1:" + new String(body));
// 耗时操作
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
-
启动测试
image-20200421154924090.png
这样我们就实现了能者多劳
3.发布/订阅模型
3.1 思考
通过上述模型我们可以指导,同一条消息只能发送给一个消费者,但如果说我想要把一个消息发给多个消费者,这又该怎么做呢?
3.2 介绍
3.2.1 官方介绍
大体意思如下:
在之前的模式中,我们创建了一个工作队列。工作队列背后的假设是:每个任务都被精确地传递给一个工人。在这一部分中,我们将做一些完全不同的事情——我们将向多个消费者传递一条消息。这种模式称为“发布/订阅”。
3.2.2 模型图
生产者把消息发送给交换机X(图中蓝紫色部分),交换机X将消息转发到不同的队列中。
- 1个生产者,多个消费者
- 每一个消费者都有自己的队列
- 生产者是将消息发送到交换机,交换机把消息转发到了队列
- 每一个队列都需要绑定交换机
- 一条消息被多个消费者消费
3.2.3 交换机
-
介绍
image-20200421162256433.png
大体意思如下:
交换机
在之前的模型中,我们直接向队列发送和接收消息。现在是时候在Rabbit中引入完整的消息传递模型了。
生产者是发送消息的用户应用程序。
队列是存储消息的缓冲区。
消费者是接收消息的用户应用程序。
RabbitMQ消息传递模型的核心思想是,生产者从不将任何消息直接发送到队列。实际上,生产者常常根本不知道消息是否会被传递到任何队列。
相反,生产者只能向交换机发送消息。一方面交换机接收来自生产者的消息,另一方面它将它们推送到队列中。
3.2.4 交换机类型
- Fanout 广播,将消息转发到所有绑定交换机的队列上
- Direct 定向,将消息转发到符合指定
routing key
的队列上 - Topic 通配符, 把 消息转发符合
routing pattern(路由模式)
的队列
注意:Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
3.2.5 发布/订阅模型-Fanout
-
介绍
Fanout类型也称为广播类型,这种类型有以下特点:
-
每个队列都要绑定到交换机,且生产者发送的消息只能发送到交换机,由交换机决定将消息发送到哪个队列。生产者无法决定,甚至生产者都不知道消息被转发到了哪个队列上
image-20200421163949519.png -
每一个消费者都需要有自己的队列,可以有多个消费者
image-20200421164114761.png -
交换机会把所有消息转发到每一个绑定到交换机上的队列
image-20200421164147200.png
-
-
编码实现
-
生产者
- 生产者跟队列没有关系,只跟交换机有关系
- 发送消息发送到交换机,不是队列上
package com.mq.rabbit.fanout; import com.mq.rabbit.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.springframework.amqp.core.ExchangeTypes; public class Producer { private static final String EXCHANGE_NAME = "amq.fanout"; public static void main(String[] args) throws Exception { // 1.获取连接 Connection connection = ConnectionUtil.getConnection(); // 2. 创建通道 Channel channel = connection.createChannel(); /* * 3.声明交换机 * 第一个参数:交换机名字 * 第二个参数: 交换机类型 */ channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.FANOUT); String msg = "hello exchange"; channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes()); channel.close(); connection.close(); } }
-
消费者1
1.消费者需要绑定到队列上,每一个消费者有自己的队列
package com.mq.rabbit.fanout; import com.mq.rabbit.util.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer1 { private static final String QUEUE_NAME = "consumer_queue_1"; private static final String EXCHANGE_NAME = "amq.fanout"; public static void main(String[] args) throws Exception { // 1.获取连接 Connection connection = ConnectionUtil.getConnection(); // 2.创建通道 Channel channel = connection.createChannel(); // 3.声明队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); /* * 4.将队列绑定到交换机 * 第一个参数 队列名字 * 第二个参数 交换机名字 * 第三个参数 路由key 后面再说这个 */ channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,""); // 5. 监听队列获取消息 channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1:" + new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } }); } }
-
消费者2
package com.mq.rabbit.fanout; import com.mq.rabbit.util.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer2 { private static final String QUEUE_NAME = "consumer_queue_2"; private static final String EXCHANGE_NAME = "amq.fanout"; public static void main(String[] args) throws Exception { // 1.获取连接 Connection connection = ConnectionUtil.getConnection(); // 2.创建通道 Channel channel = connection.createChannel(); // 3.声明队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); /* * 4.将队列绑定到交换机 * 第一个参数 队列名字 * 第二个参数 交换机名字 * 第三个参数 路由key 后面再说这个 */ channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,""); // 5. 监听队列获取消息 channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2:" + new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } }); } }
-
启动测试:
如果先启动生产者,那么就会创建一个交换机,并且给交换机发送消息,但是我们此时还没有启动消费者,所以交换机里面的消息也会丢失
-
如果先启动消费者,那么队列绑定的交换机并不存在,所以也没法绑定,从而抛出异常
image-20200422092203108.pngCaused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'fanout2_exchange' in vhost '/', class-id=50, method-id=20) at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:522) at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346) at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182) at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114) at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:672) at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48) at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:599) at java.lang.Thread.run(Thread.java:745)
-
解决办法
先启动一次生产者,创建交换机,创建交换机,由于交换机不能存储消息。所以消息就会丢失
image-20200422092430797.png
-
再启动消费者
我们可以看到消费者并没有消费消息,因为交换机里面已经没有消息了。
交换机也也绑定了队列。此时我们再启动一次生产者,由于交换机已经存在,所以就会往交换机里发送消息
当然如果不想这么麻烦,也可以使用MQ提供的交换机。如下:
4.Routing模型
4.1 介绍
Routing模型(路由模型)其实也是属于发布/订阅模型。只不过是交换机类型不一样,这里我们将学习Direct
交换机模型。这个类型于Fanout类型不同的是,Fanout类型是给每一个绑定到交换机上的队列发消息,而Direct
则是可以向指定的队列发送消息,通过RoutingKey(路由key)
官方介绍如下:
大体意思如下:
在Fanout类型中,生产者发布消息,所有消费者都可以获取所有消息。
在路由模型中,我们将添加一个功能 - 我们将只能订阅一部分消息。 例如,我们只能将重要的错误消息引导到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。在Direct模型下,队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key),生产者在向Exchange发送消息时,也必须指定消息的routing key。
简而言之就是生产者需要告诉交换机要将消息发送到指定的队列中,怎么告诉就是通过RoutingKey(路由key)
4.2 模型图
从图中我们可以看出:
- P (生产者) 向 X(交换机)发送消息时会指定 路由key,
- 由于交换机类型为
direct
,该交换机就根据不同的路由key将orange
消息转发到了Q1
队列,将消息black
,green
消息转发到了Q2
队列,然后被彼此绑定的消费者所消费。
接下来我们将使用Java代码模拟图中过程。
4.3 代码
-
生产者
package com.mq.rabbit.routting; import com.mq.rabbit.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.springframework.amqp.core.ExchangeTypes; public class Producer { public static final String EXCHANGE_NAME = "hello_exchange_direct"; public static void main(String[] args) throws Exception { // 1.创建连接 Connection connection = ConnectionUtil.getConnection(); // 2.创建通道 Channel channel = connection.createChannel(); //3. 创建交换机 channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.DIRECT); // 4. 发送消息 String orange = "hello orange"; /* * 5.发送消息 * 第一个参数:交换机名字 * 第二个参数:路由key * 第三个参数:消息其他参数 * 第四个参数: 消息 */ channel.basicPublish(EXCHANGE_NAME,"q1",null,orange.getBytes()); String black = "hello black"; channel.basicPublish(EXCHANGE_NAME,"q2",null,black.getBytes()); String green = "hello green"; channel.basicPublish(EXCHANGE_NAME,"q2",null,green.getBytes()); channel.close(); connection.close(); } }
上述代码可知:
- 生产者发送了三条消息
hello orange
,hello black
,hello green
-
orange
消息的路由key为q1
,到时候发送到q1
队列上,并消费者1消费 -
black
,green
消息的路由可以为q2
,到时候发送到q2
队列上,并被消费者2消费
- 生产者发送了三条消息
-
消费者1
package com.mq.rabbit.routting; import com.mq.rabbit.util.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer1 { public static final String QUEUE_NAME = "consumer1"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); /* * 绑定交换机 * 第一个参数为队列名字 * 第二个参数为交换机名字 * 第三个参数为路由key */ channel.queueBind(QUEUE_NAME,Producer.EXCHANGE_NAME,"q1"); channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); } }); } }
-
消费者2
package com.mq.rabbit.routting; import com.mq.rabbit.util.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer2 { public static final String QUEUE_NAME = "consumer2"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.queueBind(QUEUE_NAME,Producer.EXCHANGE_NAME,"q1"); channel.queueBind(QUEUE_NAME,Producer.EXCHANGE_NAME,"q2"); channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); } }); } }
4.4 测试
-
生产者测试
image-20200422104639726.png -
消费者测试
image-20200422104948958.png
image-20200422105006683.png
效果满足我们想要的,这就是MQ中的路由模型。
5.Topics模型
MQ中的Top模型其实也是属于发布/订阅中模型的一种,只不过交换机模型换成了 Topic
5.1 介绍
大体意思如下:
路由key由一个或者多个参数组成,如果是多个单词必须以 . 号隔开 例如:category.update
Topic
类型的交换机与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。只不过Topic
类型Exchange
可以让队列在绑定Routing key
的时候使用通配符
*
只能匹配一个单词#
匹配一个或者多个单词- 例如:
product.*
product.insert
能够匹配到,product.insert.dd
就匹配不到- product.#
product.insert
,product.insert.dd
都能匹配到
5.2 模型图
我们将发送所有描述动物的消息。消息将用路由key发送,路由key由三个字(两个点)组成。路由key中的第一个词将描述一种快速性、第二种颜色和第三种a物种:“<celerity><colour><species>”。
我们创建了三个绑定:Q1用绑定键*.orange.*
绑定,Q2用*.rabbit
和lazy.#
绑定。
这些绑定可以概括为:
Q1匹配的橙色动物
Q2匹配兔子和懒惰动物
例如:
quick.orange.rabbit
就会 被 Q2队列匹配到
lazy.orange.elephant
就会被 Q1 Q2队列匹配到
lazy.pink.rabbit
就会被Q2队列匹配到
quick.brown.fox
不会被任何队列匹配到
5.3 代码实现
-
生产者
使用topic 类型交换机,路由key为:
lazy.pink.rabbit
lazy.orange.elephant
quick.orange.rabbit``package com.mq.rabbit.topic; import com.mq.rabbit.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.springframework.amqp.core.ExchangeTypes; public class Producer { public static final String EXCHNAGE_NAME = "hello_exchange_topic"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHNAGE_NAME, ExchangeTypes.TOPIC); String msg = "hello lazy.pink.rabbit"; channel.basicPublish(EXCHNAGE_NAME,"lazy.pink.rabbit",null,msg.getBytes()); msg = "hello lazy.orange.elephant"; channel.basicPublish(EXCHNAGE_NAME,"lazy.orange.elephant",null,msg.getBytes()); msg = "hello quick.orange.rabbit"; channel.basicPublish(EXCHNAGE_NAME,"quick.orange.rabbit",null,msg.getBytes()); channel.close(); connection.close(); } }
-
消费者1
package com.mq.rabbit.topic; import com.mq.rabbit.util.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer1 { public static final String QUEUE_NAME = "Q1"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.queueBind(QUEUE_NAME,Producer.EXCHNAGE_NAME,"*.orange.*"); channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1:" + new String(body)); } }); } }
-
消费者2
package com.mq.rabbit.topic; import com.mq.rabbit.util.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer2 { public static final String QUEUE_NAME = "Q2"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.queueBind(QUEUE_NAME,Producer.EXCHNAGE_NAME,"*.*.rabbit"); channel.queueBind(QUEUE_NAME,Producer.EXCHNAGE_NAME,"lazy.#"); channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2:" + new String(body)); } }); } }
-
测试
-
测试生产者
image-20200422153000256.png -
测试消费者
image-20200422153240392.png
image-20200422153256104.png
以上就是我们的topic类型
-
6.消息堆积&丢失问题
6.1 堆积
如何避免消息对接问题:
- 在消费者一方启用多线程去消费
- 使用work模型去分担消息,注意,发布/订阅模型可以和work模型结合使用
6.2 丢失
如何避免消息丢失
- 消费端使用手动ACK机制(如何消费者在消费消息之前,MQ就挂掉,那么这个操作无用)
- 将消息持久化
消息要想持久化,那么前提条件就是 交换机,队列都需要持久化
6.2.1 交换机持久化
6.2.2 队列持久化
7. RPC模型
rpc 模型其实是属于远程调用,不属于消息模型,所以这里不说明,如果对rpc感兴趣,可以去了解一下dubbo