RabbitMQ 是一个开源(遵循 MPL 协议)的消息代理和队列服务器,用来通过普通协议在不同的应用之间共享数据(跨平台跨语言)。RabbitMQ 是使用 Erlang 语言编写,并且基于 AMQP 协议实现。
AMQP (Advanced Message Queuing Protocol)高级消息队列协议。AMQP是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。AMQP 包括 Exchange 和 Binging 的角色,生产者(Producer)把消息发布到 Exchange 上,消息最终到达队列并被消费者(Consumer)接收,而 Binding 决定交换器的消息应该发送到哪个队列。
Keepalived 是一款基于 VRRP 协议的轻量级服务高可用和负载均衡方案,提供避免服务器单点故障和请求分流的能力。
本方案基于CentOS8系统设计,建议在RedHat/CentOS系统中使用。部署数据库集群使用服务器及网络资源较多,建议在实施前做好规划工作,有利于部署工作顺利、有序进行。
目录
1.RabbitMQ 技术评估
2.RabbitMQ 单点安装和配置
3.RabbitMQ 普通集群
-- 3.1 集群部署拓扑图
-- 3.2 普通集群部署
4.RabbitMQ 镜像集群
-- 4.1 集群部署拓扑图
-- 4.2 镜像集群部署
5.RabbitMQ 镜像集群+ Keepalived 高可用
-- 5.1 集群部署拓扑图
-- 5.2 高可用镜像集群部署
6.RabbitMQ 运维管理
-- 6.1.通过可视化插件运维管理
-- 6.2.通过命令管道运维管理
7.Java 开发集成示例
1. RabbitMQ 技术评估
1、技术特点
可靠性:使用了一些机制来保证可靠性,比如持久化、传输确认、发布确认。
灵活的路由:在消息进入队列之前,通过 Exchange 来路由消息。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange。
消息集群:多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker。
高可用:队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
多种协议:支持多种消息队列协议,如 STOMP、MQTT 等。
多种语言客户端:几乎支持所有常用语言,比如 Java、.NET、Ruby 等。
管理界面:提供了易用的用户界面,用户可以监控和管理消息 Broker 的许多方面。
跟踪机制:如果消息异常,RabbitMQ 提供了消息的跟踪机制。
插件机制:提供了许多插件,来从多方面进行扩展,也可以编辑自己的插件。
2、组件结构
Broker:标识消息队列服务器实体。
Virtual Host:虚拟主机,标识一批交换机、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个的 RabbitMQ 服务器的实例,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在链接时指定,RabbitMQ 默认的 vhost 是【/】。
Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
Banding:绑定,用于消息队列和交换机之间的关联。一个绑定就是基于路由键将交换机和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
Channel:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的 TCP 连接内地虚拟链接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说,建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
Connection:网络连接,比如一个TCP连接。
Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序。
Consumer:消息的消费者,表示一个从一个消息队列中取得消息的客户端应用程序。
Message:消息,消息是不具名的,它是由消息头和消息体组成。消息体是不透明的,而消息头则是由一系列的可选属性组成,这些属性包括 【routing-key(路由键)】、【priority(优先级)】、【delivery-mode(路由模式)】等。
3、Exchange (交换器)的类型
Exchange 分发消息时,根据类型的不同分发策略有区别。常用类型包括 direct、fanout、topic。
1)direct:
Message 中的 "路由键(routing-key)" 如果和 Banding 中的 "绑定键(binding-key)" 一致,direct 类型交换器就将消息发到 Banding 对应的 Queue 中。如下图:
2)fanout:
Message 都会分到所有 Banding 对应的 Queue 中,就像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型的交换器转发消息是最快的。
3)topic:
交换器通过模式匹配将 Message 中的 "路由键(routing-key)" 如果和 Banding 中的 "绑定键(binding-key)" 字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:【 # 】和 【 * 】。【 # 】匹配 0 个或多个单词,【 * 】匹配一个单词。topic 类型交换器就将消息发到 Banding 对应的 Queue 中。
4、消息处理流程
5、应用场景
1)异步处理
场景:用户注册后,需要发注册邮件和注册短信。
传统的做法有两种:
-
串行的方式
将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。 这有一个问题是邮件、短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西。
并行的方式
将注册信息写入数据库后,发送邮件的同时发送短信,以上三个任务完成后返回给客户端,并行的方式能提高处理的时间。
假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行使用时间100ms。虽然并性已经提高的处理时间,但是邮件和短信对我正常的使用网站没有任何影响,客户端没有必要等着其发送完成才显示注册成功,英爱是写入数据库后就返回。
消息队列方式:
引入消息队列后,把发送邮件,短信不是必须的业务逻辑异步处理。
由此可以看出,引入消息队列后,用户的响应时间就等于写入数据库的时间+写入消息队列的时间(可以忽略不计),
引入消息队列后处理后,响应时间是串行的3倍,是并行的2倍。
2)应用解耦
场景:用户下单后,订单系统需要通知库存系统。
传统的做法就是订单系统调用库存系统的接口:
这种做法有一个缺点:当库存系统出现故障时,订单就会失败,订单系统和库存系统高耦合。
消息队列方式:
订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。
库存系统:订阅下单的消息,获取下单消息进行库操作。
就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失。
3)流量削峰
场景: 因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。
作用:
- 可以控制活动人数,超过此一定阀值的订单直接丢弃。
-
可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)。
用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面。业务根据消息队列中的请求信息,再做后续处理。
2.RabbitMQ 单点安装和配置
1、安装 EPEL 的 Yum 源。RabbitMQ 依赖于 EPEL 中的 Erlang 平台运行。
使用文本编辑器创建仓库配置文件:
[centos@host ~ ]$ sudo gedit /etc/yum.repos.d/epel.repo
在文件中编写以下内容并保存:
[epel-modular]
name=Extra Packages for Enterprise Linux Modular $releasever - $basearch
baseurl=http://mirrors.aliyun.com/epel/$releasever/Modular/$basearch
enabled=1
gpgcheck=1
gpgkey=http://mirrors.aliyun.com/epel/RPM-GPG-KEY-EPEL-8
[epel]
name=Extra Packages for Enterprise Linux $releasever - $basearch
baseurl=http://mirrors.aliyun.com/epel/$releasever/Everything/$basearch
enabled=1
gpgcheck=1
gpgkey=http://mirrors.aliyun.com/epel/RPM-GPG-KEY-EPEL-8
更新 Yum 源:
[centos@host ~]$ sudo dnf clean all
[centos@host ~]$ sudo dnf makecache
Extra Packages for Enterprise Linux Modular 8 - 429 kB/s | 118 kB 00:00
Extra Packages for Enterprise Linux 8 - x86_64 3.7 MB/s | 6.9 MB 00:01
元数据缓存已建立。
EPEL(Extra Packages for Enterprise Linux)是企业级 Linux 操作系统的扩展包仓库,为 Redhat/CentOS系统提供大量的额外软件包。
2、安装 RabbitMQ 的 Yum 源。
使用文本编辑器创建仓库配置文件:
[centos@host ~]$ sudo gedit /etc/yum.repos.d/rabbitmq.repo
在文件中编写以下内容并保存:
[rabbitmq-server]
name=RabbitMQ Server for Redhat Linux $releasever
baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/$releasever/
gpgcheck=0
repo_gpgcheck=0
enabled=1
gpgkey=https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
更新 Yum 源:
[centos@host ~]$ sudo dnf clean all
[centos@host ~]$ sudo dnf makecache
RabbitMQ Server 1.4 kB/s | 12 kB 00:09
元数据缓存已建立。
3、安装 RabbitMQ Server。
[centos@host ~]$ sudo dnf install rabbitmq-server
程序指令目录是"/usr/sbin";
程序安装目录是"/usr/lib/rabbitmq";
服务器程序是"/usr/bin/rabbitmq-server";
服务器控制程序是"/usr/bin/rabbitmqctl";
程序运行用户和组是"rabbitmq:rabbitmq","rabbitmq"用户和组安装时默认创建。
4、配置 RabbitMQ 服务开机自启动。
使用文本编辑器打开"/usr/lib/systemd/system/rabbitmq-server.service"文件:
[centos@host ~]$ sudo gedit /usr/lib/systemd/system/rabbitmq-server.service
验证或修改文件内容并保存:
[Unit]
Description=RabbitMQ broker
After=syslog.target network.target
[Service]
Type=notify
User=rabbitmq
Group=rabbitmq
UMask=0027
NotifyAccess=all
TimeoutStartSec=600
# To override LimitNOFILE, create the following file:
#
# /etc/systemd/system/rabbitmq-server.service.d/limits.conf
#
# with the following content:
#
# [Service]
# LimitNOFILE=65536
LimitNOFILE=32768
# Note: systemd on CentOS 7 complains about in-line comments,
# so only append them here
#
# Restart:
# The following setting will automatically restart RabbitMQ
# in the event of a failure. systemd service restarts are not a
# replacement for service monitoring. Please see
# https://www.rabbitmq.com/monitoring.html
Restart=on-failure
RestartSec=10
WorkingDirectory=/var/lib/rabbitmq
ExecStart=/usr/sbin/rabbitmq-server
ExecStop=/usr/sbin/rabbitmqctl shutdown
# See rabbitmq/rabbitmq-server-release#51
SuccessExitStatus=69
[Install]
WantedBy=multi-user.target
5、启动 RabbitMQ 服务,并设置为开机自动启动。
[centos@host ~]$ sudo systemctl daemon-reload
[centos@host ~]$ sudo systemctl start rabbitmq-server.service
[centos@host ~]$ sudo systemctl enable rabbitmq-server.service
6、使用 RabbitMQ 管理工具【rabbitmqctl】初始化管理员用户。【rabbitmqctl】指令只能有 root 和 rabbitmq 用户使用。
1)创建新用户:
[centos@host ~]$ sudo -u rabbitmq rabbitmqctl add_user admin password
Adding user "admin" ...
【add_user】表示创新以一个新用户,格式为: add_user <用户名> <用户口令>
2)设置用户角色:
[centos@host ~]$ sudo -u rabbitmq rabbitmqctl set_user_tags admin administrator
Setting tags for user "admin" to [administrator] ...
【set_user_tags】表示设置用户的角色,格式为: set_user_tags <用户名> <角色名>。"administrator" 是内置的超级管理员。
内置角色有:
management :访问 management plugin;
policymaker :访问 management plugin 和管理自己 vhosts 的策略和参数;
monitoring :访问 management plugin 和查看所有配置和通道以及节点信息;
administrator :一切权限;
none :无配置。
3)设置用户权限:
[centos@host ~]$ sudo -u rabbitmq rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'
Setting permissions for user "admin" in vhost "/" ...
【set_permissions】表示设置用户的访问权限,格式为: set_permissions [-p <vhost>] <user> <conf> <write> <read>。RabbitMQ 默认的 "<vhost>" 是 "/" ,"<user>" 表示用户名,"<conf>"、"<write>"、"<read>"的位置分别用正则表达式来匹配特定的资源,【'.*'】匹配所有资源,【'^$'】不匹配任何资源。
7、使用 RabbitMQ 管理工具【rabbitmq-plugins】启用 RabbitMQ 可视化管理插件。【rabbitmq-plugins】指令只能有 root 和 rabbitmq 用户使用。
[centos@host ~]$ sudo -u rabbitmq rabbitmq-plugins enable rabbitmq_management
8、设置防火墙端口(CentOS8默认安装firewall防火墙),允许 "15672"、"25672"、"5672" 、"4369" 端口(RabbitMQ 默认的 Http 管理插件、Erlang 订阅、AMQP、Epmd 端口)访问服务器。
[centos@host ~]$ sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent
[centos@host ~]$ sudo firewall-cmd --zone=public --add-port=25672/tcp --permanent
[centos@host ~]$ sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent
[centos@host ~]$ sudo firewall-cmd --zone=public --add-port=4369/tcp --permanent
[centos@host ~]$ sudo firewall-cmd --reload
9、验证 RabbitMQ 可视化管理插件正常运行。使用浏览器客户端访问 RabbitMQ 可视化管理插件的 Http 服务接口进行测试,输入:http://<IP>:<PORT>
3.RabbitMQ 普通集群
在普通集群模式中,所有节点的 RabbitMQ 实例会同步 Virtual Host、Exchange、Queue、Banding、User 等内部元数据,但消息的实体数据只存储在生产者连接的节点上。
假设一个普通集群由 MQ01 、MQ02、MQ03 三个节点组成。生产者通过连接 MQ01 创建了一个消息,那这个消息的实体数据存储在 MQ01 节点上;当消费者通过连接 MQ02 或 MQ03 获取这个消息时,集群会根据各节点上一致的元数据信息,将请求从 MQ02 或 MQ03 调度到 MQ01 上,从 MQ01 获取消息。若 MQ01 故障,MQ02 和 MQ03 都无法获取消息。
3.1 集群部署拓扑图
网络资源规划:
节点名 | 主机名 | IP:PORT | 程序 | 操作系统 |
---|---|---|---|---|
主节点 | MQ-M | 192.168.0.20:15672 / 25672 / 5672 / 4369 | RabbitMQ | CentOS8 |
备节点≥1 | MQ-S1 | 192.168.0.21:15672 / 25672 / 5672 / 4369 | RabbitMQ | CentOS8 |
3.2 普通集群部署
1、参照章节 "2.RabbitMQ 单点安装和配置" 在主备节点上安装 RabbitMQ 中间件。
2、将主节点的 "/var/lib/rabbitmq/.erlang.cookie" 文件内容同步覆盖到各个备节点上,使集群中所有节点的 "erlang.cookie" 文件内容一致。
1)查看主节点的 "erlang.cookie" 文件内容:
[centos@MQ-M ~]$ sudo cat /var/lib/rabbitmq/.erlang.cookie
VFZZCQIEEVUVALAAUUGF
2)修改备节点的 "erlang.cookie" 文件内容:
使用文本编辑器打开配置文件:
[centos@MQ-S1 ~]$ sudo gedit /var/lib/rabbitmq/.erlang.cookie
修改文件内容与主节点的 "erlang.cookie" 文件内容一致并保存:
VFZZCQIEEVUVALAAUUGF
3、设置主备节点的本地 DNS 配置文件。为集群所有节点的 IP 和 主机名建立域名映射。
以集群 "主节点" 为例:
使用文本编辑器打开配置文件:
[centos@MQ-M ~]$ sudo gedit /etc/hosts
在文件中追加以下参数并保存:
192.168.0.20 MQ-M
192.168.0.21 MQ-S1
4、主备节点配置 RabbitMQ 服务开机自启动。单点安装时以配置开机启动服务,集群启动时需要修改服务文件的 "ExecStart" 参数。
以集群 "主节点" 为例:
使用文本编辑器打开"/usr/lib/systemd/system/rabbitmq-server.service"文件:
[centos@MQ-M ~]$ sudo gedit /usr/lib/systemd/system/rabbitmq-server.service
修改文件内容并保存:
[Unit]
Description=RabbitMQ broker
After=syslog.target network.target
[Service]
Type=notify
User=rabbitmq
Group=rabbitmq
UMask=0027
NotifyAccess=all
TimeoutStartSec=600
# To override LimitNOFILE, create the following file:
#
# /etc/systemd/system/rabbitmq-server.service.d/limits.conf
#
# with the following content:
#
# [Service]
# LimitNOFILE=65536
LimitNOFILE=32768
# Note: systemd on CentOS 7 complains about in-line comments,
# so only append them here
#
# Restart:
# The following setting will automatically restart RabbitMQ
# in the event of a failure. systemd service restarts are not a
# replacement for service monitoring. Please see
# https://www.rabbitmq.com/monitoring.html
Restart=on-failure
RestartSec=10
WorkingDirectory=/var/lib/rabbitmq
ExecStart=/usr/sbin/rabbitmq-server detached
ExecStop=/usr/sbin/rabbitmqctl shutdown
# See rabbitmq/rabbitmq-server-release#51
SuccessExitStatus=69
[Install]
WantedBy=multi-user.target
5、依序启动主备节点的 RabbitMQ 服务。
以集群 "主节点" 为例:
[centos@MQ-M ~]$ sudo systemctl daemon-reload
[centos@MQ-M ~]$ sudo systemctl restart rabbitmq-server.service
6、设置防火墙端口(CentOS8默认安装firewall防火墙),允许 "15672"、"25672"、"5672" 、"4369" 端口(RabbitMQ 默认的 Http 管理插件、Erlang 订阅、AMQP、Epmd 端口)访问服务器。
以集群 "主节点" 为例:
[centos@MQ-M ~]$ sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent
[centos@MQ-M ~]$ sudo firewall-cmd --zone=public --add-port=25672/tcp --permanent
[centos@MQ-M ~]$ sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent
[centos@MQ-M ~]$ sudo firewall-cmd --zone=public --add-port=4369/tcp --permanent
[centos@MQ-M ~]$ sudo firewall-cmd --reload
7、将备节点关联到主节点的组建集群。
备节点关联到集群中的方式有 "磁盘节点" 和 "内存节点" 两种方式,默认为 "磁盘节点"。"内存节点" 将所有的队列,交换器,绑定关系,用户,权限,和 vhost 的元数据信息保存在内存中。而 "磁盘节点" 将这些信息保存在磁盘中,但是内存节点的性能更高,为了保证集群的高可用性,必须保证集群中有两个以上的磁盘节点,来保证当有一个磁盘节点崩溃了,集群还能对外提供访问服务。
1)磁盘节点
[centos@MQ-S1 ~]$ sudo -u rabbitmq rabbitmqctl stop_app
Stopping rabbit application on node rabbit@MQ-S1 ...
[centos@MQ-S1 ~]$ sudo -u rabbitmq rabbitmqctl reset
Resetting node rabbit@MQ-S1 ...
[centos@MQ-S1 ~]$ sudo -u rabbitmq rabbitmqctl join_cluster rabbit@MQ-M
Clustering node rabbit@MQ-S1 rabbit@MQ-M
[centos@MQ-S1 ~]$ sudo -u rabbitmq rabbitmqctl start_app
Starting node rabbit@MQ-S1 ...
2)内存节点
[centos@MQ-S1 ~]$ sudo -u rabbitmq rabbitmqctl stop_app
Stopping rabbit application on node rabbit@MQ-S1 ...
[centos@MQ-S1 ~]$ sudo -u rabbitmq rabbitmqctl reset
Resetting node rabbit@MQ-S1 ...
[centos@MQ-S1 ~]$ sudo -u rabbitmq rabbitmqctl join_cluster rabbit@MQ-M --ram
Clustering node rabbit@MQ-S1 rabbit@MQ-M
[centos@MQ-S1 ~]$ sudo -u rabbitmq rabbitmqctl start_app
Starting node rabbit@MQ-S1 ...
8、依序查看主备节点的自身状态和集群状态。
以集群 "主节点" 为例:
[centos@MQ-M ~]$ sudo -u rabbitmq rabbitmqctl status
[centos@MQ-M ~]$ sudo -u rabbitmq rabbitmqctl cluster_status
9、通过 RabbitMQ 可视化管理插件验证集群。使用浏览器客户端访问 RabbitMQ 可视化管理插件的 Http 服务接口进行测试,输入:http://<IP>:<PORT>。
4.RabbitMQ 镜像集群
在镜像集群模式中,所有节点的 RabbitMQ 实例会同步内部元数据和消息的实体数据。
假设一个镜像集群由 MQ01 、MQ02、MQ03 三个节点组成。生产者通过连接 MQ01 创建了一个消息,那这个消息的实体数据会同时存储到 MQ01 、MQ02、MQ03 上,消费者通过连接 MQ02 或 MQ03 可以直接获取到这个消息。若 MQ01 故障,MQ02 和 MQ03 仍然可以获取消息,具有很强的可靠性,但是相对于普通集群会消耗掉更多的存储空间和带宽。比较适用于可靠性需求较高的业务场景。
4.1 集群部署拓扑图
网络资源规划:
节点名 | 主机名 | IP:PORT | 程序 | 操作系统 |
---|---|---|---|---|
主节点 | MQ-M | 192.168.0.20:15672 / 25672 / 5672 / 4369 | RabbitMQ | CentOS8 |
备节点≥1 | MQ-S1 | 192.168.0.21:15672 / 25672 / 5672 / 4369 | RabbitMQ | CentOS8 |
4.2 镜像集群部署
1、参照章节 "2.RabbitMQ 单点安装和配置" 在主备节点上安装 RabbitMQ 中间件。
2、参照章节 "3.RabbitMQ 普通集群" 部署 RabbitMQ 普通集群。
3、将普通集群升级为镜像集群。
RabbitMQ 通过在集群上设置策略实现匹配对象的消息同步,可以通过指令和可视化管理插件设置。
1)指令设置:
指令格式如下:
./rabbitmqctl set_policy <name> [-p <vhost>] <pattern> <definition> [--apply-to <apply-to>]
- name: 策略名称。
- vhost: 指定 vhost , 默认值【 / 】。
- pattern: 匹配对象的正则表达式,【 ^ 】代表全部匹配。
- definition: 匹配模式,通过 JSON 字符串设置,如: '{"ha-mode":"all"}'。其中:
① "ha-mode" 指明镜像队列的模式,有效值为 "all/exactly/nodes"。"all" 表示在集群所有的节点上进行镜像,无需设置 "ha-params";"exactly" 表示在指定个数的节点上进行镜像,节点的个数由 "ha-params"指定; " nodes" 表示在指定的节点上进行镜像,节点名称通过 "ha-params" 指定;
② "ha-params" 指明 "ha-mode" 模式需要用到的参数;
③ "ha-sync-mode"指明镜像队列中消息的同步方式,有效值为 "automatic/manually"。 - apply-to: 匹配的对象,默认【 all 】代表全部匹配。其中:
① "exchanges" 表示匹配交换器;
② "queues" 表示匹配队列;
③ "all" 表示同时匹配交换器和队列。
设置集群中所有的对象为镜像模式,在集群 "主节点" (或者任意已经加入集群的节点)上执行以下指令:
[centos@MQ-M ~]$ sudo -u rabbitmq rabbitmqctl set_policy replall -p / '^' '{"ha-mode":"all"}' --apply-to all
Setting policy "replall" for pattern "^" to "{"ha-mode":"all"}" with priority "0" for vhost "/" ...
2)可视化插件设置:
5.RabbitMQ 镜像集群+ Keepalived 高可用
5.1 集群部署拓扑图
网络资源规划:
1、RabbitMQ 集群:
节点名 | 主机名 | IP:PORT | 程序 | 操作系统 |
---|---|---|---|---|
主节点 | MQ-M | 192.168.0.20:15672 / 25672 / 5672 / 4369 | RabbitMQ | CentOS8 |
备节点≥1 | MQ-S1 | 192.168.0.21:15672 / 25672 / 5672 / 4369 | RabbitMQ | CentOS8 |
2、Keepalived 高可用虚拟 IP 是【192.168.0.30】。
5.2 高可用镜像集群部署
1、参照章节 "2.RabbitMQ 单点安装和配置" 在主备节点上安装 RabbitMQ 中间件。
2、参照章节 "3.RabbitMQ 普通集群" 部署 RabbitMQ 普通集群。
3、参照章节 "3.RabbitMQ 镜像集群" 将普通集群升级为镜像集群。
在各个集群节点(MQ-M 、MQ-S1 )上安装、配置 Keepalived。以集群 "主节点" 为例:
4、安装 EPEL 的 Yum 源。
使用文本编辑器创建仓库配置文件:
[centos@MQ-M ~ ]$ sudo gedit /etc/yum.repos.d/epel.repo
在文件中编写以下内容并保存:
[epel-modular]
name=Extra Packages for Enterprise Linux Modular $releasever - $basearch
baseurl=http://mirrors.aliyun.com/epel/$releasever/Modular/$basearch
enabled=1
gpgcheck=1
gpgkey=http://mirrors.aliyun.com/epel/RPM-GPG-KEY-EPEL-8
[epel]
name=Extra Packages for Enterprise Linux $releasever - $basearch
baseurl=http://mirrors.aliyun.com/epel/$releasever/Everything/$basearch
enabled=1
gpgcheck=1
gpgkey=http://mirrors.aliyun.com/epel/RPM-GPG-KEY-EPEL-8
更新 Yum 源:
[centos@Proxy-1 ~]$ sudo dnf clean all
[centos@Proxy-1 ~]$ sudo dnf makecache
Extra Packages for Enterprise Linux Modular 8 - 429 kB/s | 118 kB 00:00
Extra Packages for Enterprise Linux 8 - x86_64 3.7 MB/s | 6.9 MB 00:01
元数据缓存已建立。
EPEL(Extra Packages for Enterprise Linux)是企业级 Linux 操作系统的扩展包仓库,为 Redhat/CentOS系统提供大量的额外软件包。
5、安装 Keepalived。
[centos@MQ-M ~]$ sudo dnf install keepalived
程序安装目录是"/usr/sbin",配置文件目录是"/etc/keepalived"。
6、设置 Keepalived 配置文件参数。
使用文本编辑器打开配置文件:
[centos@MQ-M ~ ]$ sudo gedit /etc/keepalived/keepalived.conf
在文件中编写以下内容并保存:
# 定义全局配置
global_defs {
# 本地节点 ID 标识,一般设置为主机名。
router_id MQ-M
}
# 定义周期性执行的脚本,脚本的退出状态码会被调用它的所有的 vrrp_instance 记录。
vrrp_script chk_rabbitmq {
# 执行脚本的路径。
script "/etc/keepalived/rabbitmq_check.sh"
# 脚本执行的间隔(单位是秒)。默认为1s。
interval 2
# 当脚本调整优先级,从 -254 到 254。默认为2。
# 1. 如果脚本执行成功(退出状态码为0),weight大于0,则priority增加。
# 2. 如果脚本执行失败(退出状态码为非0),weight小于0,则priority减少。
# 3. 其他情况下,priority不变。
weight -20
# 当脚本执行超过时长(单位是秒)则被认为执行失败。
# 运行脚本的用户和组。
user root root
# timeout 30
# 当脚本执行成功到设定次数时,才认为是成功。
# rise 1
# 当脚本执行失败到设定次数时,才认为是失败。
# fall 3
}
# 定义虚拟路由,可以定义多个。
vrrp_instance VI_1 {
# 本地节点初始状态,包括 MASTER(主节点) 和 BACKUP (备节点)。
state MASTER
# 本地节点绑定虚拟 IP 的网络接口。
interface ens33
# 本地节点优先级,优先级高的节点将动态变成 MASTER 节点,接管 VIP 。初始状态下,MASTER 节点的优先级必须高于 BACKUP 节点。
priority 100
# VRRP 实例 ID,范围是0-255。同一集群的所有节点应设置一致的值。
virtual_router_id 216
# 组播信息发送时间间隔。同一集群的所有节点必须设置一样,默认为1秒。
advert_int 1
# 设置验证信息。同一集群的所有节点必须一致
authentication {
# 指定认证方式。PASS 表示简单密码认证(推荐);AH:IPSEC认证(不推荐)。
auth_type PASS
# 指定认证所使用的密码,最多8位。
auth_pass 1111
}
# 声明调用已定义的 vrrp_script 脚本。
track_script {
chk_rabbitmq
}
# 定义虚拟 IP 地址。
virtual_ipaddress {
192.168.0.30
}
}
# 定义对外提供服务 LVS (负载均衡)的 VIP 和 端口(当端口号设置为【0】时,表示所有端口),只实现高可用时可不配置。
virtual_server 192.168.0.30 5672 {
# 设置健康检查时间,单位是秒
delay_loop 6
#负载均衡调度算法
lb_algo rr
# 设置LVS实现负载的机制,有NAT、TUN、DR三个模式
lb_kind DR
# VIP 子网掩码
nat_mask 255.255.255.0
# 会话保持时间,一定时间之内用户无响应则下一次用户请求时需重新路由,一般设为0,表示不需要
persistence_timeout 0
# 网络协议
protocol TCP
# 定义后端 RealServer 的真实服务器属性,IP 地址和端口(当端口号设置为【0】时,表示所有端口)
real_server 192.168.0.20 5672 {
# 配置节点权值,数字越大权重越高
weight 1
TCP_CHECK {
connect_timeout 10
nb_get_retry 3
delay_before_retry 3
connect_port 80
}
}
real_server 192.168.0.21 5672 {
weight 1
TCP_CHECK {
connect_timeout 10
nb_get_retry 3
delay_before_retry 3
connect_port 80
}
}
}
初始化的主节点和备节点的区别体现在以下参数中:
- 初始主节点
vrrp_instance VI_1 {
# 必须设置为 MASTER 。
state MASTER
# 必须设置为最大值。
priority 100
}
- 初始备节点
vrrp_instance VI_1 {
# 必须设置为 BACKUP 。
state BACKUP
# 必须设置为小于主节点的值。
priority 90
}
7、创建或编辑 RabbitMQ 检测脚本文件。文件路径对应配置文件中 vrrp_script 的 script 设置值。
使用文本编辑器创建脚本文件:
[centos@MQ-M ~ ]$ sudo gedit /etc/keepalived/rabbitmq_check.sh
在脚本文件中编写以下内容并保存:
#!/bin/bash
counter=$(/usr/sbin/rabbitmqctl node_health_check | grep 'Health check passed' | wc -l)
if [ "${counter}" = "0" ]; then
sudo -u rabbitmq /usr/sbin/rabbitmq-server detached
sleep 5
counter=$(/usr/sbin/rabbitmqctl node_health_check | grep 'Health check passed' | wc -l)
if [ "${counter}" = "0" ]; then
killall -9 keepalived
fi
fi
给脚本文件增加可执行权限:
[centos@MQ-M ~ ]$ sudo chmod 755 /etc/keepalived/rabbitmq_check.sh
8、配置 Keepalived 系统服务。
使用文本编辑器创建配置文件:
[centos@MQ-M ~ ]$ sudo gedit /usr/lib/systemd/system/keepalived.service
验证或修改文件内容并保存如下:
[Unit]
Description=LVS and VRRP High Availability Monitor
After=network-online.target syslog.target rabbitmq-server.service
Wants=network-online.target
Requires=rabbitmq-server.service
[Service]
Type=forking
User=root
Group=root
PIDFile=/var/run/keepalived.pid
KillMode=process
EnvironmentFile=-/etc/sysconfig/keepalived
ExecStart=/usr/sbin/keepalived $KEEPALIVED_OPTIONS
ExecReload=/bin/kill -HUP $MAINPID
ExecStop=/bin/kill -HUP $MAINPID
[Install]
WantedBy=multi-user.target
重新加载系统服务管理器:
[centos@MQ-M ~ ]$ sudo systemctl daemon-reload
9、设置防火墙端口(CentOS8默认安装firewall防火墙),允许"112"端口(Keepalived 默认端口)访问服务器。
[centos@MQ-M ~ ]$ sudo firewall-cmd --zone=public --add-port=112/tcp --permanent
[centos@MQ-M ~ ]$ sudo firewall-cmd --reload
10、启动/重启 Keepalived 服务(不建议设置为开机自启动)。
启动 Keepalived 服务之前,应确保已正确启动了各节点的 RabbitMQ 服务。各节点的启动或重启的顺序为:① 启动 Keepalived 主节点;② 依次启动 Keepalived 备节点。
[centos@MQ-M ~ ]$ sudo systemctl restart keepalived.service
11、启动 Keepalived 可能因为各种未知的原因失败,主要是由于引发了 SELinux 异常。有关如何解决 SELinux 引起的异常,请阅读文章《RedHat/CentOS8【SELinux】引起的安全策略问题解决方案》,文章地址【//www.greatytc.com/p/a13f974f8bae】。
注意:其他 RabbitMQ 集群节点全部需要按照以上步骤配置。
12、使用浏览器通过虚拟 IP 访问 RabbitMQ 服务。
6.RabbitMQ 运维管理
RabbitMQ 提供可视化插件和命令管道两种运维管理途径。
6.1.通过可视化插件运维管理
1、启动 RabbitMQ 服务。
2、使用 RabbitMQ 管理工具【rabbitmqctl】初始化管理员用户。【rabbitmqctl】指令只能有 root 和 rabbitmq 用户使用。
1)创建新用户:
[centos@host ~]$ sudo -u rabbitmq rabbitmqctl add_user admin password
Adding user "admin" ...
【add_user】表示创新以一个新用户,格式为: add_user <用户名> <用户口令>
2)设置用户角色:
[centos@host ~]$ sudo -u rabbitmq rabbitmqctl set_user_tags admin administrator
Setting tags for user "admin" to [administrator] ...
【set_user_tags】表示设置用户的角色,格式为: set_user_tags <用户名> <角色名>。"administrator" 是内置的超级管理员。
内置角色有:
management :访问 management plugin;
policymaker :访问 management plugin 和管理自己 vhosts 的策略和参数;
monitoring :访问 management plugin 和查看所有配置和通道以及节点信息;
administrator :一切权限;
none :无配置。
3)设置用户权限:
[centos@host ~]$ sudo -u rabbitmq rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'
Setting permissions for user "admin" in vhost "/" ...
【set_permissions】表示设置用户的访问权限,格式为: set_permissions [-p <vhost>] <user> <conf> <write> <read>。RabbitMQ 默认的 "<vhost>" 是 "/" ,"<user>" 表示用户名,"<conf>"、"<write>"、"<read>"的位置分别用正则表达式来匹配特定的资源,【'.*'】匹配所有资源,【'^$'】不匹配任何资源。
3、使用 RabbitMQ 管理工具【rabbitmq-plugins】启用 RabbitMQ 可视化管理插件。【rabbitmq-plugins】指令只能有 root 和 rabbitmq 用户使用。
[centos@host ~]$ sudo -u rabbitmq rabbitmq-plugins enable rabbitmq_management
4、设置防火墙端口(CentOS8默认安装firewall防火墙),允许 "15672"、"25672"、"5672" 、"4369" 端口(RabbitMQ 默认的 Http 管理插件、Erlang 订阅、AMQP、Epmd 端口)访问服务器。
[centos@host ~]$ sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent
[centos@host ~]$ sudo firewall-cmd --zone=public --add-port=25672/tcp --permanent
[centos@host ~]$ sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent
[centos@host ~]$ sudo firewall-cmd --zone=public --add-port=4369/tcp --permanent
[centos@host ~]$ sudo firewall-cmd --reload
5、验证 RabbitMQ 可视化管理插件正常运行。使用浏览器客户端访问 RabbitMQ 可视化管理插件的 Http 服务接口进行测试,输入:http://<IP>:<PORT>
6.2.通过命令管道运维管理
1、管理 RabbitMQ 服务:
rabbitmq-server start
# 启动服务
rabbitmq-server stop
# 停止服务
rabbitmq-server restart
# 重新启动服务
rabbitmq-server status
# 查看服务状态
rabbitmq-server detached
# 后台运行服务
正确安装 RabbitMQ 后,程序提供【rabbitmqctl】指令,可用于 RabbitMQ 服务的运维管理。
2、【rabbitmqctl】语法:
rabbitmqctl [-n <node>] [-q][-h] <command> [<command options>]
其中:
-n <node>:默认 node 名称是 "rabbit@hostname",如果你的主机明是 "MQ-M",那么 node 名称是 "rabbit@'MQ-M"。
-q:安静输出模式,信息会被禁止输出。
-h:查看帮助信息。
command:管理指令。
command options:管理指令的参数。
3、基本管理指令:
shutdown
# 关闭 RabbitMQ 服务。
stop [<pid_file>]
# 停止在 erlang node 上运行的 RabbitMQ。
stop_app
# 停止 erlang node 上的 RabbitMQ 的应用,但是 erlang node 还是会继续运行。
start_app
# 启动 erlang node 上的 RabbitMQ 的应用。
wait <pid_file>
# 等待 RabbitMQ 服务启动。
reset
# 初始化 erlang node 状态,会从集群中删除该节点,从管理数据库中删除所有数据,例如:vhosts 等。在初始化之前 RabbitMQ 的应用必须先停止。
force_reset
# 无条件的初始化 erlang node 状态。
status
# 查看 RabbitMQ 服务的状态。
rotate_logs <suffix>
# 轮转日志文件。
3、集群管理指令:
join_cluster <clusternode> [--ram]
# 加入到指定主节点的集群中。clusternode 表示 node 名称,--ram 表示 node 以 ram node(内存节点) 加入集群中。默认 node 以 disc node(磁盘节点) 加入集群,在一个 node 加入 cluster 之前,必须先停止该 node 的 RabbitMQ 应用,即先执行 stop_app。
cluster_status
# 显示 cluster 中的所有 node 的状态。
change_cluster_node_type disc | ram
# 改变一个 cluster 中 node 的模式,该节点在转换前必须先停止,不能把一个集群中唯一的 disk node 转化为 ram node。
forget_cluster_node [--offline]
# 远程移除 cluster 中的一个 node,前提是该 node 必须处于 offline 状态,如果是 online 状态,则需要加--offline 参数。
sync_queue queue
# 同步镜像队列。
cancel_sync_queue queue
# 取消同步镜像队列。
4、用户管理指令:
add_user <username> <password>
# 在 RabbitMQ 的内部数据库添加用户。
delete_user <username>
# 删除一个用户。
change_password <username> <newpassword>
# 改变用户密码。
clear_password <username>
# 清除用户密码,禁止用户登录。
set_user_tags <username> <tag> ...
# 设置用户 tags(内置角色)。内置角色有:
# > management :访问 management plugin;
# > policymaker :访问 management plugin 和管理自己 vhosts 的策略和参数;
# > monitoring :访问 management plugin 和查看所有配置和通道以及节点信息;
# > administrator :一切权限;
# > none :无配置。
list_users
# 列出用户。
add_vhost <vhostpath>
# 创建一个 vhosts 。
delete_vhost <vhostpath>
# 删除一个 vhosts 。
list_vhosts [<vhostinfoitem> ...]
# 列出 vhosts 。
set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
# 针对一个 vhosts 给用户赋予相关权限。
clear_permissions [-p <vhostpath>] <username>
# 清除一个用户对 vhosts 的权限。
list_permissions [-p <vhostpath>]
# 列出哪些用户可以访问该 vhosts。
list_user_permissions <username>
# 列出该用户的访问权限。
5、策略管理指令:
set_policy [-p <vhostpath>] [--priority <priority>] [--apply-to <apply-to>]
# > name: 策略名称。
# > vhost: 指定 vhost , 默认值【 / 】。
# > pattern: 匹配对象的正则表达式,【 ^ 】代表全部匹配。
# > definition: 匹配模式,通过 JSON 字符串设置,如: '{"ha-mode":"all"}'。其中:
# "ha-mode" 指明镜像队列的模式,有效值为 "all/exactly/nodes"。"all" 表示在集群所有的节点上进行镜像,无需设置 "ha-params";"exactly" 表示在指定个数的节点上进行镜像,节点的个数由 "ha-params"指定; " nodes" 表示在指定的节点上进行镜像,节点名称通过 "ha-params" 指定;
# "ha-params" 指明 "ha-mode" 模式需要用到的参数;
# "ha-sync-mode"指明镜像队列中消息的同步方式,有效值为 "automatic/manually"。
# > apply-to: 匹配的对象,默认【 all 】代表全部匹配。其中:
# "exchanges" 表示匹配交换器;
# "queues" 表示匹配队列;
# "all" 表示同时匹配交换器和队列。
clear_policy [-p <vhostpath>] <name>
# 清除一个策略。
list_policies [-p <vhostpath>]
# 列出已有的策略。
6、插件管理指令:
rabbitmq-plugins <command> [<command options>]
# Commands:
# list [-v] [-m] [-E] [-e] [<pattern>] 显示所有的的插件。-v 显示版本 -m 显示名称 -E 显示明确已经开启的 -e 显示明确的和暗中开启的
# enable <plugin> ... 开启一个插件
# disable <plugin> ... 关闭一个插件
如:
rabbitmq-plugins enable rabbitmq_management
# 开启可视化插件。
7、其他管理指令:
list_queues [-p <vhostpath>] [<queueinfoitem> ...]
# 返回 queue 的信息,如果省略了-p参数,则默认显示的是【 / 】的 vhosts 信息。
list_exchanges [-p <vhostpath>] [<exchangeinfoitem> ...]
#返回 exchange 的信息。
list_bindings [-p <vhostpath>] [<bindinginfoitem> ...]
# 返回绑定信息。
list_connections [<connectioninfoitem> ...]
# 返回链接信息。
list_channels [<channelinfoitem> ...]
# 返回目前所有的 channels 。
list_consumers [-p <vhostpath>]
# 返回 consumers。
status
# 显示 broker 的状态。
environment
# 显示环境参数的信息。
report
# 返回一个服务状态 report 。
7.Java 开发集成示例
amqp-client 是 RabbitMQ 的 JavaAPI 包,以下是 Maven 项目的程序案例。
1、从 Maven 库中引入 amqp-client 包。
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
2、编写 Producter(生产者)端示例程序。本示例实现【每隔 1 秒钟无线循环向队列提交一个 UUID 消息】。
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 生产者提交消息。
*
*/
public class Producter {
public static void main(String[] args) {
// RabbitMQ 服务地址
String mqIp = "192.168.0.30";
// RabbitMQ 服务端口
int mqPort = 5672;
// RabbitMQ 登录账号
String mqUser = "admin";
// RabbitMQ 登录账号口令
String mqPass = "123456a?";
// 定义虚拟主机
String vhost = "/";
// 定义交换器(如果 RabbitMQ 上没有这个 exchange 会自动创建)
String exchange = "test_exchange";
// 定义队列(如果 RabbitMQ 上没有这个 queue 会自动创建)
String queue = "test_queue";
// 定义绑定KEY(如果 RabbitMQ 上没有这个 bandingkey 会自动创建)
String bandingkey = "test_bandingkey";
// RabbitMQ Tcp 连接工厂实例。
ConnectionFactory factory = null;
// RabbitMQ Tcp 连接实例。
Connection connection = null;
// RabbitMQ 信道实例。
Channel channel = null;
try {
// 创建 RabbitMQ 连接工厂
factory = new ConnectionFactory();
factory.setHost(mqIp);
factory.setPort(mqPort);
factory.setUsername(mqUser);
factory.setPassword(mqPass);
factory.setVirtualHost(vhost);
// 创建 RabbitMQ Tcp 连接实例
connection = factory.newConnection();
// 创建 RabbitMQ 频道
channel = connection.createChannel();
// 声明交换机类型
channel.exchangeDeclare(exchange, "fanout", true);
// 声明队列
channel.queueDeclare(queue, true, false, true, null);
// 也可以获取默认队列
// String queue = channel.queueDeclare().getQueue();
// 将队列与交换机绑定
channel.queueBind(queue, exchange, bandingkey);
// 将数据提交到队列,本例中每隔1秒钟无线循环写入一个 UUID 消息。
while (true) {
String uuid = UUID.randomUUID().toString();
System.out.println("提交消息:" + uuid);
channel.basicPublish(exchange, bandingkey, null, uuid.getBytes());
Thread.sleep(1000);
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
测试运行结果:
2、编写 Consumer(消费者)端示例程序。本示例实现【从队列中获取已提交的 UUID 消息】。
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
* 消费者获取消息。
*
*/
public class Consumer {
public static void main(String[] args) {
// RabbitMQ 服务地址
String mqIp = "192.168.0.30";
// RabbitMQ 服务端口
int mqPort = 5672;
// RabbitMQ 登录账号
String mqUser = "admin";
// RabbitMQ 登录账号口令
String mqPass = "123456a?";
// 定义虚拟主机
String vhost = "/";
// 定义交换器(如果 RabbitMQ 上没有这个 exchange 会自动创建)
String exchange = "test_exchange";
// 定义队列(如果 RabbitMQ 上没有这个 queue 会自动创建)
String queue = "test_queue";
// 定义绑定KEY(如果 RabbitMQ 上没有这个 bandingkey 会自动创建)
String bandingkey = "test_bandingkey";
// RabbitMQ Tcp 连接工厂实例。
ConnectionFactory factory = null;
// RabbitMQ Tcp 连接实例。
Connection connection = null;
// RabbitMQ 信道实例。
Channel channel = null;
try {
// 创建 RabbitMQ 连接工厂
factory = new ConnectionFactory();
factory.setHost(mqIp);
factory.setPort(mqPort);
factory.setUsername(mqUser);
factory.setPassword(mqPass);
factory.setVirtualHost(vhost);
// 创建 RabbitMQ Tcp 连接实例
connection = factory.newConnection();
// 创建 RabbitMQ 频道
channel = connection.createChannel();
// 声明交换机类型
channel.exchangeDeclare(exchange, "fanout", true);
// 声明队列
channel.queueDeclare(queue, true, false, true, null);
// 也可以获取默认队列
// String queue = channel.queueDeclare().getQueue();
// 将队列与交换机绑定
channel.queueBind(queue, exchange, bandingkey);
// 从队列获取数据,本例中每隔1秒钟无线循环写入一个 UUID 消息。
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("获取消息:" + message);
}
};
// channel绑定队列、消费者,autoAck为true表示一旦收到消息则自动回复确认消息
channel.basicConsume(queue, true, consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
测试运行结果: