应用架构之【RabbitMQ+Keepalived】AMQP 消息队列集群方案

RabbitMQ 是一个开源(遵循 MPL 协议)的消息代理和队列服务器,用来通过普通协议在不同的应用之间共享数据(跨平台跨语言)。RabbitMQ 是使用 Erlang 语言编写,并且基于 AMQP 协议实现。

AMQP (Advanced Message Queuing Protocol)高级消息队列协议。AMQP是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。AMQP 包括 Exchange 和 Binging 的角色,生产者(Producer)把消息发布到 Exchange 上,消息最终到达队列并被消费者(Consumer)接收,而 Binding 决定交换器的消息应该发送到哪个队列。

Keepalived 是一款基于 VRRP 协议的轻量级服务高可用和负载均衡方案,提供避免服务器单点故障和请求分流的能力。

本方案基于CentOS8系统设计,建议在RedHat/CentOS系统中使用。部署数据库集群使用服务器及网络资源较多,建议在实施前做好规划工作,有利于部署工作顺利、有序进行。

目录

1.RabbitMQ 技术评估

2.RabbitMQ 单点安装和配置

3.RabbitMQ 普通集群
-- 3.1 集群部署拓扑图
-- 3.2 普通集群部署

4.RabbitMQ 镜像集群
-- 4.1 集群部署拓扑图
-- 4.2 镜像集群部署

5.RabbitMQ 镜像集群+ Keepalived 高可用
-- 5.1 集群部署拓扑图
-- 5.2 高可用镜像集群部署

6.RabbitMQ 运维管理
-- 6.1.通过可视化插件运维管理
-- 6.2.通过命令管道运维管理

7.Java 开发集成示例


1. RabbitMQ 技术评估

1、技术特点

  • 可靠性:使用了一些机制来保证可靠性,比如持久化、传输确认、发布确认。

  • 灵活的路由:在消息进入队列之前,通过 Exchange 来路由消息。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange。

  • 消息集群:多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker。

  • 高可用:队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。

  • 多种协议:支持多种消息队列协议,如 STOMP、MQTT 等。

  • 多种语言客户端:几乎支持所有常用语言,比如 Java、.NET、Ruby 等。

  • 管理界面:提供了易用的用户界面,用户可以监控和管理消息 Broker 的许多方面。

  • 跟踪机制:如果消息异常,RabbitMQ 提供了消息的跟踪机制。

  • 插件机制:提供了许多插件,来从多方面进行扩展,也可以编辑自己的插件。

2、组件结构

RabbitMQ 组件架构图
  • Broker:标识消息队列服务器实体。

  • Virtual Host:虚拟主机,标识一批交换机、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个的 RabbitMQ 服务器的实例,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在链接时指定,RabbitMQ 默认的 vhost 是【/】。

  • Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

  • Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

  • Banding:绑定,用于消息队列和交换机之间的关联。一个绑定就是基于路由键将交换机和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

  • Channel:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的 TCP 连接内地虚拟链接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说,建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

  • Connection:网络连接,比如一个TCP连接。

  • Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序。

  • Consumer:消息的消费者,表示一个从一个消息队列中取得消息的客户端应用程序。

  • Message:消息,消息是不具名的,它是由消息头和消息体组成。消息体是不透明的,而消息头则是由一系列的可选属性组成,这些属性包括 【routing-key(路由键)】、【priority(优先级)】、【delivery-mode(路由模式)】等。

3、Exchange (交换器)的类型

Exchange 分发消息时,根据类型的不同分发策略有区别。常用类型包括 direct、fanout、topic。

1)direct:

Message 中的 "路由键(routing-key)" 如果和 Banding 中的 "绑定键(binding-key)" 一致,direct 类型交换器就将消息发到 Banding 对应的 Queue 中。如下图:

RabbitMQ direct exchange

2)fanout:

Message 都会分到所有 Banding 对应的 Queue 中,就像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型的交换器转发消息是最快的。

RabbitMQ fanout exchange

3)topic:

交换器通过模式匹配将 Message 中的 "路由键(routing-key)" 如果和 Banding 中的 "绑定键(binding-key)" 字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:【 # 】和 【 * 】。【 # 】匹配 0 个或多个单词,【 * 】匹配一个单词。topic 类型交换器就将消息发到 Banding 对应的 Queue 中。

4、消息处理流程

RabbitMQ 消息处理流程

5、应用场景

1)异步处理

场景:用户注册后,需要发注册邮件和注册短信。

传统的做法有两种:

  • 串行的方式
    将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。 这有一个问题是邮件、短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西。


    串行方式
  • 并行的方式
    将注册信息写入数据库后,发送邮件的同时发送短信,以上三个任务完成后返回给客户端,并行的方式能提高处理的时间。

并行方式

假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行使用时间100ms。虽然并性已经提高的处理时间,但是邮件和短信对我正常的使用网站没有任何影响,客户端没有必要等着其发送完成才显示注册成功,英爱是写入数据库后就返回。

消息队列方式:

引入消息队列后,把发送邮件,短信不是必须的业务逻辑异步处理。


image

由此可以看出,引入消息队列后,用户的响应时间就等于写入数据库的时间+写入消息队列的时间(可以忽略不计),

引入消息队列后处理后,响应时间是串行的3倍,是并行的2倍。

2)应用解耦

场景:用户下单后,订单系统需要通知库存系统。

传统的做法就是订单系统调用库存系统的接口:


image

这种做法有一个缺点:当库存系统出现故障时,订单就会失败,订单系统和库存系统高耦合。

消息队列方式:

image

订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。
库存系统:订阅下单的消息,获取下单消息进行库操作。
就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失。

3)流量削峰

场景: 因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。
作用:

  • 可以控制活动人数,超过此一定阀值的订单直接丢弃。
  • 可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)。


    image

用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面。业务根据消息队列中的请求信息,再做后续处理。


2.RabbitMQ 单点安装和配置

1、安装 EPEL 的 Yum 源。RabbitMQ 依赖于 EPEL 中的 Erlang 平台运行。

使用文本编辑器创建仓库配置文件:

[centos@host ~ ]$ sudo gedit /etc/yum.repos.d/epel.repo

在文件中编写以下内容并保存:

[epel-modular]
name=Extra Packages for Enterprise Linux Modular $releasever - $basearch
baseurl=http://mirrors.aliyun.com/epel/$releasever/Modular/$basearch
enabled=1
gpgcheck=1
gpgkey=http://mirrors.aliyun.com/epel/RPM-GPG-KEY-EPEL-8

[epel]
name=Extra Packages for Enterprise Linux $releasever - $basearch
baseurl=http://mirrors.aliyun.com/epel/$releasever/Everything/$basearch
enabled=1
gpgcheck=1
gpgkey=http://mirrors.aliyun.com/epel/RPM-GPG-KEY-EPEL-8

更新 Yum 源:

[centos@host ~]$ sudo dnf clean all
[centos@host ~]$ sudo dnf makecache
Extra Packages for Enterprise Linux Modular 8 - 429 kB/s | 118 kB     00:00    
Extra Packages for Enterprise Linux 8 - x86_64  3.7 MB/s | 6.9 MB     00:01    
元数据缓存已建立。

EPEL(Extra Packages for Enterprise Linux)是企业级 Linux 操作系统的扩展包仓库,为 Redhat/CentOS系统提供大量的额外软件包。

2、安装 RabbitMQ 的 Yum 源。

使用文本编辑器创建仓库配置文件:

[centos@host ~]$ sudo gedit /etc/yum.repos.d/rabbitmq.repo

在文件中编写以下内容并保存:

[rabbitmq-server]
name=RabbitMQ Server for Redhat Linux $releasever
baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/$releasever/
gpgcheck=0
repo_gpgcheck=0
enabled=1
gpgkey=https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc

更新 Yum 源:

[centos@host ~]$ sudo dnf clean all
[centos@host ~]$ sudo dnf makecache
RabbitMQ Server                                 1.4 kB/s |  12 kB     00:09    
元数据缓存已建立。

3、安装 RabbitMQ Server。

[centos@host ~]$ sudo dnf install rabbitmq-server

程序指令目录是"/usr/sbin";
程序安装目录是"/usr/lib/rabbitmq";
服务器程序是"/usr/bin/rabbitmq-server";
服务器控制程序是"/usr/bin/rabbitmqctl";
程序运行用户和组是"rabbitmq:rabbitmq","rabbitmq"用户和组安装时默认创建。

4、配置 RabbitMQ 服务开机自启动。

使用文本编辑器打开"/usr/lib/systemd/system/rabbitmq-server.service"文件:

[centos@host ~]$ sudo gedit /usr/lib/systemd/system/rabbitmq-server.service

验证或修改文件内容并保存:

[Unit]
Description=RabbitMQ broker
After=syslog.target network.target

[Service]
Type=notify
User=rabbitmq
Group=rabbitmq
UMask=0027
NotifyAccess=all
TimeoutStartSec=600

# To override LimitNOFILE, create the following file:
#
# /etc/systemd/system/rabbitmq-server.service.d/limits.conf
#
# with the following content:
#
# [Service]
# LimitNOFILE=65536

LimitNOFILE=32768

# Note: systemd on CentOS 7 complains about in-line comments,
# so only append them here
#
# Restart:
# The following setting will automatically restart RabbitMQ
# in the event of a failure. systemd service restarts are not a
# replacement for service monitoring. Please see
# https://www.rabbitmq.com/monitoring.html
Restart=on-failure
RestartSec=10
WorkingDirectory=/var/lib/rabbitmq
ExecStart=/usr/sbin/rabbitmq-server
ExecStop=/usr/sbin/rabbitmqctl shutdown
# See rabbitmq/rabbitmq-server-release#51
SuccessExitStatus=69

[Install]
WantedBy=multi-user.target

5、启动 RabbitMQ 服务,并设置为开机自动启动。

[centos@host ~]$ sudo systemctl daemon-reload
[centos@host ~]$ sudo systemctl start rabbitmq-server.service
[centos@host ~]$ sudo systemctl enable rabbitmq-server.service

6、使用 RabbitMQ 管理工具【rabbitmqctl】初始化管理员用户。【rabbitmqctl】指令只能有 root 和 rabbitmq 用户使用。

1)创建新用户:

[centos@host ~]$ sudo -u rabbitmq rabbitmqctl add_user admin password
Adding user "admin" ...

【add_user】表示创新以一个新用户,格式为: add_user <用户名> <用户口令>

2)设置用户角色:

[centos@host ~]$ sudo -u rabbitmq rabbitmqctl set_user_tags admin administrator
Setting tags for user "admin" to [administrator] ...

【set_user_tags】表示设置用户的角色,格式为: set_user_tags <用户名> <角色名>。"administrator" 是内置的超级管理员。
内置角色有:
management :访问 management plugin;
policymaker :访问 management plugin 和管理自己 vhosts 的策略和参数;
monitoring :访问 management plugin 和查看所有配置和通道以及节点信息;
administrator :一切权限;
none :无配置。

3)设置用户权限:

[centos@host ~]$ sudo -u rabbitmq rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'
Setting permissions for user "admin" in vhost "/" ...

【set_permissions】表示设置用户的访问权限,格式为: set_permissions [-p <vhost>] <user> <conf> <write> <read>。RabbitMQ 默认的 "<vhost>" 是 "/" ,"<user>" 表示用户名,"<conf>"、"<write>"、"<read>"的位置分别用正则表达式来匹配特定的资源,【'.*'】匹配所有资源,【'^$'】不匹配任何资源。

7、使用 RabbitMQ 管理工具【rabbitmq-plugins】启用 RabbitMQ 可视化管理插件。【rabbitmq-plugins】指令只能有 root 和 rabbitmq 用户使用。

[centos@host ~]$ sudo -u rabbitmq rabbitmq-plugins enable rabbitmq_management

8、设置防火墙端口(CentOS8默认安装firewall防火墙),允许 "15672"、"25672"、"5672" 、"4369" 端口(RabbitMQ 默认的 Http 管理插件、Erlang 订阅、AMQP、Epmd 端口)访问服务器。

[centos@host ~]$ sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent
[centos@host ~]$ sudo firewall-cmd --zone=public --add-port=25672/tcp --permanent
[centos@host ~]$ sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent
[centos@host ~]$ sudo firewall-cmd --zone=public --add-port=4369/tcp --permanent
[centos@host ~]$ sudo firewall-cmd --reload

9、验证 RabbitMQ 可视化管理插件正常运行。使用浏览器客户端访问 RabbitMQ 可视化管理插件的 Http 服务接口进行测试,输入:http://<IP>:<PORT>

RabbitMQ 可视化管理插件-用户认证
RabbitMQ 可视化管理插件-主界面

3.RabbitMQ 普通集群

普通集群模式中,所有节点的 RabbitMQ 实例会同步 Virtual Host、Exchange、Queue、Banding、User 等内部元数据,但消息的实体数据只存储在生产者连接的节点上。

假设一个普通集群由 MQ01 、MQ02、MQ03 三个节点组成。生产者通过连接 MQ01 创建了一个消息,那这个消息的实体数据存储在 MQ01 节点上;当消费者通过连接 MQ02 或 MQ03 获取这个消息时,集群会根据各节点上一致的元数据信息,将请求从 MQ02 或 MQ03 调度到 MQ01 上,从 MQ01 获取消息。若 MQ01 故障,MQ02 和 MQ03 都无法获取消息。

3.1 集群部署拓扑图

普通集群拓扑图

网络资源规划:

节点名 主机名 IP:PORT 程序 操作系统
主节点 MQ-M 192.168.0.20:15672 / 25672 / 5672 / 4369 RabbitMQ CentOS8
备节点≥1 MQ-S1 192.168.0.21:15672 / 25672 / 5672 / 4369 RabbitMQ CentOS8

3.2 普通集群部署

1、参照章节 "2.RabbitMQ 单点安装和配置" 在主备节点上安装 RabbitMQ 中间件。

2、将主节点的 "/var/lib/rabbitmq/.erlang.cookie" 文件内容同步覆盖到各个备节点上,使集群中所有节点的 "erlang.cookie" 文件内容一致。

1)查看主节点的 "erlang.cookie" 文件内容:

[centos@MQ-M ~]$ sudo cat /var/lib/rabbitmq/.erlang.cookie
VFZZCQIEEVUVALAAUUGF

2)修改备节点的 "erlang.cookie" 文件内容:

使用文本编辑器打开配置文件:

[centos@MQ-S1 ~]$ sudo gedit /var/lib/rabbitmq/.erlang.cookie

修改文件内容与主节点的 "erlang.cookie" 文件内容一致并保存:

VFZZCQIEEVUVALAAUUGF

3、设置主备节点的本地 DNS 配置文件。为集群所有节点的 IP 和 主机名建立域名映射。

以集群 "主节点" 为例:

使用文本编辑器打开配置文件:

[centos@MQ-M ~]$ sudo gedit /etc/hosts

在文件中追加以下参数并保存:

192.168.0.20    MQ-M
192.168.0.21    MQ-S1

4、主备节点配置 RabbitMQ 服务开机自启动。单点安装时以配置开机启动服务,集群启动时需要修改服务文件的 "ExecStart" 参数。

以集群 "主节点" 为例:

使用文本编辑器打开"/usr/lib/systemd/system/rabbitmq-server.service"文件:

[centos@MQ-M ~]$ sudo gedit /usr/lib/systemd/system/rabbitmq-server.service

修改文件内容并保存:

[Unit]
Description=RabbitMQ broker
After=syslog.target network.target

[Service]
Type=notify
User=rabbitmq
Group=rabbitmq
UMask=0027
NotifyAccess=all
TimeoutStartSec=600

# To override LimitNOFILE, create the following file:
#
# /etc/systemd/system/rabbitmq-server.service.d/limits.conf
#
# with the following content:
#
# [Service]
# LimitNOFILE=65536

LimitNOFILE=32768

# Note: systemd on CentOS 7 complains about in-line comments,
# so only append them here
#
# Restart:
# The following setting will automatically restart RabbitMQ
# in the event of a failure. systemd service restarts are not a
# replacement for service monitoring. Please see
# https://www.rabbitmq.com/monitoring.html
Restart=on-failure
RestartSec=10
WorkingDirectory=/var/lib/rabbitmq
ExecStart=/usr/sbin/rabbitmq-server detached
ExecStop=/usr/sbin/rabbitmqctl shutdown
# See rabbitmq/rabbitmq-server-release#51
SuccessExitStatus=69

[Install]
WantedBy=multi-user.target

5、依序启动主备节点的 RabbitMQ 服务。

以集群 "主节点" 为例:

[centos@MQ-M ~]$ sudo systemctl daemon-reload
[centos@MQ-M ~]$ sudo systemctl restart rabbitmq-server.service

6、设置防火墙端口(CentOS8默认安装firewall防火墙),允许 "15672"、"25672"、"5672" 、"4369" 端口(RabbitMQ 默认的 Http 管理插件、Erlang 订阅、AMQP、Epmd 端口)访问服务器。

以集群 "主节点" 为例:

[centos@MQ-M ~]$ sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent
[centos@MQ-M ~]$ sudo firewall-cmd --zone=public --add-port=25672/tcp --permanent
[centos@MQ-M ~]$ sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent
[centos@MQ-M ~]$ sudo firewall-cmd --zone=public --add-port=4369/tcp --permanent
[centos@MQ-M ~]$ sudo firewall-cmd --reload

7、将备节点关联到主节点的组建集群。

备节点关联到集群中的方式有 "磁盘节点" 和 "内存节点" 两种方式,默认为 "磁盘节点"。"内存节点" 将所有的队列,交换器,绑定关系,用户,权限,和 vhost 的元数据信息保存在内存中。而 "磁盘节点" 将这些信息保存在磁盘中,但是内存节点的性能更高,为了保证集群的高可用性,必须保证集群中有两个以上的磁盘节点,来保证当有一个磁盘节点崩溃了,集群还能对外提供访问服务。

1)磁盘节点

[centos@MQ-S1 ~]$ sudo -u rabbitmq rabbitmqctl stop_app 
Stopping rabbit application on node rabbit@MQ-S1 ...

[centos@MQ-S1 ~]$ sudo -u rabbitmq rabbitmqctl reset
Resetting node rabbit@MQ-S1 ...

[centos@MQ-S1 ~]$ sudo -u rabbitmq rabbitmqctl join_cluster rabbit@MQ-M
Clustering node rabbit@MQ-S1 rabbit@MQ-M

[centos@MQ-S1 ~]$ sudo -u rabbitmq rabbitmqctl start_app 
Starting node rabbit@MQ-S1 ...

2)内存节点

[centos@MQ-S1 ~]$ sudo -u rabbitmq rabbitmqctl stop_app 
Stopping rabbit application on node rabbit@MQ-S1 ...

[centos@MQ-S1 ~]$ sudo -u rabbitmq rabbitmqctl reset
Resetting node rabbit@MQ-S1 ...

[centos@MQ-S1 ~]$ sudo -u rabbitmq rabbitmqctl join_cluster rabbit@MQ-M --ram
Clustering node rabbit@MQ-S1 rabbit@MQ-M

[centos@MQ-S1 ~]$ sudo -u rabbitmq rabbitmqctl start_app
Starting node rabbit@MQ-S1 ...

8、依序查看主备节点的自身状态和集群状态。

以集群 "主节点" 为例:

[centos@MQ-M ~]$ sudo -u rabbitmq rabbitmqctl status
[centos@MQ-M ~]$ sudo -u rabbitmq rabbitmqctl cluster_status

9、通过 RabbitMQ 可视化管理插件验证集群。使用浏览器客户端访问 RabbitMQ 可视化管理插件的 Http 服务接口进行测试,输入:http://<IP>:<PORT>

集群状态

4.RabbitMQ 镜像集群

镜像集群模式中,所有节点的 RabbitMQ 实例会同步内部元数据消息的实体数据

假设一个镜像集群由 MQ01 、MQ02、MQ03 三个节点组成。生产者通过连接 MQ01 创建了一个消息,那这个消息的实体数据会同时存储到 MQ01 、MQ02、MQ03 上,消费者通过连接 MQ02 或 MQ03 可以直接获取到这个消息。若 MQ01 故障,MQ02 和 MQ03 仍然可以获取消息,具有很强的可靠性,但是相对于普通集群会消耗掉更多的存储空间和带宽。比较适用于可靠性需求较高的业务场景。

4.1 集群部署拓扑图

镜像集群拓扑图

网络资源规划:

节点名 主机名 IP:PORT 程序 操作系统
主节点 MQ-M 192.168.0.20:15672 / 25672 / 5672 / 4369 RabbitMQ CentOS8
备节点≥1 MQ-S1 192.168.0.21:15672 / 25672 / 5672 / 4369 RabbitMQ CentOS8

4.2 镜像集群部署

1、参照章节 "2.RabbitMQ 单点安装和配置" 在主备节点上安装 RabbitMQ 中间件。

2、参照章节 "3.RabbitMQ 普通集群" 部署 RabbitMQ 普通集群。

3、将普通集群升级为镜像集群。

RabbitMQ 通过在集群上设置策略实现匹配对象的消息同步,可以通过指令和可视化管理插件设置。

1)指令设置:

指令格式如下:

./rabbitmqctl set_policy <name> [-p <vhost>] <pattern> <definition> [--apply-to <apply-to>]
  • name: 策略名称。
  • vhost: 指定 vhost , 默认值【 / 】。
  • pattern: 匹配对象的正则表达式,【 ^ 】代表全部匹配。
  • definition: 匹配模式,通过 JSON 字符串设置,如: '{"ha-mode":"all"}'。其中:
    ① "ha-mode" 指明镜像队列的模式,有效值为 "all/exactly/nodes"。"all" 表示在集群所有的节点上进行镜像,无需设置 "ha-params";"exactly" 表示在指定个数的节点上进行镜像,节点的个数由 "ha-params"指定; " nodes" 表示在指定的节点上进行镜像,节点名称通过 "ha-params" 指定;
    ② "ha-params" 指明 "ha-mode" 模式需要用到的参数;
    ③ "ha-sync-mode"指明镜像队列中消息的同步方式,有效值为 "automatic/manually"。
  • apply-to: 匹配的对象,默认【 all 】代表全部匹配。其中:
    ① "exchanges" 表示匹配交换器;
    ② "queues" 表示匹配队列;
    ③ "all" 表示同时匹配交换器和队列。

设置集群中所有的对象为镜像模式,在集群 "主节点" (或者任意已经加入集群的节点)上执行以下指令:

[centos@MQ-M ~]$ sudo -u rabbitmq rabbitmqctl set_policy replall -p / '^' '{"ha-mode":"all"}' --apply-to all
Setting policy "replall" for pattern "^" to "{"ha-mode":"all"}" with priority "0" for vhost "/" ...

2)可视化插件设置:

在集群中创建消息同步策略

5.RabbitMQ 镜像集群+ Keepalived 高可用

5.1 集群部署拓扑图

高可用镜像集群拓扑图

网络资源规划:

1、RabbitMQ 集群:

节点名 主机名 IP:PORT 程序 操作系统
主节点 MQ-M 192.168.0.20:15672 / 25672 / 5672 / 4369 RabbitMQ CentOS8
备节点≥1 MQ-S1 192.168.0.21:15672 / 25672 / 5672 / 4369 RabbitMQ CentOS8

2、Keepalived 高可用虚拟 IP 是【192.168.0.30】。

5.2 高可用镜像集群部署

1、参照章节 "2.RabbitMQ 单点安装和配置" 在主备节点上安装 RabbitMQ 中间件。

2、参照章节 "3.RabbitMQ 普通集群" 部署 RabbitMQ 普通集群。

3、参照章节 "3.RabbitMQ 镜像集群" 将普通集群升级为镜像集群。

在各个集群节点(MQ-M 、MQ-S1 )上安装、配置 Keepalived。以集群 "主节点" 为例:

4、安装 EPEL 的 Yum 源。

使用文本编辑器创建仓库配置文件:

[centos@MQ-M ~ ]$ sudo gedit /etc/yum.repos.d/epel.repo

在文件中编写以下内容并保存:

[epel-modular]
name=Extra Packages for Enterprise Linux Modular $releasever - $basearch
baseurl=http://mirrors.aliyun.com/epel/$releasever/Modular/$basearch
enabled=1
gpgcheck=1
gpgkey=http://mirrors.aliyun.com/epel/RPM-GPG-KEY-EPEL-8

[epel]
name=Extra Packages for Enterprise Linux $releasever - $basearch
baseurl=http://mirrors.aliyun.com/epel/$releasever/Everything/$basearch
enabled=1
gpgcheck=1
gpgkey=http://mirrors.aliyun.com/epel/RPM-GPG-KEY-EPEL-8

更新 Yum 源:

[centos@Proxy-1 ~]$ sudo dnf clean all
[centos@Proxy-1 ~]$ sudo dnf makecache
Extra Packages for Enterprise Linux Modular 8 - 429 kB/s | 118 kB     00:00    
Extra Packages for Enterprise Linux 8 - x86_64  3.7 MB/s | 6.9 MB     00:01    
元数据缓存已建立。

EPEL(Extra Packages for Enterprise Linux)是企业级 Linux 操作系统的扩展包仓库,为 Redhat/CentOS系统提供大量的额外软件包。

5、安装 Keepalived。

[centos@MQ-M ~]$ sudo dnf install keepalived

程序安装目录是"/usr/sbin",配置文件目录是"/etc/keepalived"。

6、设置 Keepalived 配置文件参数。

使用文本编辑器打开配置文件:

[centos@MQ-M ~ ]$ sudo gedit /etc/keepalived/keepalived.conf

在文件中编写以下内容并保存:

# 定义全局配置
global_defs {
    # 本地节点 ID 标识,一般设置为主机名。
    router_id MQ-M
}

# 定义周期性执行的脚本,脚本的退出状态码会被调用它的所有的 vrrp_instance 记录。
vrrp_script chk_rabbitmq {
    # 执行脚本的路径。
    script "/etc/keepalived/rabbitmq_check.sh"
    # 脚本执行的间隔(单位是秒)。默认为1s。
    interval 2
    # 当脚本调整优先级,从 -254 到 254。默认为2。
    # 1. 如果脚本执行成功(退出状态码为0),weight大于0,则priority增加。
    # 2. 如果脚本执行失败(退出状态码为非0),weight小于0,则priority减少。
    # 3. 其他情况下,priority不变。
    weight -20
    # 当脚本执行超过时长(单位是秒)则被认为执行失败。
    # 运行脚本的用户和组。
    user root root
    # timeout 30
    # 当脚本执行成功到设定次数时,才认为是成功。
    # rise 1
    # 当脚本执行失败到设定次数时,才认为是失败。
    # fall 3
}

# 定义虚拟路由,可以定义多个。
vrrp_instance VI_1 {
    # 本地节点初始状态,包括 MASTER(主节点) 和 BACKUP (备节点)。
    state MASTER
    # 本地节点绑定虚拟 IP 的网络接口。
    interface ens33
    # 本地节点优先级,优先级高的节点将动态变成 MASTER 节点,接管 VIP 。初始状态下,MASTER 节点的优先级必须高于 BACKUP 节点。
    priority 100
    # VRRP 实例 ID,范围是0-255。同一集群的所有节点应设置一致的值。
    virtual_router_id 216
    # 组播信息发送时间间隔。同一集群的所有节点必须设置一样,默认为1秒。
    advert_int 1
    # 设置验证信息。同一集群的所有节点必须一致
    authentication {
        # 指定认证方式。PASS 表示简单密码认证(推荐);AH:IPSEC认证(不推荐)。
        auth_type PASS
        # 指定认证所使用的密码,最多8位。
        auth_pass 1111
    }

    # 声明调用已定义的 vrrp_script 脚本。
    track_script {
        chk_rabbitmq
    }

    # 定义虚拟 IP 地址。
    virtual_ipaddress {
        192.168.0.30
    }
}

# 定义对外提供服务 LVS (负载均衡)的 VIP 和 端口(当端口号设置为【0】时,表示所有端口),只实现高可用时可不配置。
virtual_server 192.168.0.30 5672 {  
    # 设置健康检查时间,单位是秒
    delay_loop 6 
    #负载均衡调度算法
    lb_algo rr
    # 设置LVS实现负载的机制,有NAT、TUN、DR三个模式    
    lb_kind DR
    # VIP 子网掩码
    nat_mask 255.255.255.0
    # 会话保持时间,一定时间之内用户无响应则下一次用户请求时需重新路由,一般设为0,表示不需要    
    persistence_timeout 0
    # 网络协议
    protocol TCP
    # 定义后端 RealServer 的真实服务器属性,IP 地址和端口(当端口号设置为【0】时,表示所有端口)
    real_server 192.168.0.20 5672 { 
        # 配置节点权值,数字越大权重越高 
        weight 1
        TCP_CHECK {  
            connect_timeout 10         
            nb_get_retry 3  
            delay_before_retry 3  
            connect_port 80  
        }  
    }  
    real_server 192.168.0.21 5672 {
        weight 1
        TCP_CHECK {  
            connect_timeout 10  
            nb_get_retry 3  
            delay_before_retry 3  
            connect_port 80  
        }  
    }  
}

初始化的主节点和备节点的区别体现在以下参数中:

  • 初始主节点
vrrp_instance VI_1 {
    # 必须设置为 MASTER 。
    state MASTER
    # 必须设置为最大值。
    priority 100
}
  • 初始备节点
vrrp_instance VI_1 {
    # 必须设置为 BACKUP 。
    state BACKUP
    # 必须设置为小于主节点的值。
    priority 90
}

7、创建或编辑 RabbitMQ 检测脚本文件。文件路径对应配置文件中 vrrp_script 的 script 设置值。

使用文本编辑器创建脚本文件:

[centos@MQ-M ~ ]$ sudo gedit /etc/keepalived/rabbitmq_check.sh

在脚本文件中编写以下内容并保存:

#!/bin/bash
counter=$(/usr/sbin/rabbitmqctl node_health_check | grep 'Health check passed' | wc -l)
if [ "${counter}" = "0" ]; then
    sudo -u rabbitmq /usr/sbin/rabbitmq-server detached
    sleep 5
    counter=$(/usr/sbin/rabbitmqctl node_health_check | grep 'Health check passed' | wc -l)
    if [ "${counter}" = "0" ]; then
        killall -9 keepalived
    fi
fi

给脚本文件增加可执行权限:

[centos@MQ-M ~ ]$ sudo chmod 755 /etc/keepalived/rabbitmq_check.sh

8、配置 Keepalived 系统服务。

使用文本编辑器创建配置文件:

[centos@MQ-M ~ ]$ sudo gedit /usr/lib/systemd/system/keepalived.service

验证或修改文件内容并保存如下:

[Unit]
Description=LVS and VRRP High Availability Monitor
After=network-online.target syslog.target rabbitmq-server.service
Wants=network-online.target
Requires=rabbitmq-server.service

[Service]
Type=forking
User=root
Group=root
PIDFile=/var/run/keepalived.pid
KillMode=process
EnvironmentFile=-/etc/sysconfig/keepalived
ExecStart=/usr/sbin/keepalived $KEEPALIVED_OPTIONS
ExecReload=/bin/kill -HUP $MAINPID
ExecStop=/bin/kill -HUP $MAINPID

[Install]
WantedBy=multi-user.target

重新加载系统服务管理器:

[centos@MQ-M ~ ]$ sudo systemctl daemon-reload

9、设置防火墙端口(CentOS8默认安装firewall防火墙),允许"112"端口(Keepalived 默认端口)访问服务器。

[centos@MQ-M ~ ]$ sudo firewall-cmd --zone=public --add-port=112/tcp --permanent
[centos@MQ-M ~ ]$ sudo firewall-cmd --reload

10、启动/重启 Keepalived 服务(不建议设置为开机自启动)。

启动 Keepalived 服务之前,应确保已正确启动了各节点的 RabbitMQ 服务。各节点的启动或重启的顺序为:① 启动 Keepalived 主节点;② 依次启动 Keepalived 备节点。

[centos@MQ-M ~ ]$ sudo systemctl restart keepalived.service

11、启动 Keepalived 可能因为各种未知的原因失败,主要是由于引发了 SELinux 异常。有关如何解决 SELinux 引起的异常,请阅读文章《RedHat/CentOS8【SELinux】引起的安全策略问题解决方案》,文章地址【//www.greatytc.com/p/a13f974f8bae】。

注意:其他 RabbitMQ 集群节点全部需要按照以上步骤配置。

12、使用浏览器通过虚拟 IP 访问 RabbitMQ 服务。


6.RabbitMQ 运维管理

RabbitMQ 提供可视化插件命令管道两种运维管理途径。

6.1.通过可视化插件运维管理

1、启动 RabbitMQ 服务。

2、使用 RabbitMQ 管理工具【rabbitmqctl】初始化管理员用户。【rabbitmqctl】指令只能有 root 和 rabbitmq 用户使用。

1)创建新用户:

[centos@host ~]$ sudo -u rabbitmq rabbitmqctl add_user admin password
Adding user "admin" ...

【add_user】表示创新以一个新用户,格式为: add_user <用户名> <用户口令>

2)设置用户角色:

[centos@host ~]$ sudo -u rabbitmq rabbitmqctl set_user_tags admin administrator
Setting tags for user "admin" to [administrator] ...

【set_user_tags】表示设置用户的角色,格式为: set_user_tags <用户名> <角色名>。"administrator" 是内置的超级管理员。
内置角色有:
management :访问 management plugin;
policymaker :访问 management plugin 和管理自己 vhosts 的策略和参数;
monitoring :访问 management plugin 和查看所有配置和通道以及节点信息;
administrator :一切权限;
none :无配置。

3)设置用户权限:

[centos@host ~]$ sudo -u rabbitmq rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'
Setting permissions for user "admin" in vhost "/" ...

【set_permissions】表示设置用户的访问权限,格式为: set_permissions [-p <vhost>] <user> <conf> <write> <read>。RabbitMQ 默认的 "<vhost>" 是 "/" ,"<user>" 表示用户名,"<conf>"、"<write>"、"<read>"的位置分别用正则表达式来匹配特定的资源,【'.*'】匹配所有资源,【'^$'】不匹配任何资源。

3、使用 RabbitMQ 管理工具【rabbitmq-plugins】启用 RabbitMQ 可视化管理插件。【rabbitmq-plugins】指令只能有 root 和 rabbitmq 用户使用。

[centos@host ~]$ sudo -u rabbitmq rabbitmq-plugins enable rabbitmq_management

4、设置防火墙端口(CentOS8默认安装firewall防火墙),允许 "15672"、"25672"、"5672" 、"4369" 端口(RabbitMQ 默认的 Http 管理插件、Erlang 订阅、AMQP、Epmd 端口)访问服务器。

[centos@host ~]$ sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent
[centos@host ~]$ sudo firewall-cmd --zone=public --add-port=25672/tcp --permanent
[centos@host ~]$ sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent
[centos@host ~]$ sudo firewall-cmd --zone=public --add-port=4369/tcp --permanent
[centos@host ~]$ sudo firewall-cmd --reload

5、验证 RabbitMQ 可视化管理插件正常运行。使用浏览器客户端访问 RabbitMQ 可视化管理插件的 Http 服务接口进行测试,输入:http://<IP>:<PORT>

RabbitMQ 可视化管理插件-用户认证
RabbitMQ 可视化管理插件-主界面

6.2.通过命令管道运维管理

1、管理 RabbitMQ 服务:

rabbitmq-server start
# 启动服务

rabbitmq-server stop
# 停止服务

rabbitmq-server restart
# 重新启动服务

rabbitmq-server status
# 查看服务状态

rabbitmq-server detached
# 后台运行服务

正确安装 RabbitMQ 后,程序提供【rabbitmqctl】指令,可用于 RabbitMQ 服务的运维管理。

2、【rabbitmqctl】语法:

rabbitmqctl [-n <node>] [-q][-h] <command> [<command options>] 
其中:
-n <node>:默认 node 名称是 "rabbit@hostname",如果你的主机明是 "MQ-M",那么 node 名称是 "rabbit@'MQ-M"。
-q:安静输出模式,信息会被禁止输出。
-h:查看帮助信息。
command:管理指令。
command options:管理指令的参数。

3、基本管理指令:

shutdown
# 关闭 RabbitMQ 服务。

stop [<pid_file>]  
# 停止在 erlang node 上运行的 RabbitMQ。

stop_app   
# 停止 erlang node 上的 RabbitMQ 的应用,但是 erlang node 还是会继续运行。

start_app   
# 启动 erlang node 上的 RabbitMQ 的应用。

wait <pid_file>  
# 等待 RabbitMQ 服务启动。

reset  
# 初始化 erlang node 状态,会从集群中删除该节点,从管理数据库中删除所有数据,例如:vhosts 等。在初始化之前 RabbitMQ 的应用必须先停止。

force_reset  
# 无条件的初始化 erlang node 状态。

status
# 查看 RabbitMQ 服务的状态。

rotate_logs <suffix>   
# 轮转日志文件。

3、集群管理指令:

join_cluster <clusternode> [--ram]  
# 加入到指定主节点的集群中。clusternode 表示 node 名称,--ram 表示 node 以 ram node(内存节点) 加入集群中。默认 node 以 disc node(磁盘节点) 加入集群,在一个 node 加入 cluster 之前,必须先停止该 node 的 RabbitMQ 应用,即先执行 stop_app。

cluster_status  
# 显示 cluster 中的所有 node 的状态。

change_cluster_node_type disc | ram  
# 改变一个 cluster 中 node 的模式,该节点在转换前必须先停止,不能把一个集群中唯一的 disk node 转化为 ram node。

forget_cluster_node [--offline]  
# 远程移除 cluster 中的一个 node,前提是该 node 必须处于 offline 状态,如果是 online 状态,则需要加--offline 参数。

sync_queue queue  
# 同步镜像队列。

cancel_sync_queue queue    
# 取消同步镜像队列。

4、用户管理指令:

add_user <username> <password>  
# 在 RabbitMQ 的内部数据库添加用户。

delete_user <username>  
# 删除一个用户。

change_password <username> <newpassword>  
# 改变用户密码。

clear_password <username> 
# 清除用户密码,禁止用户登录。

set_user_tags <username> <tag> ...
# 设置用户 tags(内置角色)。内置角色有:
# > management :访问 management plugin;
# > policymaker :访问 management plugin 和管理自己 vhosts 的策略和参数;
# > monitoring :访问 management plugin 和查看所有配置和通道以及节点信息;
# > administrator :一切权限;
# > none :无配置。

list_users  
# 列出用户。

add_vhost <vhostpath>  
# 创建一个 vhosts 。

delete_vhost <vhostpath>  
# 删除一个 vhosts 。
list_vhosts [<vhostinfoitem> ...]  
# 列出 vhosts 。

set_permissions [-p <vhostpath>] <user> <conf> <write> <read>  
# 针对一个 vhosts 给用户赋予相关权限。

clear_permissions [-p <vhostpath>] <username>  
# 清除一个用户对 vhosts 的权限。

list_permissions [-p <vhostpath>]   
# 列出哪些用户可以访问该 vhosts。

list_user_permissions <username>  
# 列出该用户的访问权限。

5、策略管理指令:

set_policy [-p <vhostpath>] [--priority <priority>] [--apply-to <apply-to>]  
#  > name: 策略名称。
#  > vhost: 指定 vhost , 默认值【 / 】。
#  > pattern: 匹配对象的正则表达式,【 ^ 】代表全部匹配。
#  > definition: 匹配模式,通过 JSON 字符串设置,如: '{"ha-mode":"all"}'。其中:
#  "ha-mode" 指明镜像队列的模式,有效值为 "all/exactly/nodes"。"all"  表示在集群所有的节点上进行镜像,无需设置 "ha-params";"exactly" 表示在指定个数的节点上进行镜像,节点的个数由 "ha-params"指定; " nodes" 表示在指定的节点上进行镜像,节点名称通过 "ha-params" 指定;
#  "ha-params" 指明 "ha-mode" 模式需要用到的参数;
#  "ha-sync-mode"指明镜像队列中消息的同步方式,有效值为 "automatic/manually"。
# > apply-to: 匹配的对象,默认【 all 】代表全部匹配。其中:
# "exchanges" 表示匹配交换器;
# "queues" 表示匹配队列;
# "all" 表示同时匹配交换器和队列。

clear_policy [-p <vhostpath>] <name>  
# 清除一个策略。

list_policies [-p <vhostpath>]  
# 列出已有的策略。

6、插件管理指令:

rabbitmq-plugins <command> [<command options>]
# Commands:
#     list [-v] [-m] [-E] [-e] [<pattern>]  显示所有的的插件。-v 显示版本 -m 显示名称 -E 显示明确已经开启的 -e 显示明确的和暗中开启的
#     enable <plugin> ...   开启一个插件
#     disable <plugin> ...  关闭一个插件

如:
rabbitmq-plugins enable rabbitmq_management
# 开启可视化插件。

7、其他管理指令:

list_queues [-p <vhostpath>] [<queueinfoitem> ...]  
# 返回 queue 的信息,如果省略了-p参数,则默认显示的是【 / 】的 vhosts 信息。

list_exchanges [-p <vhostpath>] [<exchangeinfoitem> ...]  
#返回 exchange 的信息。

list_bindings [-p <vhostpath>] [<bindinginfoitem> ...] 
# 返回绑定信息。

list_connections [<connectioninfoitem> ...]  
# 返回链接信息。

list_channels [<channelinfoitem> ...]  
# 返回目前所有的 channels 。

list_consumers [-p <vhostpath>]  
# 返回 consumers。

status  
# 显示 broker 的状态。

environment  
# 显示环境参数的信息。

report  
# 返回一个服务状态 report 。

7.Java 开发集成示例

amqp-client 是 RabbitMQ 的 JavaAPI 包,以下是 Maven 项目的程序案例。

1、从 Maven 库中引入 amqp-client 包。

<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>5.9.0</version>
</dependency>

2、编写 Producter(生产者)端示例程序。本示例实现【每隔 1 秒钟无线循环向队列提交一个 UUID 消息】。

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 *  生产者提交消息。
 *
 */
public class Producter {
    public static void main(String[] args) {
        
        // RabbitMQ 服务地址
        String mqIp = "192.168.0.30";
        // RabbitMQ 服务端口
        int mqPort = 5672;
        // RabbitMQ 登录账号
        String mqUser = "admin";
        // RabbitMQ 登录账号口令
        String mqPass = "123456a?";

        // 定义虚拟主机
        String vhost = "/";
        // 定义交换器(如果 RabbitMQ 上没有这个 exchange 会自动创建)
        String exchange = "test_exchange";
        // 定义队列(如果 RabbitMQ 上没有这个 queue 会自动创建)
        String queue = "test_queue";
        // 定义绑定KEY(如果 RabbitMQ 上没有这个 bandingkey 会自动创建)
        String bandingkey = "test_bandingkey";

        // RabbitMQ Tcp 连接工厂实例。
        ConnectionFactory factory = null;
        // RabbitMQ Tcp 连接实例。
        Connection connection = null;
        // RabbitMQ 信道实例。
        Channel channel = null;

        try {
            
            // 创建 RabbitMQ 连接工厂
            factory = new ConnectionFactory();

            factory.setHost(mqIp);
            factory.setPort(mqPort);
            factory.setUsername(mqUser);
            factory.setPassword(mqPass);
            factory.setVirtualHost(vhost);

            // 创建 RabbitMQ Tcp 连接实例
            connection = factory.newConnection();
            
            // 创建 RabbitMQ 频道
            channel = connection.createChannel();
            
            // 声明交换机类型
            channel.exchangeDeclare(exchange, "fanout", true);
            
            // 声明队列
            channel.queueDeclare(queue, true, false, true, null);
            
            // 也可以获取默认队列
            // String queue = channel.queueDeclare().getQueue();
            
            // 将队列与交换机绑定
            channel.queueBind(queue, exchange, bandingkey);
            
            // 将数据提交到队列,本例中每隔1秒钟无线循环写入一个 UUID 消息。
            while (true) {
                String uuid = UUID.randomUUID().toString();
                System.out.println("提交消息:" + uuid);
                channel.basicPublish(exchange, bandingkey, null, uuid.getBytes());
                Thread.sleep(1000);
            }
            
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

测试运行结果:

生产者提交消息

2、编写 Consumer(消费者)端示例程序。本示例实现【从队列中获取已提交的 UUID 消息】。

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 *  消费者获取消息。
 *
 */
public class Consumer {

    public static void main(String[] args) {

        // RabbitMQ 服务地址
        String mqIp = "192.168.0.30";
        // RabbitMQ 服务端口
        int mqPort = 5672;
        // RabbitMQ 登录账号
        String mqUser = "admin";
        // RabbitMQ 登录账号口令
        String mqPass = "123456a?";

        // 定义虚拟主机
        String vhost = "/";
        // 定义交换器(如果 RabbitMQ 上没有这个 exchange 会自动创建)
        String exchange = "test_exchange";
        // 定义队列(如果 RabbitMQ 上没有这个 queue 会自动创建)
        String queue = "test_queue";
        // 定义绑定KEY(如果 RabbitMQ 上没有这个 bandingkey 会自动创建)
        String bandingkey = "test_bandingkey";

        // RabbitMQ Tcp 连接工厂实例。
        ConnectionFactory factory = null;
        // RabbitMQ Tcp 连接实例。
        Connection connection = null;
        // RabbitMQ 信道实例。
        Channel channel = null;

        try {

            // 创建 RabbitMQ 连接工厂
            factory = new ConnectionFactory();

            factory.setHost(mqIp);
            factory.setPort(mqPort);
            factory.setUsername(mqUser);
            factory.setPassword(mqPass);
            factory.setVirtualHost(vhost);

            // 创建 RabbitMQ Tcp 连接实例
            connection = factory.newConnection();

            // 创建 RabbitMQ 频道
            channel = connection.createChannel();

            // 声明交换机类型
            channel.exchangeDeclare(exchange, "fanout", true);

            // 声明队列
            channel.queueDeclare(queue, true, false, true, null);

            // 也可以获取默认队列
            // String queue = channel.queueDeclare().getQueue();

            // 将队列与交换机绑定
            channel.queueBind(queue, exchange, bandingkey);

            // 从队列获取数据,本例中每隔1秒钟无线循环写入一个 UUID 消息。
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("获取消息:" + message);
                }
            };
            
            // channel绑定队列、消费者,autoAck为true表示一旦收到消息则自动回复确认消息
            channel.basicConsume(queue, true, consumer);

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }

    }

}

测试运行结果:

消费者获取消息

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,839评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,543评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,116评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,371评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,384评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,111评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,416评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,053评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,558评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,007评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,117评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,756评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,324评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,315评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,539评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,578评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,877评论 2 345