RabbitMq:交换机四种类型详解

image.png

进入管理页面后,可以看到Connection(连接)、Channel(信道)、Exchange(交换机)、Queue(队列)、Virtual host(虚拟主机)这几个核心概念,这就是本篇要讲的内容。

介绍概念之前,有这么一张图,展示的是RabbitMq的工作模型,根据这行图来理解:


image.png

其中,中间的Broker表示RabbitMq服务,每个Broker里面至少有一个Virtual host虚拟主机,每个虚拟主机中有自己的Exchange交换机、Queue队列以及Exchange交换机与Queue队列之间的绑定关系Binding。producer(生产者)和consumer(消费者)通过与Broker建立Connection来保持连接,然后在Connection的基础上建立若干Channel信道,用来发送与接收消息。

Connection(连接)

每个producer(生产者)或者consumer(消费者)要通过RabbitMQ发送与消费消息,首先就要与RabbitMQ建立连接,这个连接就是Connection。Connection是一个TCP长连接。

Channel(信道)

Channel是在Connection的基础上建立的虚拟连接,RabbitMQ中大部分的操作都是使用Channel完成的,比如:声明Queue、声明Exchange、发布消息、消费消息等。
看到此处,你是否有这样一个疑问:既然已经有了Connection,我们完全可以使用Connection完成Channel的工作,为什么还要引入Channel这样一个虚拟连接的概念呢?因为现在的程序都是支持多线程的,如果没有Channel,那么每个线程在访问RabbitMQ时都要建立一个Connection这样的TCP连接,对于操作系统来说,建立和销毁TCP连接是非常大的开销,在系统访问流量高峰时,会严重影响系统性能。

Channel就是为了解决这种问题,通常情况下,每个线程创建单独的Channel进行通讯,每个Channel都有自己的channel id帮助Broker和客户端识别Channel,所以Channel之间是完全隔离的。

Connection与Channel之间的关系可以比作光纤电缆,如果把Connection比作一条光纤电缆,那么Channel就相当于是电缆中的一束光纤。

Virtual host(虚拟主机)

Virtual host是一个虚拟主机的概念,一个Broker中可以有多个Virtual host,每个Virtual host都有一套自己的Exchange和Queue,同一个Virtual host中的Exchange和Queue不能重名,不同的Virtual host中的Exchange和Queue名字可以一样。这样,不同的用户在访问同一个RabbitMQ Broker时,可以创建自己单独的Virtual host,然后在自己的Virtual host中创建Exchange和Queue,很好地做到了不同用户之间相互隔离的效果。


image.png
Queue(队列)

Queue是一个用来存放消息的队列,生产者发送的消息会被放到Queue中,消费者消费消息时也是从Queue中取走消息。
比如要发布游戏补丁到各个用户的手机app里,服务器就可以将补丁发到队列,等用户打开app后,app自动到队列拉取补丁下载,所以这个操作可以是异步的。

Exchange(交换机)

Exchange是一个比较重要的概念,它是消息到达RabbitMQ的第一站,主要负责根据不同的分发规则将消息分发到不同的Queue,供订阅了相关Queue的消费者消费到指定的消息。那Exchange有哪些分发消息的规则呢?这就要说到Exchange的4种类型了:direct、fanout、topic、headers。

在介绍这4种类型的Exchange之前,我们先来了解一下另外一个比较重要的概念:Routing key,翻译成中文就是路由键。当我们创建好Exchange和Queue之后,需要使用Routing key(通常叫作Binding key)将它们绑定起来,producer在向Exchange发送一条消息的时候,必须指定一个Routing key,然后Exchange接收到这条消息之后,会解析Routing key,然后根据Exchange和Queue的绑定规则,将消息分发到符合规则的Queue中。
Tips:交换机4种规则中,headers不是通过Routing key来匹配,而是通过Arguments来绑定,将在下面介绍。


image.png

接下来,我们根据上面的流程再来详细介绍下4种类型的Exchange。

1、direct

direct类型的Exchange会将消息转发到指定Routing key的Queue上,在direct中,Routing Key的解析规则为精确匹配。也就是只有当Producer发送的消息的Routing Key与某个Binding Key相等时,消息才会被分发到对应的Queue上。


image.png

创建三个队列

//订阅端代码
string exchangeName = "CzwTestExchange";
string queue1 = "CzwTestQueue1";
string queue2 = "CzwTestQueue2";
string queue3 = "CzwTestQueue3";
string queueRk1 = "key1";
string queueRk2 = "key2";
//创建唯一RabbitMq通道
var rabbitMqFactory = new ConnectionFactory()
{
    HostName = hostname,
    UserName = username,
    Password = password,
    Port = 5672,
    VirtualHost = virtualHost
};
// 创建连接
IConnection conn = rabbitMqFactory.CreateConnection(); 
// 连接的基础上可以创建多个信道
var channel = conn.CreateModel();
// 声明一个交换机
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
// 声明三个队列
channel.QueueDeclare(queue1, durable: true, autoDelete: false, exclusive: false, arguments: null);
channel.QueueDeclare(queue2, durable: true, autoDelete: false, exclusive: false, arguments: null);
channel.QueueDeclare(queue3, durable: true, autoDelete: false, exclusive: false, arguments: null);
// 为队列绑定不同的routing key
channel.QueueBind(queue1,exchangeName,queueRk1);
channel.QueueBind(queue2, exchangeName, queueRk1);
channel.QueueBind(queue3, exchangeName, queueRk2);
//创建信道的消费者
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (obj, e) =>
{
    var routingKey = e.RoutingKey;
    Console.WriteLine($"routingKey:{routingKey}");
};
channel.BasicConsume(queue1, autoAck: false, consumer: consumer);
channel.BasicConsume(queue2, autoAck: false, consumer: consumer);
channel.BasicConsume(queue3, autoAck: false, consumer: consumer);

现在有一个direct类型的Exchange,它下面绑定了三个Queue,Binding key分别是key1/key1/key2:


image.png
image.png
//发布端代码
string exchangeName = "CzwTestExchange";
//创建唯一RabbitMq通道
var rabbitMqFactory = new ConnectionFactory()
{
    HostName = hostname,
    UserName = username,
    Password = password,
    Port = 5672,
    VirtualHost = virtualHost
};
// 创建连接
IConnection conn = rabbitMqFactory.CreateConnection();
// 连接的基础上可以创建多个信道
var channel = conn.CreateModel();
// 声明一个交换机
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);

向该Exchange中发送一条消息,消息的Routing key是 key1

// 推送消息到交换机
// 消息对象json化
string json = JsonConvert.SerializeObject("新消息");
var msgBody = Encoding.UTF8.GetBytes(json);
channel.BasicPublish(exchange: exchangeName, routingKey: "key1", basicProperties: null, body: msgBody);

按照规则分析,CzwTestExchange交换机中绑定的三个队列中,CzwTestQueue1和CzwTestQueue2的Routing Key为 key1 ,所以这条消息应该被路由到CzwTestQueue1和CzwTestQueue2两个队列。消息发送成功之后,我们去Queues中查看,发现CzwTestQueue1和CzwTestQueue2这两个队列都接收到了一条消息。


image.png

进入这两个个队列,通过getMessage取出消息查看,确实是我们刚才手动发送的那条消息。


image.png

image.png

因此,如果再次发一条Routing Key 为 key2 的消息

// 推送消息到交换机
// 消息对象json化
string json = JsonConvert.SerializeObject("新消息2");
var msgBody = Encoding.UTF8.GetBytes(json);
//channel.BasicPublish(exchangeName, string.Empty, null, msgBody);
channel.BasicPublish(exchange: exchangeName, routingKey: "key2", basicProperties: null, body: msgBody);

根据规则,CzwTestQueue3的Banding Key 为key2,所以只有该队列收到了消息
发送成功后,查看CzwTestQueue3


image.png

再点开CzwTestQueue3 看看接收到的消息内容


image.png

所以,direct类型的Exchange在分发消息时,必须保证producer发送消息的Routing key与Exchange和Queue绑定的Binding key相等才可以。如果同一交换机里的多个队列中有多个队列有相同的Routing Key,那这些相同的队列会同时收到消息。
2、fanout

fanout是扇形的意思,该类型通常叫作广播类型。fanout类型的Exchange不处理Routing key,而是会将发送给它的消息路由到所有与它绑定的Queue上。


image.png

上面的订阅端代码和发布端代码都要将Direct换成Fanout,创建交换机和队列并绑定,如下

image.png

image.png

现在我们有一个fanout类型的Exchange,它下面绑定了三个Queue,Binding key分别是key1/key1/key2:
然后我们向该Exchange中发送一条消息,消息的Routing key随便填一个值abc:

// 声明一个交换机
channel.ExchangeDeclare(exchangeName, **ExchangeType.Fanout**, durable: true, autoDelete: false, arguments: null);
// 推送消息到交换机
// 消息对象json化
string json = JsonConvert.SerializeObject("新消息2");
var msgBody = Encoding.UTF8.GetBytes(json);
//channel.BasicPublish(exchangeName, string.Empty, null, msgBody);
channel.BasicPublish(exchange: exchangeName, routingKey: "abc", basicProperties: null, body: msgBody);

按照规则分析,这条消息应该被路由到所有与该Exchange绑定的Queue,即三个Queue都应该会受到消息。消息发送成功之后,我们去Queues中查看,发现确实每个QUEUE都接收到了一条消息。


image.png

进入这三个QUEUE,通过getMessage取出消息查看,确实是我们刚才代码发送的那条消息。


image.png

image.png

image.png

再发送一条Routing Key为 key1 的消息

// 推送消息到交换机
// 消息对象json化
string json = JsonConvert.SerializeObject("新消息2");
var msgBody = Encoding.UTF8.GetBytes(json);
//channel.BasicPublish(exchangeName, string.Empty, null, msgBody);
channel.BasicPublish(exchange: exchangeName, routingKey: "key1", basicProperties: null, body: msgBody);

发送消息成功后,查看队列


image.png

所有队列都收到了消息,点进去查看一下队列的消息


image.png

image.png

image.png

所以,fanout类型的Exchange不管Routing key是什么,它都会将接收到的消息分发给所有与自己绑定了的Queue上。

3、topic

topic的意思是主题,topic类型的Exchange会根据通配符对Routing key进行匹配,只要Routing key满足某个通配符的条件,就会被路由到对应的Queue上。通配符的匹配规则如下:

● Routing key必须是一串字符串,每个单词用“.”分隔;

● 符号“#”表示匹配一个或多个单词;

● 符号“*”表示匹配一个单词。

例如:“*.123” 能够匹配到 “abc.123”,但匹配不到 “abc.def.123”;“#.123” 既能够匹配到 “abc.123”,也能匹配到 “abc.def.123”。

创建Topic类型的交换机并绑定新的四个队列

//订阅端代码
string exchangeName = "CzwTestExchange";
string queue1 = "CzwTestQueue1";
string queue2 = "CzwTestQueue2";
string queue3 = "CzwTestQueue3";
string queue4 = "CzwTestQueue4";
string queueRk1 = "*.ORDER";
string queueRk2 = "GOODS.*";
string queueRk3 = "#.STOCK";
string queueRk4 = "USER.#";
//创建唯一RabbitMq通道
var rabbitMqFactory = new ConnectionFactory()
{
    HostName = hostname,
    UserName = username,
    Password = password,
    Port = 5672,
    VirtualHost = virtualHost
};
// 创建连接
IConnection conn = rabbitMqFactory.CreateConnection(); 
// 连接的基础上可以创建多个信道
var channel = conn.CreateModel();
// 声明一个交换机
channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, durable: true, autoDelete: false, arguments: null);
// 声明三个队列
channel.QueueDeclare(queue1, durable: true, autoDelete: false, exclusive: false, arguments: null);
channel.QueueDeclare(queue2, durable: true, autoDelete: false, exclusive: false, arguments: null);
channel.QueueDeclare(queue3, durable: true, autoDelete: false, exclusive: false, arguments: null);
channel.QueueDeclare(queue4, durable: true, autoDelete: false, exclusive: false, arguments: null);
// 为队列绑定不同的routing key
channel.QueueBind(queue1,exchangeName,queueRk1);
channel.QueueBind(queue2, exchangeName, queueRk2);
channel.QueueBind(queue3, exchangeName, queueRk3);
channel.QueueBind(queue4, exchangeName, queueRk4);
//创建信道的消费者
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (obj, e) =>
{
    var routingKey = e.RoutingKey;
    Console.WriteLine($"routingKey:{routingKey}");
};
channel.BasicConsume(queue1, autoAck: false, consumer: consumer);
channel.BasicConsume(queue2, autoAck: false, consumer: consumer);
channel.BasicConsume(queue3, autoAck: false, consumer: consumer);
channel.BasicConsume(queue4, autoAck: false, consumer: consumer);

image.png

image.png

现在我们有一个topic类型的Exchange,它下面绑定了4个Queue,Binding key分别是.ORDER、GOODS.、#.STOCK、USER.#。

然后我们向该Exchange中发送一条消息,消息的Routing key为:USER.ABC.ORDER。

// 声明一个交换机
channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, durable: true, autoDelete: false, arguments: null);
// 推送消息到交换机
// 消息对象json化
string json = JsonConvert.SerializeObject("新消息");
var msgBody = Encoding.UTF8.GetBytes(json);
//channel.BasicPublish(exchangeName, string.Empty, null, msgBody);
channel.BasicPublish(exchange: exchangeName, routingKey: "USER.ABC.ORDER", basicProperties: null, body: msgBody);

按照规则分析,USER.ABC.ORDER这个Routing key只可以匹配到 “USER.#” ,所以,这条消息应该被路由到CzwTestExchange这个Queue中。消息发送成功之后,我们去Queues中查看,发现结果符合我们的预期。


image.png

进入这个QUEUE,通过getMessage取出消息查看,确实是我们刚才代码发送的那条消息。


image.png
4、headers

headers模式,Exchange与Queue之间的绑定不再通过Binding key绑定,而是通过Arguments绑定。
headers 匹配规则:any 、all
any: 只要在发布消息时携带的有一对键值对headers满足队列定义的多个参数的其中一个就能匹配上,注意这里是键值对的完全匹配,只匹配到键了,值却不一样是不行的;
all:在发布消息时携带的所有Entry必须和绑定在队列上的所有Entry完全匹配
缺点:Headers 类型的交换器性能会很差

**情况一:arguments中加x-match = all

string exchangeName = "CzwTestExchange";
string queue1 = "CzwTestQueue1";
string queue2 = "CzwTestQueue2";
string queue3 = "CzwTestQueue3";
//创建唯一RabbitMq通道
var rabbitMqFactory = new ConnectionFactory()
{
    HostName = hostname,
    UserName = username,
    Password = password,
    Port = 5672,
    VirtualHost = virtualHost
};
// 创建连接
IConnection conn = rabbitMqFactory.CreateConnection(); 
// 连接的基础上可以创建多个信道
var channel = conn.CreateModel();
// 声明一个交换机
channel.ExchangeDeclare(exchangeName, ExchangeType.Headers, durable: true, autoDelete: false, arguments: null);
// 声明三个队列
channel.QueueDeclare(queue1, durable: true, autoDelete: false, exclusive: false, arguments: null);
channel.QueueDeclare(queue2, durable: true, autoDelete: false, exclusive: false, arguments: null);
channel.QueueDeclare(queue3, durable: true, autoDelete: false, exclusive: false, arguments: null);
var dic1 = new Dictionary<string, object>
{
    { "x-match","all" },
    { "Operate1","finish"}
};
var dic2 = new Dictionary<string, object>
{
    { "x-match","all" },
    { "Operate2","delete"}
};
var dic3 = new Dictionary<string, object>
{
    { "x-match","all" },
    { "Operate1","finish"},
    { "Operate2","delete"}
};
// 为队列绑定不同的routing key
channel.QueueBind(queue1,exchangeName,routingKey: string.Empty, arguments: dic1);
channel.QueueBind(queue2, exchangeName, routingKey: string.Empty, arguments: dic2);
channel.QueueBind(queue3, exchangeName, routingKey: string.Empty, arguments: dic3);
//创建信道的消费者
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (obj, e) =>
{
    var routingKey = e.RoutingKey;
    Console.WriteLine($"routingKey:{routingKey}");
};
channel.BasicConsume(queue1, autoAck: false, consumer: consumer);
channel.BasicConsume(queue2, autoAck: false, consumer: consumer);
channel.BasicConsume(queue3, autoAck: false, consumer: consumer);


此时我们在交换机中创建了三个绑定了不同arguements的队列,如下


image.png

发布端代码

string exchangeName = "CzwTestExchange";
//创建唯一RabbitMq通道
var rabbitMqFactory = new ConnectionFactory()
{
    HostName = hostname,
    UserName = username,
    Password = password,
    Port = 5672,
    VirtualHost = virtualHost
};
// 创建连接
IConnection conn = rabbitMqFactory.CreateConnection();
// 连接的基础上可以创建多个信道
var channel = conn.CreateModel();

现在发送一条Operate1:finish的消息

// 声明一个交换机
channel.ExchangeDeclare(exchangeName, ExchangeType.Headers, durable: true, autoDelete: false, arguments: null);
// 推送消息到交换机
// 消息对象json化
string json = JsonConvert.SerializeObject("新消息");
var msgBody = Encoding.UTF8.GetBytes(json);
//channel.BasicPublish(exchangeName, string.Empty, null, msgBody);
//设置通道属性
IBasicProperties basicProperties = channel.CreateBasicProperties();
var dicProp = new Dictionary<string, object>
{
    { "Operate1","finish"}
};
basicProperties.Headers = dicProp;
channel.BasicPublish(exchange: exchangeName, routingKey: string.Empty, basicProperties: basicProperties, body: msgBody);

根据规则


image.png

只带有参数{"Operate1","finish"}的情况下,只有CzwTestQueue1能收到消息
查看队列情况


image.png

果然只有CzwTestQueue1收到消息,点开看看消息内容


image.png

**情况二:arguments中加x-match = any
删除交换机和队列,重新按照上面的方式创建队列,将x-math 的值换成 any,如下


image.png

仍然发送一条Operate1:finish的消息
根据规则


image.png

此时CzwTestQueue1和CzwTestQueue3都应该收到消息,发送成功后查看队列


image.png

Nice~,点开get message一下


image.png

image.png

情况三:arguments中不带x-match

image.png

发送一条Operate1:finish的消息,发送成功后查看队列
image.png

只有队列1收到了,由此可知,交换机headers模式下,对于arguements不加x-match的情况,接收消息默认以x-match:all来匹配消息。
扩展
1、交换机类型为Headers模式下,一个队列还可以绑定多个arguements
image.png

此时只要发送的消息满足
{Operate1:finish}、
{Operate2:delete}、
{Operate1:finish,
Operate2:delete}
这三种情况的任一种,队列3都可以收到消息。
2、除交换机类型type外,在声明交换机时还可以附带许多其他的属性,其中最重要的几个分别是:

channel.ExchangeDeclare(exchangeName, ExchangeType.Headers, durable: true, autoDelete: false, arguments: null);

exchange (交换机名称)
durable (消息代理服务重启后,交换机是否还存在)
autoDelete (当所有与之绑定的消息队列都完成了对此交换机的使用后,删掉它)
arguments(依赖代理本身)
交换机可以有两个状态:持久(durable)、暂存(transient)。持久化的交换机会在消息代理(broker)重启后依旧存在,而暂存的交换机则不会(它们需要在代理再次上线后重新被声明)。
然而并不是所有的应用场景都需要持久化的交换机。
默认交换机
默认交换机(default exchange)实际上是一个由消息代理预先声明好的没有名字(名字为空字符串)的直连交换机(direct exchange)。它有一个特殊的属性使得它对于简单应用特别有用处:那就是每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。

举个栗子:当你声明了一个名为"search-indexing-online"的队列,AMQP代理会自动将其绑定到默认交换机上,绑定(binding)的路由键名称也是为"search-indexing-online"。因此,当携带着名为"search-indexing-online"的路由键的消息被发送到默认交换机的时候,此消息会被默认交换机路由至名为"search-indexing-online"的队列中。换句话说,默认交换机看起来貌似能够直接将消息投递给队列,尽管技术上并没有做相关的操作。

参考:https://baijiahao.baidu.com/s?id=1732891548341088166&wfr=spider&for=pc

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,042评论 6 490
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 89,996评论 2 384
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,674评论 0 345
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,340评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,404评论 5 384
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,749评论 1 289
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,902评论 3 405
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,662评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,110评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,451评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,577评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,258评论 4 328
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,848评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,726评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,952评论 1 264
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,271评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,452评论 2 348

推荐阅读更多精彩内容