RabbitMQ 核心概念

RabbitMQ简介

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

下面将重点介绍RabbitMQ中的一些基础概念,了解了这些概念,是使用好RabbitMQ的基础。

为什么用RabbitMQ

  • 开源、性能优秀、稳定性保障
  • RabbitMQ提供可靠性消息投递模式(confirm)、返回模式(return)
  • SpringAMQP完美的整合、API丰富
  • 集群模式丰富,表达式配置、HA模式、镜像队列模型
  • 保证数据不丢失的前提下做到高可靠性、可用性

RabbitMQ的高性能之道是如何做到的

Erlang语言最初用于交换机领域的架构模式,这样使得RabbitMQ在Broker之间进行数据交互的性能是非常优秀的
Erlang的优点:Erlang有着和原生Socket一样的延迟

什么是AMQP高级消息队列协议

AMQP:即Advanced Message Queuing Protocol,高级消息队列协议。

  • 是面向消息的中间件的开放标准应用层协议,AMQP的特征是消息导向,排队,路由(包括点对点和发布和订阅),可靠性和安全性。
  • AMQP要求消息传递提供商和客户端的行为在不同供应商实现可互操作的情况下,以与SMTP,HTTP,FTP等相同的方式创建了可互操作的系统。
  • AMQP协议是具有现代特征的二进制协议。一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开发标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。
  • AMQP是一种二进制应用层协议,旨在有效地支持各种消息应用和通信模式。

AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、 安全。

AMQP的协议栈

image.png

  • Modle Layer:位于协议最高层,主要定义了一些供客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑,例如,客户端可以通过queue.declare声明一个队列,利用consume命令获取一个队列中的消息。
  • Session Layer:主要负责将客户端的命令发送给服务器,在将服务器端的应答返回给客户端,主要为客户端与服务器之间通信提供可靠性、同步机制和错误处理。
  • Transport Layer:主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示。

AMQP协议模型

image.png

AMQP核心概念是什么

Server:
又称作Broker,用于接受客户端的连接,实现AMQP实体服务

Connection:
连接,应用程序与Broker的网络连接

Channel:
网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。客户端可建立多个Channel,每个Channel代表一个会话任务

Message:
消息,服务器和应用程序之间传送的数据,有Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body则是消息体内容,即我们要传输的数据;
仅仅创建了客户端到Broker之间的连接后,客户端还是不能发送消息的。需要为每一个Connection创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令。一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的,如果一个客户端每一个线程都需要与Broker交互,如果每一个线程都建立一个TCP连接,暂且不考虑TCP连接是否浪费,就算操作系统也无法承受每秒建立如此多的TCP连接。RabbitMQ建议客户端线程之间不要共用Channel,至少要保证共用Channel的线程发送消息必须是串行的,但是建议尽量共用Connection。

Virtual Host:
虚拟地址,是一个逻辑概念,用于进行逻辑隔离,是最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange或者Queue;
Virtual Host是权限控制的最小粒度;

Exchange:
交换机,用于接收消息,可根据路由键将消息转发到绑定的队列。

Binding:
Exchange和Queue之间的虚拟连接,Exchange在与多个Message Queue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即Binding Key。当Exchange收到Message时会解析其Header得到Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。Binding Key由Consumer在Binding Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,两者的匹配方式由Exchange Type决定。

Routing Key:
一个路由规则,虚拟机可用它来确定如何路由一个特定的消息;

Queue:
也称作Message Queue,即消息队列,用于保存消息并将他们转发给消费者;

RabbitMQ整体架构模型

image.png

生产者把消息投递到Exchange,Exchange投递到Queue.
因此我们的生产者只需要关注把消息投递到指定的Exchange即可,我们的消费者只需要监听指定Queue即可。就是这么简单的机制。
通过图我们也能看到,生产者不需要关注投递到哪个队列,消费也不需要关注是从哪个Exchange上来的,这两块没有耦合的情况。主要是应为Exchange和Queue有一个绑定的关系。

RabbitMQ消息流转

image.png

生产者publisher application 生产消息Message投递到Exchange上,Exchange绑定MessageQueue,可以绑定过多个MessageQueue,为什么三个队列只有其中一个队列收到了消息呢?主要是由于Exchange是有一个路由功能的。这个路由就是routing key,这个路由有两个非常关键的点,
第一个:你的消息是需要发送到哪个Exchange。
第二个:你发消息的时候需要带上routing key,然后通过Exchange 和 MessageQueue 建立一个绑定关系,通过路由key把消息路由到一个指定的队列上。然后我们的消费端直接监听队列就行了,就可以消费了。

RabbitMQ的安装与使用

RabbitMQ官网地址:https://www.rabbitmq.com/
Erlang官网地址:https://www.erlang.org/
windows平台安装方式://www.greatytc.com/p/ba3b2c62dce1
Centos7平台安装方式:
绿色版安装:https://blog.csdn.net/weixin_41004350/article/details/83046842
rpm安装步骤:

准备环境:
yum install 
build-essential openssl openssl-devel unixODBC unixODBC-devel 
make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz

下载:
wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm

配置文件:
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
比如修改密码、配置等等,例如:loopback_users 中的 <<"guest">>,只保留guest
服务启动和停止:
启动 rabbitmq-server start &
停止 rabbitmqctl app_stop

管理插件:rabbitmq-plugins enable rabbitmq_management
访问地址:http://ip:15672/

RabbitMQ命令行与管控台

基础服务的命令操作

rabbitmqctl stop_app:关闭应用

rabbitmqctl start_app:启动应用

rabbtmqctl status:节点状态

rabbitmqctl add_user username password:添加用户

rabbitmqctl list_users:列出所有用户

rabbitmqctl delete_user username:删除用户

rabbitmqctl clear_permissions - p vhostpath username: 清除用户权限

rabbitmqctl list_user_permissions_username: 列出用户权限

rabbitmqctl change_password username newpassword:修改密码

rabbitmqctl set_permissions -p vhostpath username "." "." ".*" :设置用户权限

涉及的用户命令还有许多,这里就不一一列举了。

对rabbitmq 具体组件的命令
对虚拟主机操作

rabbitmqctl add_vhost vhostpath:创建虚拟主机

rabbitmqctl list_vhosts:列出所有虚拟主机

rabbitmqctl list_permissions -p vhostpath:列出虚拟主机上所有权限

rabbitmqctl delete_vhost vhostpath:删除虚拟主机

队列操作

rabbitmqctl list_queues:查看所有队列信息

rabbitmqctl -p vhostpath purge_queue bule:清除队列里的消息

高级操作

rabbitmqctl reset:移除所有数据,要在rabbitmqctl stop_app之后使用

rabbitmqctl join_clust [--ram]:组成集群命令

rabbitmqctl clustr_status:查看集群状态

rabbitmqctl change_cluster_node_type disc|ram 修改集群节点的存储形式

rabbitmqctl forget_cluster_node [--offline] 忘记节点(摘除节点)

rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2..] (修改节点名称)

更多的RabbitMQ命令行与管控台操作:https://www.cnblogs.com/coder-programming/p/11382322.html

RabbitMQ交换机详解

Exchange:交换机,用于接收消息,可根据路由键将消息转发到绑定的队列

image.png

Exchange的属性:

  • Name :名字,同一个virtual host里面的Name不能重复;不同的virtual host是可以重复的。
  • Type:fanout、direct、topic、headers四种
public enum BuiltinExchangeType {
    DIRECT("direct"),
    FANOUT("fanout"),
    TOPIC("topic"),
    HEADERS("headers");
}
  • Durability:是否持久化,true:是;false:否。如果不持久化,当break重启之后,当前的exchange会消失
  • Auto delete:当最后一个绑定到Exchange上的队列被删除后,自动删除该Exchange
  • Internal:当前Exchange是否是内部RabbitMQ内部使用,默认false,是的话,就意味着我们不能往该Exchange里面发送消息
  • Arguments:扩展参数,是AMQP协议留给AMQP实现做扩展用的。

RabbitMQ总共有四种Exchange模式(fanout、direct、topic、headers)
RabbitMQ中,所有生产者提交的消息都由Exchange来接受,然后Exchange按照特定的策略转发到Queue进行存储
RabbitMQ提供了四种Exchange:fanout、direct、topic、headers
Headers Exchange 会忽略 RoutingKey 而根据消息中的Headers和创建绑定关系时指定的 Arguments来匹配决定路由到哪些Queue,在实际中使用较少,本文只对前三种模式进行比较。
性能排序:fanout > direct > topic。比例大约为11 :10 :6

Direct Exchange

image.png

Direct Exchange:处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。
所有发送到Direct Exchange的消息被转发到RouteKey中指定的queue
注意:Direct模式可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RouteKey必须完全匹配才会被队列接收,否则该消息会被抛弃。

Channel channel = connection.createChannel();    
channel.exchangeDeclare("exchangeName", "direct"); //direct fanout topic    
channel.queueDeclare("queueName");    
channel.queueBind("queueName", "exchangeName", "routingKey");    
    
byte[] messageBodyBytes = "hello world".getBytes();    
//需要绑定路由键    
channel.basicPublish("exchangeName", "routingKey", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);

Fanout Exchange

image.png

不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上。

  • 1、可以理解为路由表的模式
  • 2、这种模式不需要RouteKey
  • 3、这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定。
  • 4、如果接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃。
Channel channel = connection.createChannel();    
channel.exchangeDeclare("exchangeName", "fanout"); //direct fanout topic    
channel.queueDeclare("queueName");    
channel.queueBind("queueName", "exchangeName", "routingKey");    
    
channel.queueDeclare("queueName1");    
channel.queueBind("queueName1", "exchangeName", "routingKey1");    
    
byte[] messageBodyBytes = "hello world".getBytes();    
//路由键需要设置为空    
channel.basicPublish("exchangeName", "", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);

Topic Exchange

image.png

Topic Exchange:将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.” 只会匹配到“audit.irs”
任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的Queue上

  • 1、这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个“标题”(RouteKey),Exchange会将消息转发到所有关注主题能与RouteKey模糊匹配的队列。
  • 2、这种模式需要RouteKey,也许要提前绑定Exchange与Queue。
  • 3、在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心所有涉及log的消息(一个RouteKey为”MQ.log.error”的消息会被转发到该队列)。
  • 4、可以使用通配符进行模糊匹配,“#”表示0个或若干个关键字,“”表示一个关键字。如“log.”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。
  • 5、同样,如果Exchange没有发现能够与RouteKey匹配的Queue,则会抛弃此消息。
Channel channel = connection.createChannel();    
channel.exchangeDeclare("exchangeName", "topic"); //direct fanout topic    
channel.queueDeclare("queueName");    
channel.queueBind("queueName", "exchangeName", "routingKey.*");    
    
byte[] messageBodyBytes = "hello world".getBytes();    
channel.basicPublish("exchangeName", "routingKey.one", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes); 

Headers Exchange
Headers Exchange 会忽略 RoutingKey 而根据消息中的 Headers 和创建绑定关系时指定的 Arguments 来匹配决定路由到哪些 Queue。
Headers Exchange 的性能比较差,而且 Direct Exchange 完全可以代替它,所以不建议使用。

Default Exchange
Default Exchange 是一种特殊的 Direct Exchange。当你手动创建一个队列时,后台会自动将这个队列绑定到一个名称为空的 Direct Exchange 上,绑定 RoutingKey 与队列名称相同。有了这个默认的交换机和绑定,使我们只关心队列这一层即可,这个比较适合做一些简单的应用。

RabbitMQ队列、绑定、虚拟主机、消息

Binding:
绑定Exchange和Exchange、Queue之间的连接关系,Binding中可以包含RoutingKey或者参数

Queue:
消息队列,实际存储消息数据

  • Durability:是否持久化,Durable:是,Transient:否
  • Auto Delete:如选yes,代表当最后一个监听被移除之后,该Queue会自动被删除

Message:
服务器和应用程序之间传送的数据,本质上是一段数据,有Properties和Payload(Body)组成
常用属性:delivery mode、headers(自定义属性)
其他属性:
content_type、content_encoding(字符集)、priority(消息优先级0-9,从小到大,优先级越来越高)、correlation_id(消息唯一id)、reply_to(重回队列,返回哪个队列)、expiration(消息过期时间)、message_id(消息id)、timestamp(时间戳)、type、user_id、app_id、cluster_id

  • Producer代码:
public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建一个ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2、通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3、通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        Map<String,Object> map = new HashMap<>();
        map.put("将故事写成我们",111);
        map.put("夜曲",222);

        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                .deliveryMode(2)
                .contentEncoding("UTF-8")
                .expiration("10000")
                .headers(map)
                .build();

        //4、通过Channel发送数据
        String msg = "hello rabbitmq!";
        //1 exchange   2 routingKey
        channel.basicPublish("","test001",properties,msg.getBytes("UTF-8"));

        //5、关闭相关连接
        channel.close();
        connection.close();
    }
}
  • Consumer端代码:
public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建一个ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2、通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3、通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        //4、声明(创建)一个队列
        String queueName = "test001";
        channel.queueDeclare(queueName,true,false,false,null);

        //5、创建消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("RoutingKey="+envelope.getRoutingKey() + ",message=" + message);
                Map<String, Object> headers = properties.getHeaders();
                System.out.println("properties的属性:headers="+headers);
            }
        };

        //6、设置channel
        channel.basicConsume(queueName,true,consumer);
    }
}

Virtual Host:
虚拟主机,用于进行逻辑隔离,最上层的消息路由,一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host不能有相同名称的Exchange和Queue

参考:
https://www.cnblogs.com/shenyixin/p/9084249.html

https://www.cnblogs.com/luhan777/p/11162649.html

https://www.cnblogs.com/wuzhenzhao/p/10319677.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,123评论 6 490
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,031评论 2 384
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,723评论 0 345
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,357评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,412评论 5 384
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,760评论 1 289
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,904评论 3 405
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,672评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,118评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,456评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,599评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,264评论 4 328
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,857评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,731评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,956评论 1 264
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,286评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,465评论 2 348