RabbitMQ学习笔记

四种交换机类型

  • fanout   它会将发送到该交换机的消息路由到所有与该交互机存在binding的队列中,无视RoutingKey

  • direct  把消费发送到RoutingKey与BindingKey完全匹配的队列中。

  • topic   类似direct交换机,但是BindingKey可以是模糊匹配的规则

    * 代表任意个单词(可为零)

    # 代表只有一个单词

  • headers   性能较差,一般不使用

重名交换机或队列

生产者和消费者都可以声明一个队列。如果声明一个已存在的交换机或队列,只要声明的参数完全匹配现存的交换机或队列,则RabbitMQ什么都不做,并成功返回。否则,将会抛出异常。

声明交换机

声明交换机时有两个参数需要注意

  • durable: 是否持久化。 持久化可以讲交换机保存在磁盘中,服务器重启后不会丢失信息。

  • autoDelete: 是否自动删除。自动删除的前提是至少有一个交换机或对队列与这个交换机绑定,之后所有的交换机和队列与这个交换机解绑。

声明队列

  • durable: 是否持久化。 持久化可以讲交换机保存在磁盘中,服务器重启后不会丢失信息。

  • exclusive: 是否排他。如果一个队列被声明为排他队列,则该队列仅对首次声明它的连接可见,并且在连接断开的时候自动删除。这里需要注意三点:排他队列是基于连接(connection)可见的,同一个连接的不同信道(channel)可以访问同一连接创建的排他队列。“首次” 是指如果体格连接已经创建了一个排他队列,则其他连接不允许再创建同名的排他队列。即使该排他队列是持久化的(durable=true),一点连接关闭或客户端退出,该队列就会被删除。

  • autoDelete: 是否自动删除。自动删除的前提是:至少有一个消费者连接到该队列,之后所有的消费者与这个队列的连接都断开时,才会自动删除。

queueDeclarePassive

Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;

这个方法用来检测队列是否已经存在。如果存在则正常返回,不存在则抛异常:404 channel exception 同时channel也会被关闭。

未确认的消息

RabbitMQ不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者是否断开连接,这么设计的原因是RabbitMQ需要消费者话很久的时间来处理这条消息。

mandatory参数

当mandatory参数设置为true的时候,若交换机无法根据自己的类型和路由键找到合适的队列,那么RabbitMQ会调用basic.return命令将该消息返回给生产者。当mandatory设置为false时,该消息将会被直接丢弃。

在springboot整合RabbitMQ的starter中,对应的方法为RabbitTempalte.ReturnCallback接口的returnedMessage方法。

注意:当找不到队里时才会调用这个returnedMessage方法。当交换机找不到时,会直接到RabbitTempalte.ConfirmCallback的confirm方法

消息的过期时间

针对队列中的所有消息设置过期时间

@Bean
public Queue queue() {
    Map<String,Object> args = new HashMap<>();
    args.put("x-message-ttl","100");// 设置队列中消息的过期时间,单位毫秒
    return new Queue(QUEUE_1, true,false,false,args);
}

针对单个消息设置过期时间

public void sendDirectAck(){
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    MessageProperties messageProperties = new MessageProperties();
    // 设置消息的过期时间
    messageProperties.setExpiration("1000"); //设置每条消息具体的过期时间 单位毫秒
    Message message = new Message(user1.toString().getBytes(),messageProperties);// 不要在意字节数组的细节
    rabbitTemplate.convertAndSend(RabbitmqConfig.TOPIC_EXCHANGE,RabbitmqConfig.ROUTING_KEY1,message,correlationData);
}

死信队列

DLX, Dead-Letter-Exchange。利用DLX, 当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。消息变成死信一向有一下几种情况:

  • 消息被拒绝(basic.reject/ basic.nack)并且requeue=false
  • 消息TTL过期
  • 队列达到最大长度

DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性,当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列,可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ 3.0以前支持的immediate参数的功能。

核心代码实现:通过在queueDeclare方法中加入“x-dead-letter-exchange”实现。

channel.exchangeDeclare("some.exchange.name", "direct");

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
channel.queueDeclare("myqueue", false, false, false, args);

你也可以为这个DLX指定routing key,如果没有特殊指定,则使用原队列的routing key

args.put("x-dead-letter-routing-key", "some-routing-key");

延迟队列

延迟队列存储的对象肯定是对应的延迟消息,所谓”延迟消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。

场景一:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行一场处理。这是就可以使用延迟队列将订单信息发送到延迟队列。

场景二:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延迟队列,当指令设定的时间到了再将指令推送到只能设备。


延迟队列

消息的持久化

// 设置消息持久化 (默认是持久化的)    
messageProperties.setDeliveryMode(MessageProperties.DEFAULT_DELIVERY_MODE);

如果队列没有设置持久化,那么及时消息设置了持久化,重启后消息依旧会消失

持久化会严重影响rabbitmq的性能

消息的推拉模式

在rabbitmq中支持两种消息处理的模式,一种是订阅模式(也叫push模式),由broker主动将消息推送给订阅队列的消费者;另一种是检索模式(也叫pull模式),需要消费者调用channel.basicGet方法,主动从队列中拉取消息。

订阅模式(push)

订阅模式接收消息是最有效的一种消息处理方式,当消息到达broker时,broker会自动将消息投递给匹配的消费者,而不需要消费端手动去拉取。在同一个通道channel中,每个消费者Consumer都有着不同的consumer-tag标识,这个标识可以是客户端指定,也可以由broker服务端自动生成(如果客户端手动指定了,则以客户端的为准,如果没有指定则由服务端自动生成)。

检索模式(pull)

通过使用Channel.basicGet显示拉取消息,返回的数据类型是GetResponse实例。

消息分发

多个消费者订阅同一个队列,这时候队列中的消息会采用轮询(Round-Robin)的方式发送给消费者,即每个消息只会有一些消费者来处理,避免消息的重复消费。

但这种模式有一个潜在的问题,就说如果消费者A处理消息的速度很快,而B处理得很慢。采用轮询分发的时候有可能出现A消费者处理空闲,而B消费者却出现消息堆积的问题。

此时可以在消费者端调用channel.basicQos(...)方法指定每个消费者未确认的消息的数量(只在推(push)模式下有效

此方法有三个重载

void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

void basicQos(int prefetchCount, boolean global) throws IOException;

void basicQos(int prefetchCount) throws IOException;

关于 global和prefetch的含义,参考官方文档Consumer Prefetch

Unfortunately the channel is not the ideal scope for this - since a single channel may consume from multiple queues, the channel and the queue(s) need to coordinate with each other for every message sent to ensure they don't go over the limit. This is slow on a single machine, and very slow when consuming across a cluster.

Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);

当单个channel有多个消费者时,协调两个消费者不超过limit会严重影响性能。

Therefore RabbitMQ redefines the meaning of the global flag in the basic.qos method:

global Meaning of prefetch_count in AMQP 0-9-1 Meaning of prefetch_count in RabbitMQ
false shared across all consumers on the channel applied separately to each new consumer on the channel
true shared across all consumers on the connection shared across all consumers on the channel

可以设置global=false,是每个消费者独享消息数量的限制(默认即为false)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,386评论 6 479
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,939评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,851评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,953评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,971评论 5 369
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,784评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,126评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,765评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,148评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,744评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,858评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,479评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,080评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,053评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,278评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,245评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,590评论 2 343

推荐阅读更多精彩内容

  • RabbitMQ 简介 MQ 消息队列,上承生产者,下接消费者。从生产者侧获取消息,然后将消息转发给消费者。由此可...
    2205阅读 3,482评论 1 11
  • 什么叫消息队列 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂...
    lijun_m阅读 1,335评论 0 1
  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,343评论 2 34
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981阅读 15,869评论 2 11
  • 1.vhost(虚拟主机) 虚拟主机相当于一个隔离的空间,多个虚拟主机可以对不同的用户,不同的作用分割开来 2.P...
    MR_Hanjc阅读 767评论 0 0