程序员~高级~消息队列

一、什么是消息队列

    MQ (Message Quene) : 翻译为 消息队列 ,通过典型的 ⽣产者 和 消费者 模型,⽣产者不断向消息队列中⽣产消息,消费者不断的从队列中获取消息。因为消息的⽣产和消费都是异步的,⽽且只关⼼消息的发送和接收,没有业务逻辑的侵⼊,轻松的实现系统间解耦。别名为 消息中间件 通过利⽤⾼效可靠的消息传递机制进⾏平台⽆关的数据交流,并基于数据通信来进⾏分布式系统的集成。

二、为什么要使用MQ

1.解耦

现在我有一个系统A,系统A可以产生一个userId,然后,现在有系统B和系统C都需要这个userId去做相关的操作,可以写成如下操作

如果有一天,系统B的负责人告诉系统A的负责人,现在系统B的SystemBNeed2do(String userId)这个接口不再使用了,让系统A别去调它了。那么就需要从代码的基础上去修改了。这样紧密的耦合关系会导致很多麻烦,如果使用消息中间件就不会出现以上问题。

2.异步

我们再来看看下面这种情况:系统A还是直接调用系统B、C、D

假设系统A运算出userId具体的值需要50ms,调用系统B的接口需要300ms,调用系统C的接口需要300ms,调用系统D的接口需要300ms。那么这次请求就需要50+300+300+300=950ms

并且我们得知,系统A做的是主要的业务,而系统B、C、D是非主要的业务。比如系统A处理的是订单下单,而系统B是订单下单成功了,那发送一条短信告诉具体的用户此订单已成功,而系统C和系统D也是处理一些小事而已。

那么此时,为了提高用户体验和吞吐量,其实可以异步地调用系统B、C、D的接口。所以,我们可以弄成是这样的:

系统A执行完了以后,将userId写到消息队列中,然后就直接返回了(至于其他的操作,则异步处理)。

本来整个请求需要用950ms(同步)

现在将调用其他系统接口异步化,只需要100ms(异步)

3、削峰/限流

假设现在我们每个月要搞一次大促,大促期间的并发可能会很高的,比如每秒3000个请求。假设我们现在有两台机器处理请求,并且每台机器只能每次处理1000个请求。

那多出来的1000个请求,可能就把我们整个系统给搞崩了...所以,有一种办法,我们可以写到消息队列中:

系统B和系统C根据自己的能够处理的请求数去消息队列中拿数据,这样即便有每秒有8000个请求,那只是把请求放在消息队列中,去拿消息队列的消息由系统自己去控制,这样就不会把整个系统给搞崩。

三、常用的消息中间件

当今市⾯上有很多主流的消息中间件,如⽼牌的 ActiveMQ 、 RabbitMQ ,炙⼿可热的Kafka ,阿⾥巴巴⾃主开发 RocketMQ 等。

#1.ActiveMQ

ActiveMQ 是Apache出品,最流⾏的,能⼒强劲的开源消息总线。它是⼀个完全⽀持JMS规范的的消息中间件。丰富的API,多种集群架构模式让ActiveMQ在业界成为⽼牌的消息中间件,在中⼩型企业颇受欢迎!

# 2.Kafka

Kafka是LinkedIn开源的分布式发布-订阅消息系统,⽬前归属于Apache顶级项⽬。Kafka主要特点是基于Pull的模式来处理消息消费,追求⾼吞吐量,⼀开始的⽬的就是⽤于⽇志收集和传输。0.8版本开始⽀持复制,不⽀持事务,对消息的重复、丢失、错误没有严格要求,适合产⽣⼤量数据的互联⽹服务的数据收集业务。

# 3.RocketMQ

RocketMQ是阿⾥开源的消息中间件,它是纯Java开发,具有⾼吞吐量、⾼可⽤性、适合⼤规模分

布式系统应⽤的特点。RocketMQ思路起源于Kafka,但并不是Kafka的⼀个Copy,它

对消息的可靠传输及事务性做了优化,⽬前在阿⾥集团被⼴泛应⽤于交易、充值、流计算、消

息推送、⽇志流式处理、binglog分发等场景。

# 4.RabbitMQ

RabbitMQ⽐Kafka可靠,Kafka更适合IO⾼吞吐的处理,⼀般应⽤在⼤数据⽇志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使⽤,⽐如ELK⽇志收集。

四、RabbitMQ

基于 AMQP 协议,erlang语⾔开发,是部署最⼴泛的开源消息中间件,是最受欢迎的开源消息中间件之⼀。

官⽹ :https://www.rabbitmq.com/

官⽅教程 :https://www.rabbitmq.com/#getstarted

1.AMQP 协议

AMQP(advanced message queuingprotocol)`在2003年时被提出,最早⽤于解决⾦融领不同平台之间的消息传递交互问题。顾名思义,AMQP是⼀种协议,更准确的说是⼀种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进⾏限定,⽽是直接定义⽹

络交换的数据格式。这使得实现了AMQP的provider天然性就是跨平台的。以下是AMQP协议模型:

2.RabbitMQ 的安装

下载地址:https://www.rabbitmq.com/download.html

注意:处理要下载rabbitmq的按转包外还要下载ellong环境

3.安装步骤

先安装安装Erlang,再安装rabbitmq.exe,直接下一步,傻瓜式安装,安装完成点击开始会出现这里会出现启动、停止、重新安装等

1.点击

2.输入命令:

rabbitmq-plugins enable rabbitmq_management

3.打开浏览器控制台

http://localhost:15672/

默认账号guest guest

如果不能访问,

文件夹为隐藏 需要再文件夹选项中把隐藏文件夹打开显示

C:\Users\XYH\AppData\Roaming\RabbitMQ\db里面的数据删除再次安装一下Rabbitmq.exe

然后执行

rabbitmq-plugins enable rabbitmq_management

就可以访问到了

五、web管理界⾯介绍

1.简介

connections:⽆论⽣产者还是消费者,都需要与RabbitMQ建⽴连接后才可以完成消息的⽣产和消费,在这⾥可以查看连接情况

channels:通道,建⽴连接后,会形成通道,消息的投递获取依赖通道。

Exchanges:交换机,⽤来实现消息的路由

Queues:队列,即消息队列,消息存放在队列中,等待消费,消费后被移除队列。

admin:用户

2.Admin⽤户和虚拟主机管理

a.添加用户

上⾯的Tags选项,其实是指定⽤户的⻆⾊,可选的有以下⼏个:

==》超级管理员(administrator):可登陆管理控制台,可查看所有的信息,并且可以对⽤户,策略(policy)进⾏操作。

==》监控者(monitoring):可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存

使⽤情况,磁盘使⽤情况等)

==》策略制定者(policymaker):可登陆管理控制台, 同时可以对policy进⾏管理。但⽆法查看节点的相关信息(上图红框标识的部分)。

==》普通管理者(management):仅可登陆管理控制台,⽆法看到节点信息,也⽆法对策略进⾏理。

==》其他:⽆法登陆管理控制台,通常就是普通的⽣产者和消费者。

b.创建虚拟主机

虚拟主机:为了让各个⽤户可以互不⼲扰的⼯作,RabbitMQ添加了虚拟主机(Virtual Hosts)的概念。其实就是⼀个独⽴的访问路径,不同⽤户使⽤不同路径,各⾃有⾃⼰的队列、交换机,互相不会影响。

c、绑定虚拟主机

六、RabbitMQ 的第⼀个程序

a.AMQP协议

b、RabbitMQ⽀持的消息模型

c.引入依赖

<dependency>

<groupId>com.rabbitmq</groupId>

<artifactId>amqp-client</artifactId>

<version>5.7.2</version>

</dependency>

# 第⼀种模型(直连)

在上图的模型中,有以下概念:

P:⽣产者,也就是要发送消息的程序

C:消费者:消息的接受者,会⼀直等待消息到来。

queue:消息队列,图中红⾊部分。类似⼀个邮箱,可以缓存消息;⽣产者向其中投递消息,消费者从其中取出消息。

开发⽣产者

//创建连接工厂

ConnectionFactory connectionFactory = new ConnectionFactory();

//设置端ip号

connectionFactory.setHost("127.0.0.1");

//设置端口号

connectionFactory.setPort(5672);

//设置用户名和密码

connectionFactory.setUsername("xyh");

connectionFactory.setPassword("123");

//访问的虚拟通道

connectionFactory.setVirtualHost("/vhost");

//连接

Connection connection = connectionFactory.newConnection();

//创建通道

Channel channel = connection.createChannel();

//通道绑定对应的消息队列

//参数1: 是否持久化 参数2:是否独占队列 参

//数3:是否⾃动删除 参数4:其他属性

channel.queueDeclare("hello",false,false,false,null);

//发布消息

channel.basicPublish("","hello",null,"hello".getBytes());

channel.close();

connection.close();

消费者

//创建连接工厂

ConnectionFactory connectionFactory = new ConnectionFactory();

//设置端ip号

connectionFactory.setHost("127.0.0.1");

//设置端口号

connectionFactory.setPort(5672);

//设置用户名和密码

connectionFactory.setUsername("xyh");

connectionFactory.setPassword("123");

//访问的虚拟通道

connectionFactory.setVirtualHost("/vhost");

//连接

Connection connection = connectionFactory.newConnection();

//创建通道

Channel channel = connection.createChannel();

channel.queueDeclare("hello",false,false,false,null);

//消费消息

channel.basicConsume("hello",true,new DefaultConsumer(channel){

    @Override

    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

        System.out.println(new String(body));

    }

});

七、springboot实现简单的rabbitmq

1.导入依赖

<dependency>

    <groupId>org.springframework.boot</groupId>

    <artifactId>spring-boot-starter-amqp</artifactId>

</dependency>

2.配置rabbitmq的配置信息

spring.application.name=rabbitmq-springboot

spring.rabbitmq.host=127.0.0.1

spring.rabbitmq.port=5672

spring.rabbitmq.username=dt87

spring.rabbitmq.password=123

spring.rabbitmq.virtual-host=/dt87

3.消息生产者

@Autowired

private RabbitTemplate rabbitTemplate;

@Test

public void test(){

    //直接获取rabbitmq的模板对象

    rabbitTemplate.convertAndSend("hello","hello world");

}

4、消息的消费者

@Component

@RabbitListener(queuesToDeclare = @Queue("hello"))

public class Consumer {

    //消费消息的方法

    @RabbitHandler

    public void test(String message){

        System.out.println(message);

    }

}

}

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,951评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,606评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,601评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,478评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,565评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,587评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,590评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,337评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,785评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,096评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,273评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,935评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,578评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,199评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,440评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,163评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,133评论 2 352