rabbitmq 简介
RabbitMQ 是一个用 erlang 开发的 AMQP(Advanced Message Queue)的开源实现,AMQP(高级消息队列协议)是一个网络协议。它支持符合要求的客户端应用和消息中间代理之间进行通讯。
AMQP 模型简介
消息(message)被发布者(publisher)发送给交换机(exchange),交换机常常被比喻成邮局或者邮箱。然后交换机将收到的消息根据路由规则分发给绑定的队列(queue)。最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。
AMQP 是一个可编的程协议
AMQP 是一个可编程协议,某种意义上说 AMQP 的实体和路由规则是由应用本身定义的,而不是由消息代理定义。包括像声明队列和交换机,定义他们之间的绑定,订阅队列等等关于协议本身的操作。
这虽然能让开发人员自由发挥,但也需要他们注意潜在的定义冲突。当然这在实践中很少会发生,如果发生,会以配置错误(misconfiguration)的形式表现出来。
应用程序(Applications)声明AMQP实体,定义需要的路由方案,或者删除不再需要的AMQP实体。
Exchange
交换机是用来发送消息的AMQP实体。交换机拿到一个消息之后将它路由给一个或零个队列。它使用哪种路由算法是由交换机类型和被称作绑定(bindings)的规则所决定的。AMQP 的代理提供了四种交换机。
Name(交换机类型) | Default pre-declared names(预声明的默认名称) |
---|---|
Direct exchange(直连交换机) | (Empty string) and amq.direct |
Fanout exchange(扇型交换机) | amq.fanout |
Topic exchange(主题交换机) | amq.topic |
Headers exchange(头交换机) | amq.match (and amq.headers in RabbitMQ) |
下列示例代码可参考官方文档:https://www.rabbitmq.com/tutorials/tutorial-one-python.html
默认 exchanage
exchanage 可以使用空字符串代替,消息会根据指定的 routing_key
分发到与 routing_key
同名的队列。
消息发送端 python 代码:
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = "Hello World!"
count = 1
while True:
channel.basic_publish(exchange='',
routing_key='task_queue',
body=str(count) + message,
properties=pika.BasicProperties(
delivery_mode=2
))
count += 1
print("[x] Send %d %r" % (count, message))
time.sleep(1)
消息接收端 python 代码:
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print('[*] Waiting for message. To exit press CTRL+C')
def callback(ch, method, properties, body):
print("[x] Worker1 Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
time.sleep(5)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()
Direct exchange(直连交换机)
消息发送到与 exchange 绑定的 queue 上,且 routing_key
必须精确匹配。
发布端 python 代码:
import pika
import time
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 定义一个 Direct 类型的 exchanage
channel.exchange_declare(exchange='direct_logs',
exchange_type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
# 发送消息给 exchange,如果 queue 不存在消息会丢失
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print ("[x] Sent %r:%r" % (severity, message))
connection.close()
接受端 python 代码:
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
exchange_type='direct')
channel.queue_declare(queue='aaaa', durable=True)
severities = sys.argv[1:]
if not severities:
sys.exit(1)
for severity in severities:
# exchange 跟 queue 绑定,且 routing_key 必须精确匹配才能接收
channel.queue_bind(exchange='direct_logs',
queue='aaaa',
routing_key=severity)
print (' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print (" [x] %r:%r" % (method.routing_key, body,))
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(callback,
queue='aaaa')
channel.start_consuming()
Fanout exchange(扇型交换机)
将消息发送给绑定到 exchange 上的所有 queue。就是发布/订阅模式。
发布端 python 代码:
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 定义一个 faout 类型的 exchanage
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
count = 1
message = 'Hello World!'
while True:
# faout 会忽略 routing_key,所以这里为空
channel.basic_publish(
exchange='logs',
routing_key='',
body=str(count) + message
)
count += 1
print("[x] Sent %d %r" %(count, message))
time.sleep(3)
接收端 python 代码:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# exchange 与 queue 绑定
channel.queue_bind(exchange='logs',
queue=queue_name)
def callback(ch, method, properties, body):
print("[x] %r" % body)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
Topic exchange(主题交换机)
与 Direct exchange 类似,routing_key
可使用通配符匹配。
安装向导
这里直接使用二进制包的方式安装:https://www.rabbitmq.com/install-generic-unix.html
- 系统需要安装 Erlang 。
- 内核参数系统 limits 调整。
- 设置打开文件最大数,推荐至少 65536
- 下载二进制包 rabbitmq-server-generic-unix-3.7.8.tar.xz
- 解压至
/user/local
并将sbin
目录添加至$PATH
中。 - 启动,默认的数据目录在
./var
下,sbin/rabbitmq-server
或者sbin/rabbitmq-server -detached
后台运行。 - 停止,
sbin/rabbitmqctl shutdown
。 - 配置文件
$RABBITMQ_HOME/etc/rabbitmq/rabbitmq.conf
。 - 开启 WEB UI
rabbitmq-plugins enable rabbitmq_management
。
rabbitmq 相关 文件&目录 路径
你可以通过环境变量来设置 rmq 相关文件或目录的位置,但是大多数情况下使用默认的即可。
Deb/RPM 包安装的情况下(${install_prefix}
为空)
Name | Location |
---|---|
RABBITMQ_BASE | (Not used - Windows only) |
RABBITMQ_CONFIG_FILE | ${install_prefix}/etc/rabbitmq/rabbitmq |
RABBITMQ_MNESIA_BASE | ${install_prefix}/var/lib/rabbitmq/mnesia |
RABBITMQ_MNESIA_DIR |
|
RABBITMQ_LOG_BASE | ${install_prefix}/var/log/rabbitmq |
RABBITMQ_LOGS |
|
RABBITMQ_SASL_LOGS |
|
RABBITMQ_PLUGINS_DIR | /usr/lib/rabbitmq/plugins:$RABBITMQ_HOME/plugins |
RABBITMQ_ENABLED_PLUGINS_FILE | ${install_prefix}/etc/rabbitmq/enabled_plugins |
RABBITMQ_PID_FILE | $RABBITMQ_MNESIA_DIR.pid |
二进制包安装的情况 (${RABBITMQ_HOME}
是指二进制包解压的目录)
Name | Location |
---|---|
RABBITMQ_BASE | (Not used) |
RABBITMQ_CONFIG_FILE | $RABBITMQ_HOME/etc/rabbitmq/rabbitmq |
RABBITMQ_MNESIA_BASE | $RABBITMQ_HOME/var/lib/rabbitmq/mnesia |
RABBITMQ_MNESIA_DIR |
|
RABBITMQ_LOG_BASE | $RABBITMQ_HOME/var/log/rabbitmq |
RABBITMQ_LOGS |
|
RABBITMQ_SASL_LOGS |
|
RABBITMQ_PLUGINS_DIR | $RABBITMQ_HOME/plugins |
RABBITMQ_ENABLED_PLUGINS_FILE | $RABBITMQ_HOME/etc/rabbitmq/enabled_plugins |
RABBITMQ_PID_FILE | $RABBITMQ_MNESIA_DIR.pid |
备份&恢复
https://www.rabbitmq.com/backup.html#rabbitmq-definitions
rmq 中有两种类型的数据:
- definitions (metadata, schema/topology)【Users, vhosts, queues, exchanges, bindings, runtime parameters all fall into this category.】
- message 数据
exporting definitions
有两种方式:
-
rabbitmqadmin export rabbit.definitions.json
(# => Exported configuration for localhost to "rabbit.config") -
GET /api/definitions
,需要开启 rabbitmq_management 插件。
import definitions
有两种方式:
rabbitmqadmin -q import rabbit.definitions.json
POST /api/definitions
手动备份
- 通过
rabbitmqctl eval 'rabbit_mnesia:dir().'
查出数据目录并备份,如果是备份 message 数据,则需要将 node 停止,如果集群中队列是 mirror 则需要将整个集群停止。 - (可选)如果 node 的名字改变了需要使用
rabbitmqctl rename_cluster_node <oldnode> <newnode>
手动恢复
- 将上诉备份的目录拷贝至相应的目录。
集群
https://www.rabbitmq.com/clustering.html
Virtual hosts, exchanges, users, and permissions are automatically mirrored across all nodes in a cluster. Queues may be located on a single node, or mirrored across multiple nodes. A client connecting to any node in a cluster can see all queues in the cluster, even if they are not located on that node.
通常一些分布式系统会有 master 跟 node 节点,但是 rabbitmq 并不是这样。在集群中所有节点都是平等的 (equal peer) 。集群中的节点使用 /var/lib/rabbitmq/.erlang.cookie
来允许它们彼此之间通讯,改文件权限必须是 600
,
前提
集群中的各个节点使用域名通讯,确保各个节点之间的
hostname
都能够解析。-
以下端口确保互通:
- 4369: epmd, a peer discovery service used by RabbitMQ nodes and CLI tools
- 5672, 5671: used by AMQP 0-9-1 and 1.0 clients without and with TLS
- 25672: used for inter-node and CLI tools communication (Erlang distribution server port) and is allocated from a dynamic range (limited to a single port by default, computed as AMQP port + 20000). Unless external connections on these ports are really necessary (e.g. the cluster uses federation or CLI tools are used on machines outside the subnet), these ports should not be publicly exposed. See networking guide for details.
- 35672-35682: used by CLI tools (Erlang distribution client ports) for communication with nodes and is allocated from a dynamic range (computed as server distribution port + 10000 through server distribution port + 10010). See networking guide for details.
- 15672: HTTP API clients, management UI and rabbitmqadmin (only if the management plugin is enabled)
- 61613, 61614: STOMP clients without and with TLS (only if the STOMP plugin is enabled)
- 1883, 8883: (MQTT clients without and with TLS, if the MQTT plugin is enabled
- 15674: STOMP-over-WebSockets clients (only if the Web STOMP plugin is enabled)
- 15675: MQTT-over-WebSockets clients (only if the Web MQTT plugin is enabled)
手动创建集群
三个节点名字为 rabbit1
rabbit2
rabbit3
-
分别启动三台 rabbitmq
rabbit1$ rabbitmq-server -detached rabbit2$ rabbitmq-server -detached rabbit3$ rabbitmq-server -detached
-
将其他两个节点(如:
rabbit2
rabbit3
)加入rabbit1
组成集群# rabbit2 节点操作 rabbit2$ rabbitmqctl stop_app Stopping node rabbit@rabbit2 ...done. rabbit2$ rabbitmqctl join_cluster rabbit@rabbit1 Clustering node rabbit@rabbit2 with [rabbit@rabbit1] ...done. rabbit2$ rabbitmqctl start_app Starting node rabbit@rabbit2 ...done. # rabbit3 节点操作 rabbit3$ rabbitmqctl stop_app Stopping node rabbit@rabbit3 ...done. rabbit3$ rabbitmqctl join_cluster rabbit@rabbit2 Clustering node rabbit@rabbit3 with rabbit@rabbit2 ...done. rabbit3$ rabbitmqctl start_app Starting node rabbit@rabbit3 ...done.
-
查看集群状态
rabbit1$ rabbitmqctl cluster_status Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit3,rabbit@rabbit2,rabbit@rabbit1]}] ...done.
-
重启节点,已加入集群的节点可以任意重启,当节点恢复后它就会从其他节点同步数据。
- 当一个节点重启时,他会联系对等体 10 次,每次超时 30s,如果通讯成功,则启动成功,并且同步对等体数据。
- 当一个节点关闭时没有其他的对等体了(最后一个关闭的节点),它启动的时候不会作为一个独立节点,它将等待对等体加入。
- 当所有节点关闭后,集群也就关闭了。当再次启动一个节点,在指定的时间内(默认为 5min, 可以通过下面的配置文件修改),其他节点再次启动就会自动加入原来的集群。
# wait for 60 seconds instead of 30 mnesia_table_loading_retry_timeout = 60000 # retry 15 times instead of 10 mnesia_table_loading_retry_limit = 15
rabbitmq-peer-discovery-k8s 插件自动创建集群
https://github.com/rabbitmq/rabbitmq-peer-discovery-k8s
可以使用 helm 的 rabbitmq-ha chart 在 kubernetes 集群中快速部署一套 rabbitmq cluster.