RabbitMQ简介
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
下面将重点介绍RabbitMQ中的一些基础概念,了解了这些概念,是使用好RabbitMQ的基础。
为什么用RabbitMQ
- 开源、性能优秀、稳定性保障
- RabbitMQ提供可靠性消息投递模式(confirm)、返回模式(return)
- SpringAMQP完美的整合、API丰富
- 集群模式丰富,表达式配置、HA模式、镜像队列模型
- 保证数据不丢失的前提下做到高可靠性、可用性
RabbitMQ的高性能之道是如何做到的
Erlang语言最初用于交换机领域的架构模式,这样使得RabbitMQ在Broker之间进行数据交互的性能是非常优秀的
Erlang的优点:Erlang有着和原生Socket一样的延迟
什么是AMQP高级消息队列协议
AMQP:即Advanced Message Queuing Protocol,高级消息队列协议。
- 是面向消息的中间件的开放标准应用层协议,AMQP的特征是消息导向,排队,路由(包括点对点和发布和订阅),可靠性和安全性。
- AMQP要求消息传递提供商和客户端的行为在不同供应商实现可互操作的情况下,以与SMTP,HTTP,FTP等相同的方式创建了可互操作的系统。
- AMQP协议是具有现代特征的二进制协议。一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开发标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。
- AMQP是一种二进制应用层协议,旨在有效地支持各种消息应用和通信模式。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、 安全。
AMQP的协议栈
- Modle Layer:位于协议最高层,主要定义了一些供客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑,例如,客户端可以通过queue.declare声明一个队列,利用consume命令获取一个队列中的消息。
- Session Layer:主要负责将客户端的命令发送给服务器,在将服务器端的应答返回给客户端,主要为客户端与服务器之间通信提供可靠性、同步机制和错误处理。
- Transport Layer:主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示。
AMQP协议模型
AMQP核心概念是什么
Server:
又称作Broker,用于接受客户端的连接,实现AMQP实体服务
Connection:
连接,应用程序与Broker的网络连接
Channel:
网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。客户端可建立多个Channel,每个Channel代表一个会话任务
Message:
消息,服务器和应用程序之间传送的数据,有Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body则是消息体内容,即我们要传输的数据;
仅仅创建了客户端到Broker之间的连接后,客户端还是不能发送消息的。需要为每一个Connection创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令。一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的,如果一个客户端每一个线程都需要与Broker交互,如果每一个线程都建立一个TCP连接,暂且不考虑TCP连接是否浪费,就算操作系统也无法承受每秒建立如此多的TCP连接。RabbitMQ建议客户端线程之间不要共用Channel,至少要保证共用Channel的线程发送消息必须是串行的,但是建议尽量共用Connection。
Virtual Host:
虚拟地址,是一个逻辑概念,用于进行逻辑隔离,是最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange或者Queue;
Virtual Host是权限控制的最小粒度;
Exchange:
交换机,用于接收消息,可根据路由键将消息转发到绑定的队列。
Binding:
Exchange和Queue之间的虚拟连接,Exchange在与多个Message Queue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即Binding Key。当Exchange收到Message时会解析其Header得到Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。Binding Key由Consumer在Binding Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,两者的匹配方式由Exchange Type决定。
Routing Key:
一个路由规则,虚拟机可用它来确定如何路由一个特定的消息;
Queue:
也称作Message Queue,即消息队列,用于保存消息并将他们转发给消费者;
RabbitMQ整体架构模型
生产者把消息投递到Exchange,Exchange投递到Queue.
因此我们的生产者只需要关注把消息投递到指定的Exchange即可,我们的消费者只需要监听指定Queue即可。就是这么简单的机制。
通过图我们也能看到,生产者不需要关注投递到哪个队列,消费也不需要关注是从哪个Exchange上来的,这两块没有耦合的情况。主要是应为Exchange和Queue有一个绑定的关系。
RabbitMQ消息流转
生产者publisher application 生产消息Message投递到Exchange上,Exchange绑定MessageQueue,可以绑定过多个MessageQueue,为什么三个队列只有其中一个队列收到了消息呢?主要是由于Exchange是有一个路由功能的。这个路由就是routing key,这个路由有两个非常关键的点,
第一个:你的消息是需要发送到哪个Exchange。
第二个:你发消息的时候需要带上routing key,然后通过Exchange 和 MessageQueue 建立一个绑定关系,通过路由key把消息路由到一个指定的队列上。然后我们的消费端直接监听队列就行了,就可以消费了。
RabbitMQ的安装与使用
RabbitMQ官网地址:https://www.rabbitmq.com/
Erlang官网地址:https://www.erlang.org/
windows平台安装方式://www.greatytc.com/p/ba3b2c62dce1
Centos7平台安装方式:
绿色版安装:https://blog.csdn.net/weixin_41004350/article/details/83046842
rpm安装步骤:
准备环境:
yum install
build-essential openssl openssl-devel unixODBC unixODBC-devel
make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
下载:
wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm
配置文件:
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
比如修改密码、配置等等,例如:loopback_users 中的 <<"guest">>,只保留guest
服务启动和停止:
启动 rabbitmq-server start &
停止 rabbitmqctl app_stop
管理插件:rabbitmq-plugins enable rabbitmq_management
访问地址:http://ip:15672/
RabbitMQ命令行与管控台
基础服务的命令操作
rabbitmqctl stop_app:关闭应用
rabbitmqctl start_app:启动应用
rabbtmqctl status:节点状态
rabbitmqctl add_user username password:添加用户
rabbitmqctl list_users:列出所有用户
rabbitmqctl delete_user username:删除用户
rabbitmqctl clear_permissions - p vhostpath username: 清除用户权限
rabbitmqctl list_user_permissions_username: 列出用户权限
rabbitmqctl change_password username newpassword:修改密码
rabbitmqctl set_permissions -p vhostpath username "." "." ".*" :设置用户权限
涉及的用户命令还有许多,这里就不一一列举了。
对rabbitmq 具体组件的命令
对虚拟主机操作
rabbitmqctl add_vhost vhostpath:创建虚拟主机
rabbitmqctl list_vhosts:列出所有虚拟主机
rabbitmqctl list_permissions -p vhostpath:列出虚拟主机上所有权限
rabbitmqctl delete_vhost vhostpath:删除虚拟主机
队列操作
rabbitmqctl list_queues:查看所有队列信息
rabbitmqctl -p vhostpath purge_queue bule:清除队列里的消息
高级操作
rabbitmqctl reset:移除所有数据,要在rabbitmqctl stop_app之后使用
rabbitmqctl join_clust [--ram]:组成集群命令
rabbitmqctl clustr_status:查看集群状态
rabbitmqctl change_cluster_node_type disc|ram 修改集群节点的存储形式
rabbitmqctl forget_cluster_node [--offline] 忘记节点(摘除节点)
rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2..] (修改节点名称)
更多的RabbitMQ命令行与管控台操作:https://www.cnblogs.com/coder-programming/p/11382322.html
RabbitMQ交换机详解
Exchange:交换机,用于接收消息,可根据路由键将消息转发到绑定的队列
Exchange的属性:
- Name :名字,同一个virtual host里面的Name不能重复;不同的virtual host是可以重复的。
- Type:fanout、direct、topic、headers四种
public enum BuiltinExchangeType {
DIRECT("direct"),
FANOUT("fanout"),
TOPIC("topic"),
HEADERS("headers");
}
- Durability:是否持久化,true:是;false:否。如果不持久化,当break重启之后,当前的exchange会消失
- Auto delete:当最后一个绑定到Exchange上的队列被删除后,自动删除该Exchange
- Internal:当前Exchange是否是内部RabbitMQ内部使用,默认false,是的话,就意味着我们不能往该Exchange里面发送消息
- Arguments:扩展参数,是AMQP协议留给AMQP实现做扩展用的。
RabbitMQ总共有四种Exchange模式(fanout、direct、topic、headers)
RabbitMQ中,所有生产者提交的消息都由Exchange来接受,然后Exchange按照特定的策略转发到Queue进行存储
RabbitMQ提供了四种Exchange:fanout、direct、topic、headers
Headers Exchange 会忽略 RoutingKey 而根据消息中的Headers和创建绑定关系时指定的 Arguments来匹配决定路由到哪些Queue,在实际中使用较少,本文只对前三种模式进行比较。
性能排序:fanout > direct > topic。比例大约为11 :10 :6
Direct Exchange
Direct Exchange:处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。
所有发送到Direct Exchange的消息被转发到RouteKey中指定的queue
注意:Direct模式可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RouteKey必须完全匹配才会被队列接收,否则该消息会被抛弃。
Channel channel = connection.createChannel();
channel.exchangeDeclare("exchangeName", "direct"); //direct fanout topic
channel.queueDeclare("queueName");
channel.queueBind("queueName", "exchangeName", "routingKey");
byte[] messageBodyBytes = "hello world".getBytes();
//需要绑定路由键
channel.basicPublish("exchangeName", "routingKey", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
Fanout Exchange
不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上。
- 1、可以理解为路由表的模式
- 2、这种模式不需要RouteKey
- 3、这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定。
- 4、如果接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃。
Channel channel = connection.createChannel();
channel.exchangeDeclare("exchangeName", "fanout"); //direct fanout topic
channel.queueDeclare("queueName");
channel.queueBind("queueName", "exchangeName", "routingKey");
channel.queueDeclare("queueName1");
channel.queueBind("queueName1", "exchangeName", "routingKey1");
byte[] messageBodyBytes = "hello world".getBytes();
//路由键需要设置为空
channel.basicPublish("exchangeName", "", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
Topic Exchange
Topic Exchange:将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.” 只会匹配到“audit.irs”
任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的Queue上
- 1、这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个“标题”(RouteKey),Exchange会将消息转发到所有关注主题能与RouteKey模糊匹配的队列。
- 2、这种模式需要RouteKey,也许要提前绑定Exchange与Queue。
- 3、在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心所有涉及log的消息(一个RouteKey为”MQ.log.error”的消息会被转发到该队列)。
- 4、可以使用通配符进行模糊匹配,“#”表示0个或若干个关键字,“”表示一个关键字。如“log.”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。
- 5、同样,如果Exchange没有发现能够与RouteKey匹配的Queue,则会抛弃此消息。
Channel channel = connection.createChannel();
channel.exchangeDeclare("exchangeName", "topic"); //direct fanout topic
channel.queueDeclare("queueName");
channel.queueBind("queueName", "exchangeName", "routingKey.*");
byte[] messageBodyBytes = "hello world".getBytes();
channel.basicPublish("exchangeName", "routingKey.one", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
Headers Exchange
Headers Exchange 会忽略 RoutingKey 而根据消息中的 Headers 和创建绑定关系时指定的 Arguments 来匹配决定路由到哪些 Queue。
Headers Exchange 的性能比较差,而且 Direct Exchange 完全可以代替它,所以不建议使用。
Default Exchange
Default Exchange 是一种特殊的 Direct Exchange。当你手动创建一个队列时,后台会自动将这个队列绑定到一个名称为空的 Direct Exchange 上,绑定 RoutingKey 与队列名称相同。有了这个默认的交换机和绑定,使我们只关心队列这一层即可,这个比较适合做一些简单的应用。
RabbitMQ队列、绑定、虚拟主机、消息
Binding:
绑定Exchange和Exchange、Queue之间的连接关系,Binding中可以包含RoutingKey或者参数
Queue:
消息队列,实际存储消息数据
- Durability:是否持久化,Durable:是,Transient:否
- Auto Delete:如选yes,代表当最后一个监听被移除之后,该Queue会自动被删除
Message:
服务器和应用程序之间传送的数据,本质上是一段数据,有Properties和Payload(Body)组成
常用属性:delivery mode、headers(自定义属性)
其他属性:
content_type、content_encoding(字符集)、priority(消息优先级0-9,从小到大,优先级越来越高)、correlation_id(消息唯一id)、reply_to(重回队列,返回哪个队列)、expiration(消息过期时间)、message_id(消息id)、timestamp(时间戳)、type、user_id、app_id、cluster_id
- Producer代码:
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建一个ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2、通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//3、通过Connection创建一个Channel
Channel channel = connection.createChannel();
Map<String,Object> map = new HashMap<>();
map.put("将故事写成我们",111);
map.put("夜曲",222);
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.expiration("10000")
.headers(map)
.build();
//4、通过Channel发送数据
String msg = "hello rabbitmq!";
//1 exchange 2 routingKey
channel.basicPublish("","test001",properties,msg.getBytes("UTF-8"));
//5、关闭相关连接
channel.close();
connection.close();
}
}
- Consumer端代码:
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建一个ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2、通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//3、通过Connection创建一个Channel
Channel channel = connection.createChannel();
//4、声明(创建)一个队列
String queueName = "test001";
channel.queueDeclare(queueName,true,false,false,null);
//5、创建消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("RoutingKey="+envelope.getRoutingKey() + ",message=" + message);
Map<String, Object> headers = properties.getHeaders();
System.out.println("properties的属性:headers="+headers);
}
};
//6、设置channel
channel.basicConsume(queueName,true,consumer);
}
}
Virtual Host:
虚拟主机,用于进行逻辑隔离,最上层的消息路由,一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host不能有相同名称的Exchange和Queue
参考:
https://www.cnblogs.com/shenyixin/p/9084249.html