四种交换机类型
fanout 它会将发送到该交换机的消息路由到所有与该交互机存在binding的队列中,无视RoutingKey
direct 把消费发送到RoutingKey与BindingKey完全匹配的队列中。
-
topic 类似direct交换机,但是BindingKey可以是模糊匹配的规则
* 代表任意个单词(可为零)
# 代表只有一个单词
headers 性能较差,一般不使用
重名交换机或队列
生产者和消费者都可以声明一个队列。如果声明一个已存在的交换机或队列,只要声明的参数完全匹配现存的交换机或队列,则RabbitMQ什么都不做,并成功返回。否则,将会抛出异常。
声明交换机
声明交换机时有两个参数需要注意
durable: 是否持久化。 持久化可以讲交换机保存在磁盘中,服务器重启后不会丢失信息。
autoDelete: 是否自动删除。自动删除的前提是至少有一个交换机或对队列与这个交换机绑定,之后所有的交换机和队列与这个交换机解绑。
声明队列
durable: 是否持久化。 持久化可以讲交换机保存在磁盘中,服务器重启后不会丢失信息。
exclusive: 是否排他。如果一个队列被声明为排他队列,则该队列仅对首次声明它的连接可见,并且在连接断开的时候自动删除。这里需要注意三点:排他队列是基于连接(connection)可见的,同一个连接的不同信道(channel)可以访问同一连接创建的排他队列。“首次” 是指如果体格连接已经创建了一个排他队列,则其他连接不允许再创建同名的排他队列。即使该排他队列是持久化的(durable=true),一点连接关闭或客户端退出,该队列就会被删除。
autoDelete: 是否自动删除。自动删除的前提是:至少有一个消费者连接到该队列,之后所有的消费者与这个队列的连接都断开时,才会自动删除。
queueDeclarePassive
Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;
这个方法用来检测队列是否已经存在。如果存在则正常返回,不存在则抛异常:404 channel exception 同时channel也会被关闭。
未确认的消息
RabbitMQ不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者是否断开连接,这么设计的原因是RabbitMQ需要消费者话很久的时间来处理这条消息。
mandatory参数
当mandatory参数设置为true的时候,若交换机无法根据自己的类型和路由键找到合适的队列,那么RabbitMQ会调用basic.return命令将该消息返回给生产者。当mandatory设置为false时,该消息将会被直接丢弃。
在springboot整合RabbitMQ的starter中,对应的方法为RabbitTempalte.ReturnCallback接口的returnedMessage方法。
注意:当找不到队里时才会调用这个returnedMessage方法。当交换机找不到时,会直接到RabbitTempalte.ConfirmCallback的confirm方法
消息的过期时间
针对队列中的所有消息设置过期时间
@Bean
public Queue queue() {
Map<String,Object> args = new HashMap<>();
args.put("x-message-ttl","100");// 设置队列中消息的过期时间,单位毫秒
return new Queue(QUEUE_1, true,false,false,args);
}
针对单个消息设置过期时间
public void sendDirectAck(){
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
MessageProperties messageProperties = new MessageProperties();
// 设置消息的过期时间
messageProperties.setExpiration("1000"); //设置每条消息具体的过期时间 单位毫秒
Message message = new Message(user1.toString().getBytes(),messageProperties);// 不要在意字节数组的细节
rabbitTemplate.convertAndSend(RabbitmqConfig.TOPIC_EXCHANGE,RabbitmqConfig.ROUTING_KEY1,message,correlationData);
}
死信队列
DLX, Dead-Letter-Exchange。利用DLX, 当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。消息变成死信一向有一下几种情况:
- 消息被拒绝(basic.reject/ basic.nack)并且requeue=false
- 消息TTL过期
- 队列达到最大长度
DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性,当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列,可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ 3.0以前支持的immediate参数的功能。
核心代码实现:通过在queueDeclare方法中加入“x-dead-letter-exchange”实现。
channel.exchangeDeclare("some.exchange.name", "direct");
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
channel.queueDeclare("myqueue", false, false, false, args);
你也可以为这个DLX指定routing key,如果没有特殊指定,则使用原队列的routing key
args.put("x-dead-letter-routing-key", "some-routing-key");
延迟队列
延迟队列存储的对象肯定是对应的延迟消息,所谓”延迟消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。
场景一:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行一场处理。这是就可以使用延迟队列将订单信息发送到延迟队列。
场景二:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延迟队列,当指令设定的时间到了再将指令推送到只能设备。
消息的持久化
// 设置消息持久化 (默认是持久化的)
messageProperties.setDeliveryMode(MessageProperties.DEFAULT_DELIVERY_MODE);
如果队列没有设置持久化,那么及时消息设置了持久化,重启后消息依旧会消失
持久化会严重影响rabbitmq的性能
消息的推拉模式
在rabbitmq中支持两种消息处理的模式,一种是订阅模式(也叫push模式),由broker主动将消息推送给订阅队列的消费者;另一种是检索模式(也叫pull模式),需要消费者调用channel.basicGet方法,主动从队列中拉取消息。
订阅模式(push)
订阅模式接收消息是最有效的一种消息处理方式,当消息到达broker时,broker会自动将消息投递给匹配的消费者,而不需要消费端手动去拉取。在同一个通道channel中,每个消费者Consumer都有着不同的consumer-tag标识,这个标识可以是客户端指定,也可以由broker服务端自动生成(如果客户端手动指定了,则以客户端的为准,如果没有指定则由服务端自动生成)。
检索模式(pull)
通过使用Channel.basicGet显示拉取消息,返回的数据类型是GetResponse实例。
消息分发
多个消费者订阅同一个队列,这时候队列中的消息会采用轮询(Round-Robin)的方式发送给消费者,即每个消息只会有一些消费者来处理,避免消息的重复消费。
但这种模式有一个潜在的问题,就说如果消费者A处理消息的速度很快,而B处理得很慢。采用轮询分发的时候有可能出现A消费者处理空闲,而B消费者却出现消息堆积的问题。
此时可以在消费者端调用channel.basicQos(...)方法指定每个消费者未确认的消息的数量(只在推(push)模式下有效)
此方法有三个重载
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
void basicQos(int prefetchCount, boolean global) throws IOException;
void basicQos(int prefetchCount) throws IOException;
关于 global和prefetch的含义,参考官方文档Consumer Prefetch
Unfortunately the channel is not the ideal scope for this - since a single channel may consume from multiple queues, the channel and the queue(s) need to coordinate with each other for every message sent to ensure they don't go over the limit. This is slow on a single machine, and very slow when consuming across a cluster.
Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);
当单个channel有多个消费者时,协调两个消费者不超过limit会严重影响性能。
Therefore RabbitMQ redefines the meaning of the global flag in the basic.qos method:
global | Meaning of prefetch_count in AMQP 0-9-1 | Meaning of prefetch_count in RabbitMQ |
---|---|---|
false | shared across all consumers on the channel | applied separately to each new consumer on the channel |
true | shared across all consumers on the connection | shared across all consumers on the channel |
可以设置global=false,是每个消费者独享消息数量的限制(默认即为false)