rabbitmq指南

rabbitmq 简介

RabbitMQ 是一个用 erlang 开发的 AMQP(Advanced Message Queue)的开源实现,AMQP(高级消息队列协议)是一个网络协议。它支持符合要求的客户端应用和消息中间代理之间进行通讯。

AMQP 模型简介

image

消息(message)被发布者(publisher)发送给交换机(exchange),交换机常常被比喻成邮局或者邮箱。然后交换机将收到的消息根据路由规则分发给绑定的队列(queue)。最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。

AMQP 是一个可编的程协议

AMQP 是一个可编程协议,某种意义上说 AMQP 的实体和路由规则是由应用本身定义的,而不是由消息代理定义。包括像声明队列和交换机,定义他们之间的绑定,订阅队列等等关于协议本身的操作。

这虽然能让开发人员自由发挥,但也需要他们注意潜在的定义冲突。当然这在实践中很少会发生,如果发生,会以配置错误(misconfiguration)的形式表现出来。

应用程序(Applications)声明AMQP实体,定义需要的路由方案,或者删除不再需要的AMQP实体。

Exchange

交换机是用来发送消息的AMQP实体。交换机拿到一个消息之后将它路由给一个或零个队列。它使用哪种路由算法是由交换机类型和被称作绑定(bindings)的规则所决定的。AMQP 的代理提供了四种交换机。

Name(交换机类型) Default pre-declared names(预声明的默认名称)
Direct exchange(直连交换机) (Empty string) and amq.direct
Fanout exchange(扇型交换机) amq.fanout
Topic exchange(主题交换机) amq.topic
Headers exchange(头交换机) amq.match (and amq.headers in RabbitMQ)

下列示例代码可参考官方文档:https://www.rabbitmq.com/tutorials/tutorial-one-python.html

默认 exchanage

exchanage 可以使用空字符串代替,消息会根据指定的 routing_key 分发到与 routing_key 同名的队列。

image

消息发送端 python 代码:

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = "Hello World!"

count = 1

while True:
    channel.basic_publish(exchange='',
                          routing_key='task_queue',
                          body=str(count) + message,
                          properties=pika.BasicProperties(
                              delivery_mode=2
                          ))

    count += 1
    print("[x] Send %d %r" % (count, message))
    time.sleep(1)

消息接收端 python 代码:

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print('[*] Waiting for message. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print("[x] Worker1 Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)
    time.sleep(5)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

Direct exchange(直连交换机)

消息发送到与 exchange 绑定的 queue 上,且 routing_key 必须精确匹配。

image

发布端 python 代码:

import pika
import time
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 定义一个 Direct 类型的 exchanage
channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'

# 发送消息给 exchange,如果 queue 不存在消息会丢失
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)

print ("[x] Sent %r:%r" % (severity, message))
connection.close()

接受端 python 代码:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

channel.queue_declare(queue='aaaa', durable=True)


severities = sys.argv[1:]
if not severities:
    sys.exit(1)

for severity in severities:
    # exchange 跟 queue 绑定,且 routing_key 必须精确匹配才能接收
    channel.queue_bind(exchange='direct_logs',
                       queue='aaaa',
                       routing_key=severity)

print (' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):

    print (" [x] %r:%r" % (method.routing_key, body,))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback,
                      queue='aaaa')

channel.start_consuming()

Fanout exchange(扇型交换机)

将消息发送给绑定到 exchange 上的所有 queue。就是发布/订阅模式。

image

发布端 python 代码:

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 定义一个 faout 类型的 exchanage
channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

count = 1

message = 'Hello World!'

while True:
    # faout 会忽略 routing_key,所以这里为空
    channel.basic_publish(
        exchange='logs',
        routing_key='',
        body=str(count) + message
    )

    count += 1

    print("[x] Sent %d %r" %(count, message))

    time.sleep(3)

接收端 python 代码:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# exchange 与 queue 绑定
channel.queue_bind(exchange='logs',
                   queue=queue_name)

def callback(ch, method, properties, body):
    print("[x] %r" % body)


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

Topic exchange(主题交换机)

与 Direct exchange 类似,routing_key 可使用通配符匹配。

image

安装向导

这里直接使用二进制包的方式安装:https://www.rabbitmq.com/install-generic-unix.html

  1. 系统需要安装 Erlang
  2. 内核参数系统 limits 调整。
    • 设置打开文件最大数,推荐至少 65536
  3. 下载二进制包 rabbitmq-server-generic-unix-3.7.8.tar.xz
  4. 解压至 /user/local 并将 sbin 目录添加至 $PATH 中。
  5. 启动,默认的数据目录在 ./var 下,sbin/rabbitmq-server 或者 sbin/rabbitmq-server -detached 后台运行。
  6. 停止,sbin/rabbitmqctl shutdown
  7. 配置文件 $RABBITMQ_HOME/etc/rabbitmq/rabbitmq.conf
  8. 开启 WEB UI rabbitmq-plugins enable rabbitmq_management

rabbitmq 相关 文件&目录 路径

https://www.rabbitmq.com/relocate.html

你可以通过环境变量来设置 rmq 相关文件或目录的位置,但是大多数情况下使用默认的即可。

Deb/RPM 包安装的情况下(${install_prefix} 为空)

Name Location
RABBITMQ_BASE (Not used - Windows only)
RABBITMQ_CONFIG_FILE ${install_prefix}/etc/rabbitmq/rabbitmq
RABBITMQ_MNESIA_BASE ${install_prefix}/var/lib/rabbitmq/mnesia
RABBITMQ_MNESIA_DIR RABBITMQ_MNESIA_BASE/RABBITMQ_NODENAME
RABBITMQ_LOG_BASE ${install_prefix}/var/log/rabbitmq
RABBITMQ_LOGS RABBITMQ_LOG_BASE/RABBITMQ_NODENAME.log
RABBITMQ_SASL_LOGS RABBITMQ_LOG_BASE/RABBITMQ_NODENAME-sasl.log
RABBITMQ_PLUGINS_DIR /usr/lib/rabbitmq/plugins:$RABBITMQ_HOME/plugins
RABBITMQ_ENABLED_PLUGINS_FILE ${install_prefix}/etc/rabbitmq/enabled_plugins
RABBITMQ_PID_FILE $RABBITMQ_MNESIA_DIR.pid

二进制包安装的情况 (${RABBITMQ_HOME} 是指二进制包解压的目录)

Name Location
RABBITMQ_BASE (Not used)
RABBITMQ_CONFIG_FILE $RABBITMQ_HOME/etc/rabbitmq/rabbitmq
RABBITMQ_MNESIA_BASE $RABBITMQ_HOME/var/lib/rabbitmq/mnesia
RABBITMQ_MNESIA_DIR RABBITMQ_MNESIA_BASE/RABBITMQ_NODENAME
RABBITMQ_LOG_BASE $RABBITMQ_HOME/var/log/rabbitmq
RABBITMQ_LOGS RABBITMQ_LOG_BASE/RABBITMQ_NODENAME.log
RABBITMQ_SASL_LOGS RABBITMQ_LOG_BASE/RABBITMQ_NODENAME-sasl.log
RABBITMQ_PLUGINS_DIR $RABBITMQ_HOME/plugins
RABBITMQ_ENABLED_PLUGINS_FILE $RABBITMQ_HOME/etc/rabbitmq/enabled_plugins
RABBITMQ_PID_FILE $RABBITMQ_MNESIA_DIR.pid

备份&恢复

https://www.rabbitmq.com/backup.html#rabbitmq-definitions

rmq 中有两种类型的数据:

  • definitions (metadata, schema/topology)【Users, vhosts, queues, exchanges, bindings, runtime parameters all fall into this category.】
  • message 数据

exporting definitions

有两种方式:

  • rabbitmqadmin export rabbit.definitions.json (# => Exported configuration for localhost to "rabbit.config")
  • GET /api/definitions,需要开启 rabbitmq_management 插件。

import definitions

有两种方式:

  • rabbitmqadmin -q import rabbit.definitions.json
  • POST /api/definitions

手动备份

  1. 通过 rabbitmqctl eval 'rabbit_mnesia:dir().' 查出数据目录并备份,如果是备份 message 数据,则需要将 node 停止,如果集群中队列是 mirror 则需要将整个集群停止。
  2. (可选)如果 node 的名字改变了需要使用 rabbitmqctl rename_cluster_node <oldnode> <newnode>

手动恢复

  1. 将上诉备份的目录拷贝至相应的目录。

集群

https://www.rabbitmq.com/clustering.html

Virtual hosts, exchanges, users, and permissions are automatically mirrored across all nodes in a cluster. Queues may be located on a single node, or mirrored across multiple nodes. A client connecting to any node in a cluster can see all queues in the cluster, even if they are not located on that node.

通常一些分布式系统会有 master 跟 node 节点,但是 rabbitmq 并不是这样。在集群中所有节点都是平等的 (equal peer) 。集群中的节点使用 /var/lib/rabbitmq/.erlang.cookie 来允许它们彼此之间通讯,改文件权限必须是 600

前提

  1. 集群中的各个节点使用域名通讯,确保各个节点之间的 hostname 都能够解析。

  2. 以下端口确保互通:

    • 4369: epmd, a peer discovery service used by RabbitMQ nodes and CLI tools
    • 5672, 5671: used by AMQP 0-9-1 and 1.0 clients without and with TLS
    • 25672: used for inter-node and CLI tools communication (Erlang distribution server port) and is allocated from a dynamic range (limited to a single port by default, computed as AMQP port + 20000). Unless external connections on these ports are really necessary (e.g. the cluster uses federation or CLI tools are used on machines outside the subnet), these ports should not be publicly exposed. See networking guide for details.
    • 35672-35682: used by CLI tools (Erlang distribution client ports) for communication with nodes and is allocated from a dynamic range (computed as server distribution port + 10000 through server distribution port + 10010). See networking guide for details.
    • 15672: HTTP API clients, management UI and rabbitmqadmin (only if the management plugin is enabled)
    • 61613, 61614: STOMP clients without and with TLS (only if the STOMP plugin is enabled)
    • 1883, 8883: (MQTT clients without and with TLS, if the MQTT plugin is enabled
    • 15674: STOMP-over-WebSockets clients (only if the Web STOMP plugin is enabled)
    • 15675: MQTT-over-WebSockets clients (only if the Web MQTT plugin is enabled)

手动创建集群

三个节点名字为 rabbit1 rabbit2 rabbit3

  1. 分别启动三台 rabbitmq

    rabbit1$ rabbitmq-server -detached
    rabbit2$ rabbitmq-server -detached
    rabbit3$ rabbitmq-server -detached
    
  2. 将其他两个节点(如:rabbit2 rabbit3)加入 rabbit1 组成集群

    # rabbit2 节点操作
    rabbit2$ rabbitmqctl stop_app
    Stopping node rabbit@rabbit2 ...done.
    
    rabbit2$ rabbitmqctl join_cluster rabbit@rabbit1
    Clustering node rabbit@rabbit2 with [rabbit@rabbit1] ...done.
    
    rabbit2$ rabbitmqctl start_app
    Starting node rabbit@rabbit2 ...done.
    
    # rabbit3 节点操作
    rabbit3$ rabbitmqctl stop_app
    Stopping node rabbit@rabbit3 ...done.
    
    rabbit3$ rabbitmqctl join_cluster rabbit@rabbit2
    Clustering node rabbit@rabbit3 with rabbit@rabbit2 ...done.
    
    rabbit3$ rabbitmqctl start_app
    Starting node rabbit@rabbit3 ...done.
    
  3. 查看集群状态

    rabbit1$ rabbitmqctl cluster_status
    Cluster status of node rabbit@rabbit1 ...
    [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]},
     {running_nodes,[rabbit@rabbit3,rabbit@rabbit2,rabbit@rabbit1]}]
    ...done.
    
  4. 重启节点,已加入集群的节点可以任意重启,当节点恢复后它就会从其他节点同步数据。

    • 当一个节点重启时,他会联系对等体 10 次,每次超时 30s,如果通讯成功,则启动成功,并且同步对等体数据。
    • 当一个节点关闭时没有其他的对等体了(最后一个关闭的节点),它启动的时候不会作为一个独立节点,它将等待对等体加入。
    • 当所有节点关闭后,集群也就关闭了。当再次启动一个节点,在指定的时间内(默认为 5min, 可以通过下面的配置文件修改),其他节点再次启动就会自动加入原来的集群。
      # wait for 60 seconds instead of 30
      mnesia_table_loading_retry_timeout = 60000
      
      # retry 15 times instead of 10
      mnesia_table_loading_retry_limit = 15
      

rabbitmq-peer-discovery-k8s 插件自动创建集群

https://github.com/rabbitmq/rabbitmq-peer-discovery-k8s

可以使用 helm 的 rabbitmq-ha chart 在 kubernetes 集群中快速部署一套 rabbitmq cluster.

参考

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,525评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,203评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,862评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,728评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,743评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,590评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,330评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,244评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,693评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,885评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,001评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,723评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,343评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,919评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,042评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,191评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,955评论 2 355

推荐阅读更多精彩内容

  • 什么叫消息队列? 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复...
    Agile_dev阅读 2,373评论 0 24
  • 本文大纲 RabbitMQ 历史 RabbitMQ 应用场景 RabbitMQ 系统架构 RabbitMQ 基本概...
    Java_Explorer阅读 16,372评论 1 40
  • 整体架构 部署步骤 基于 Docker 基本概念内存节点只保存状态到内存,例外情况是:持久的 queue 的内容将...
    mvictor阅读 12,756评论 5 30
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,657评论 18 139
  • RabbitMQ采用Erlang编写,需安装语言库才能运行RabbitMQ代理服务器。AMQP:高级消息队列协议。...
    JAVA觅音阁阅读 3,642评论 0 7