本文所有内容均个人从RabbitMQ官网教程中翻译,若图片文字的引用有任何侵权的地方,联系我,我会立马删除。
This article was translated from RabbitMQ Official Tutorials by myself,and if this article and the images in this article have any infringement,please contact to me, and i will delete them.
话题
(使用php-amqplib)
在上一个教程我们升级了我们的日志系统。与仅仅能够进行虚拟广播的fanout
类型交换机不同,我们使用了一个direct
类型交换机,并获得了可以选择性地接收日志的能力。
尽管使用direct
交换机提升我们的系统,但它仍然是由限制的——它不能够在多个标准下进行路由。
在我们的日志系统中,我们可能希望不仅仅是根据日志等级来订阅日志,也可以根据发送日志的源。你可能已经从可以同时根据日志的严重性(info
/warn
/crit
....)与设施(auth
/cron
/kern
...)来路由日志的Unixsyslog
工具知道这一概念。
这会带给我们非常多的灵活性——我们可能打算仅仅监听来自cron
的严重错误日志而又监听来自kern
的所有日志。
为了在我们的日志系统实现这一公功能,我们需要学习一个更加复杂的topic
交换机
Topic交换机
消息发送到一个topic
交换机不能随意设置routing_key
——它必须是一系列以点分隔的词。这些词可以是随意的,但它们通常指定了消息的一些特性。有一些有效的路由键(routing key)的例子:'stock.usd.nyse'
,nyse.vmw
,quick.orange.rabbit
。他们可以由很多你喜欢的路由键组成,最多不能超过255个字节(bytes)。
绑定键(binding key)也必须是同样的格式。topic
交换机的逻辑与direct
交换机是相似的——一条携带特定路由键(routing key)发送的消息将会被分发到所有以与之匹配的绑定键(binding key)绑定的Queue(队列)。然而,关于绑定键,有两个需要特别注意的情况:
- *(星号)可以代替(任意)一个词。
-
(井号)可以代替零个或者更多的词
这是一个最简单的解释例子:
在这一个例子中,我们准备发送一些描述动物的消息。这些消息被发送时将会带有由三个单词组成(两个点)的路由键(routing key)。路由键(routing key)的第一个词将会描述速度,第二个描述颜色,第三个描述物种:'<speed>.<colour>.<species>'
。
我们创建了三个绑定:Q1
队列以'*.orange.*
路由键绑定,Q2
以'*.*.rabbit
与lazy.#
(两个路由键)绑定。
这些绑定可以概括为:
Q1
对所有orange(橙色)的动物感兴趣。
Q2
想监听rabbit(兔子)的所有消息与lazy(慢吞吞的)动物的所有消息。
一条路由键(routing key)被设置为'quick.orange.rabbit
的消息将会被交付到这两个队列。路由键为'lazy.orange.elephant'
的消息也会被交付到这两个队列。而另一种路由键为'quick.orange.fox'
的消息只会去到第一个(Q1
)队列,以及'lazy.brown.fox'
只会发送到第二个(Q2
)队列。路由键为lazy.pink.rabbit
的消息只会被发送到第二个(Q2
)队列一次,尽管它符合两个绑定。'quick.brown.fox'
不匹配任何一个绑定,所以它会被丢弃。
如果我们打破我们的规定,发送了一条只带有一个词或者带有四个词的路由键的消息,例如'orange'
或者'quick.orange.male.rabbit'
,会发生什么呢?这些消息将不会匹配到任何绑定上并且将会被丢失。
在另一方面,尽管'lazy.orange.male.rabbit'
拥有四个词,它将会匹配最后一个绑定(lazy.#
),并将会发送到第二个队列。
话题交换机(Topic Exchange)
话题交换机是十分强大的,同时它能表现得想其他的交换机一样。
当一个Queue(队列)以"#"
(井号)绑定键(binding key)绑定——它将会忽视路由键(routing key)去接收所有消息就像是一个fanout
类型的交换机一样。当特殊字符
"*"
(星号)与"#"
(井号)没有被用在绑定(键)上,这时候topic
交换机由表现得像direct
交换机一样。
把他们一起运行
我们准备在我们的日志系统中使用一个topic
交换机。我们将从一个工作假设开始,即日志的路由键将有两个单词:"<facility>.<severity>"
。
代码与上一个教程的十分相似,
emit_log_topic.php
的代码如下:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// 声明一个topic交换机
$channel->exchange_declare('topic_logs', 'topic', false, false, false);
// 从命令行获取路由键
$routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';
$data = implode(' ', array_slice($argv, 2));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data);
// 携带路由键发送到topic_logs交换机
$channel->basic_publish($msg, 'topic_logs', $routing_key);
echo " [x] Sent ",$routing_key,':',$data," \n";
$channel->close();
$connection->close();
?>
receive_logs_topic.php
的代码如下:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// 声明topic交换机
$channel->exchange_declare('topic_logs', 'topic', false, false, false);
// 获取非持久化队列
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$binding_keys = array_slice($argv, 1);
if( empty($binding_keys )) {
file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
exit(1);
}
// 绑定多个绑定键到队列
foreach($binding_keys as $binding_key) {
$channel->queue_bind($queue_name, 'topic_logs', $binding_key);
}
echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
$callback = function($msg){
echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
?>
如果想接收所有日志:
php receive_logs_topic.php "#"
接收"kern"设备的所有日志:
php receive_logs_topice.php "kern.*"
或者你只想监听"critical"
的日志:
php receive_logs_topic.php "*.critical"
你可以创建多个绑定:
php receive_logs_topic.php "kern.*" "*.critical"
以及,发送一个带有"kern.critical"
路由键的日志请输入:
php emit_log_topic.php "kern.critical" "A critical kernel error"
用这些程序愉快地玩耍吧。要主意的是,这些代码是没有对路由或绑定键做任何的假设,你可能希望使用两个两个以上的路由键参数。