参考
消息队列的使用场景是怎样的?
新手也能看懂,消息队列其实很简单
消息队列设计精要
一、例一
假设用户在你的软件中注册,服务端收到用户的注册请求后,它会做这些操作:
- 校验用户名等信息,如果没问题会在数据库中添加一个用户记录
- 如果是用邮箱注册会给你发送一封注册成功的邮件,手机注册则会发送一条短信
- 分析用户的个人信息,以便将来向他推荐一些志同道合的人,或向那些人推荐他
- 发送给用户一个包含操作指南的系统通知
- 等等……
但是对于用户来说,注册功能实际只需要第一步,只要服务端将他的账户信息存到数据库中他便可以登录上去做他想做的事情了。至于其他的事情,非要在这一次请求中全部完成么?值得用户浪费时间等你处理这些对他来说无关紧要的事情么?所以实际当第一步做完后,服务端就可以把其他的操作放入对应的消息队列中然后马上返回用户结果,由消息队列异步的进行这些操作。
或者还有一种情况,同时有大量用户注册你的软件,再高并发情况下注册请求开始出现一些问题,例如邮件接口承受不住,或是分析信息时的大量计算使cpu满载,这将会出现虽然用户数据记录很快的添加到数据库中了,但是却卡在发邮件或分析信息时的情况,导致请求的响应时间大幅增长,甚至出现超时,这就有点不划算了。面对这种情况一般也是将这些操作放入消息队列(生产者消费者模型),消息队列慢慢的进行处理,同时可以很快的完成注册请求,不会影响用户使用其他功能。
所以在软件的正常功能开发中,并不需要去刻意的寻找消息队列的使用场景,而是当出现性能瓶颈时,去查看业务逻辑是否存在可以异步处理的耗时操作,如果存在的话便可以引入消息队列来解决。否则盲目的使用消息队列可能会增加维护和开发的成本却无法得到可观的性能提升,那就得不偿失了。
二、例二
小红是小明的姐姐。小红希望小明多读书,常寻找好书给小明看,之前的方式是这样:小红问小明什么时候有空,把书给小明送去,并亲眼监督小明读完书才走。久而久之,两人都觉得麻烦。后来的方式改成了:小红对小明说「我放到书架上的书你都要看」,然后小红每次发现不错的书都放到书架上,小明则看到书架上有书就拿下来看。
书架就是一个消息队列,小红是生产者,小明是消费者。这带来的好处有:
- 1.小红想给小明书的时候,不必问小明什么时候有空,亲手把书交给他了,小红只把书放到书架上就行了。这样小红小明的时间都更自由。
- 2.小红相信小明的读书自觉和读书能力,不必亲眼观察小明的读书过程,小红只要做一个放书的动作,很节省时间。
- 3.当明天有另一个爱读书的小伙伴小强加入,小红仍旧只需要把书放到书架上,小明和小强从书架上取书即可(唔,姑且设定成多个人取一本书可以每人取走一本吧,可能是拷贝电子书或复印,暂不考虑版权问题)。
- 4.书架上的书放在那里,小明阅读速度快就早点看完,阅读速度慢就晚点看完,没关系,比起小红把书递给小明并监督小明读完的方式,小明的压力会小一些。
这就是消息队列的四大好处:
1.解耦
每个成员不必受其他成员影响,可以更独立自主,只通过一个简单的容器来联系。小红甚至可以不知道从书架上取书的是谁,小明也可以不知道往书架上放书的人是谁,在他们眼里,都只有书架,没有对方。毫无疑问,与一个简单的容器打交道,比与复杂的人打交道容易一万倍,小红小明可以自由自在地追求各自的人生。
2.提速
小红选择相信「把书放到书架上,别的我不问」,为自己节省了大量时间。小红很忙,只能抽出五分钟时间,但这时间足够把书放到书架上了。
3.广播
小红只需要劳动一次,就可以让多个小伙伴有书可读,这大大地节省了她的时间,也让新的小伙伴的加入成本很低。
4.削峰
假设小明读书很慢,如果采用小红每给一本书都监督小明读完的方式,小明有压力,小红也不耐烦。反正小红给书的频率也不稳定,如果今明两天连给了五本,之后隔三个月才又给一本,那小明只要在三个月内从书架上陆续取走五本书读完就行了,压力就不那么大了。
当然,使用消息队列也有其成本:
1.引入复杂度
毫无疑问,「书架」这东西是多出来的,需要地方放它,还需要防盗。
2.暂时的不一致性
假如妈妈问小红「小明最近读了什么书」,在以前的方式里,小红因为亲眼监督小明读完书了,可以底气十足地告诉妈妈,但新的方式里,小红回答妈妈之后会心想「小明应该会很快看完吧……」这中间存在着一段「妈妈认为小明看了某书,而小明其实还没看」的时期,当然,小明最终的阅读状态与妈妈的认知会是一致的,这就是所谓的「最终一致性」。
那么,该使用消息队列的情况需要满足什么条件呢?
1.生产者不需要从消费者处获得反馈
引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明明下层的动作还没做,上层却当成动作做完了继续往后走——即所谓异步——成为了可能。小红放完书之后小明到底看了没有,小红根本不问,她默认他是看了,否则就只能用原来的方法监督到看完了。
2.容许短暂的不一致性
妈妈可能会发现「有时候据说小明看了某书,但事实上他还没看」,只要妈妈满意于「反正他最后看了就行」,异步处理就没问题。如果妈妈对这情况不能容忍,对小红大发雷霆,小红也就不敢用书架方式了。
3.确实是用了有效果
即解耦、提速、广播、削峰这些方面的收益,超过放置书架、监控书架这些成本。否则如果是盲目照搬,「听说老赵家买了书架,咱们家也买一个」,买回来却没什么用,只是让步骤变多了,还不如直接把书递给对方呢,那就不对了。
三、例三 何时需要消息队列
当你需要使用消息队列时,首先需要考虑它的必要性。可以使用mq的场景有很多,最常用的几种,是做业务解耦/最终一致性/广播/错峰流控等。反之,如果需要强一致性,关注业务逻辑的处理结果,则RPC显得更为合适。
1.解耦
解耦是消息队列要解决的最本质问题。所谓解耦,简单点讲就是一个事务,只关心核心的流程。而需要依赖其他系统但不那么重要的事情,有通知即可,无需等待结果。换句话说,基于消息的模型,关心的是“通知”,而非“处理”。
比如在美团旅游,我们有一个产品中心,产品中心上游对接的是主站、移动后台、旅游供应链等各个数据源;下游对接的是筛选系统、API系统等展示系统。当上游的数据发生变更的时候,如果不使用消息系统,势必要调用我们的接口来更新数据,就特别依赖产品中心接口的稳定性和处理能力。但其实,作为旅游的产品中心,也许只有对于旅游自建供应链,产品中心更新成功才是他们关心的事情。而对于团购等外部系统,产品中心更新成功也好、失败也罢,并不是他们的职责所在。他们只需要保证在信息变更的时候通知到我们就好了。
而我们的下游,可能有更新索引、刷新缓存等一系列需求。对于产品中心来说,这也不是我们的职责所在。说白了,如果他们定时来拉取数据,也能保证数据的更新,只是实时性没有那么强。但使用接口方式去更新他们的数据,显然对于产品中心来说太过于“重量级”了,只需要发布一个产品ID变更的通知,由下游系统来处理,可能更为合理。
再举一个例子,对于我们的订单系统,订单最终支付成功之后可能需要给用户发送短信积分什么的,但其实这已经不是我们系统的核心流程了。如果外部系统速度偏慢(比如短信网关速度不好),那么主流程的时间会加长很多,用户肯定不希望点击支付过好几分钟才看到结果。那么我们只需要通知短信系统“我们支付成功了”,不一定非要等待它处理完成。
2.最终一致性
最终一致性指的是两个系统的状态保持一致,要么都成功,要么都失败。当然有个时间限制,理论上越快越好,但实际上在各种异常的情况下,可能会有一定延迟达到最终一致状态,但最后两个系统的状态是一样的。
业界有一些为“最终一致性”而生的消息队列,如Notify(阿里)、QMQ(去哪儿)等,其设计初衷,就是为了交易系统中的高可靠通知。
以一个银行的转账过程来理解最终一致性,转账的需求很简单,如果A系统扣钱成功,则B系统加钱一定成功。反之则一起回滚,像什么都没发生一样。然而,这个过程中存在很多可能的意外:
- A扣钱成功,调用B加钱接口失败。
- A扣钱成功,调用B加钱接口虽然成功,但获取最终结果时网络异常引起超时。
- A扣钱成功,B加钱失败,A想回滚扣的钱,但A机器down机。
可见,想把这件看似简单的事真正做成,真的不那么容易。所有跨VM的一致性问题,从技术的角度讲通用的解决方案是:
- 强一致性,分布式事务,但落地太难且成本太高,后文会具体提到。
- 最终一致性,主要是用“记录”和“补偿”的方式。在做所有的不确定的事情之前,先把事情记录下来,然后去做不确定的事情,结果可能是:成功、失败或是不确定,“不确定”(例如超时等)可以等价为失败。成功就可以把记录的东西清理掉了,对于失败和不确定,可以依靠定时任务等方式把所有失败的事情重新搞一遍,直到成功为止。
- 回到刚才的例子,系统在A扣钱成功的情况下,把要给B“通知”这件事记录在库里(为了保证最高的可靠性可以把通知B系统加钱和扣钱成功这两件事维护在一个本地事务里),通知成功则删除这条记录,通知失败或不确定则依靠定时任务补偿性地通知我们,直到我们把状态更新成正确的为止。
- 整个这个模型依然可以基于RPC来做,但可以抽象成一个统一的模型,基于消息队列来做一个“企业总线”。
- 具体来说,本地事务维护业务变化和通知消息,一起落地(失败则一起回滚),然后RPC到达broker,在broker成功落地后,RPC返回成功,本地消息可以删除。否则本地消息一直靠定时任务轮询不断重发,这样就保证了消息可靠落地broker。
- broker往consumer发送消息的过程类似,一直发送消息,直到consumer发送消费成功确认。
- 我们先不理会重复消息的问题,通过两次消息落地加补偿,下游是一定可以收到消息的。然后依赖状态机版本号等方式做判重,更新自己的业务,就实现了最终一致性。
最终一致性不是消息队列的必备特性,但确实可以依靠消息队列来做最终一致性的事情。另外,所有不保证100%不丢消息的消息队列,理论上无法实现最终一致性。好吧,应该说理论上的100%,排除系统严重故障和bug。
像Kafka一类的设计,在设计层面上就有丢消息的可能(比如定时刷盘,如果掉电就会丢消息)。哪怕只丢千分之一的消息,业务也必须用其他的手段来保证结果正确。
3.广播
消息队列的基本功能之一是进行广播。如果没有消息队列,每当一个新的业务方接入,我们都要联调一次新接口。有了消息队列,我们只需要关心消息是否送达了队列,至于谁希望订阅,是下游的事情,无疑极大地减少了开发和联调的工作量。
比如本文开始提到的产品中心发布产品变更的消息,以及景点库很多去重更新的消息,可能“关心”方有很多个,但产品中心和景点库只需要发布变更消息即可,谁关心谁接入。
4.错峰与流控
试想上下游对于事情的处理能力是不同的。比如,Web前端每秒承受上千万的请求,并不是什么神奇的事情,只需要加多一点机器,再搭建一些LVS负载均衡设备和Nginx等即可。但数据库的处理能力却十分有限,即使使用SSD加分库分表,单机的处理能力仍然在万级。由于成本的考虑,我们不能奢求数据库的机器数量追上前端。
这种问题同样存在于系统和系统之间,如短信系统可能由于短板效应,速度卡在网关上(每秒几百次请求),跟前端的并发量不是一个数量级。但用户晚上个半分钟左右收到短信,一般是不会有太大问题的。如果没有消息队列,两个系统之间通过协商、滑动窗口等复杂的方案也不是说不能实现。但系统复杂性指数级增长,势必在上游或者下游做存储,并且要处理定时、拥塞等一系列问题。而且每当有处理能力有差距的时候,都需要单独开发一套逻辑来维护这套逻辑。所以,利用中间系统转储两个系统的通信内容,并在下游系统有能力处理这些消息的时候,再处理这些消息,是一套相对较通用的方式。
5.总结
总而言之,消息队列不是万能的。对于需要强事务保证而且延迟敏感的,RPC是优于消息队列的。
对于一些无关痛痒,或者对于别人非常重要但是对于自己不是那么关心的事情,可以利用消息队列去做。
支持最终一致性的消息队列,能够用来处理延迟不那么敏感的“分布式事务”场景,而且相对于笨重的分布式事务,可能是更优的处理方式。
当上下游系统处理能力存在差距的时候,利用消息队列做一个通用的“漏斗”。在下游有能力处理的时候,再进行分发。
如果下游有很多系统关心你的系统发出的通知的时候,果断地使用消息队列吧。
四、如何设计一个消息队列
我们现在明确了消息队列的使用场景,下一步就是如何设计实现一个消息队列了。
基于消息的系统模型,不一定需要broker(消息队列服务端)。市面上的的Akka(actor模型)、ZeroMQ等,其实都是基于消息的系统设计范式,但是没有broker。
我们之所以要设计一个消息队列,并且配备broker,无外乎要做两件事情:
- 消息的转储,在更合适的时间点投递,或者通过一系列手段辅助消息最终能送达消费机。
- 规范一种范式和通用的模式,以满足解耦、最终一致性、错峰等需求。
- 掰开了揉碎了看,最简单的消息队列可以做成一个消息转发器,把一次RPC做成两次RPC。发送者把消息投递到服务端(以下简称broker),服务端再将消息转发一手到接收端,就是这么简单。
一般来讲,设计消息队列的整体思路是先build一个整体的数据流,例如producer发送给broker,broker发送给consumer,consumer回复消费确认,broker删除/备份消息等。
利用RPC将数据流串起来。然后考虑RPC的高可用性,尽量做到无状态,方便水平扩展。
之后考虑如何承载消息堆积,然后在合适的时机投递消息,而处理堆积的最佳方式,就是存储,存储的选型需要综合考虑性能/可靠性和开发维护成本等诸多因素。
为了实现广播功能,我们必须要维护消费关系,可以利用zk/config server等保存消费关系。
在完成了上述几个功能后,消息队列基本就实现了。然后我们可以考虑一些高级特性,如可靠投递,事务特性,性能优化等。
下面我们会以设计消息队列时重点考虑的模块为主线,穿插灌输一些消息队列的特性实现方法,来具体分析设计实现一个消息队列时的方方面面。
后面参见原文……
五、常见的消息队列对比
参考消息队列的流派之争
这篇文章的标题很难起,网上一翻全是各种MQ的性能比较,很容易让人以为我也是这么“粗俗”的人(o(╯□╰)o)。我这篇文章想要表达的是——它们根本不是一个东西,有毛的性能好比较?
1.MQ是什么
Message Queue(MQ),消息队列中间件。很多人都说:MQ通过将消息的发送和接收分离来实现应用程序的异步和解偶,这个给人的直觉是——MQ是异步的,用来解耦的,但是这个只是MQ的效果而不是目的。MQ真正的目的是为了通讯,屏蔽底层复杂的通讯协议,定义了一套应用层的、更加简单的通讯协议。一个分布式系统中两个模块之间通讯要么是HTTP,要么是自己开发的TCP,但是这两种协议其实都是原始的协议。HTTP协议很难实现两端通讯——模块A可以调用B,B也可以主动调用A,如果要做到这个两端都要背上WebServer,而且还不支持长连接(HTTP 2.0的库根本找不到)。TCP就更加原始了,粘包、心跳、私有的协议,想一想头皮就发麻。MQ所要做的就是在这些协议之上构建一个简单的“协议”——生产者/消费者模型。MQ带给我的“协议”不是具体的通讯协议,而是更高层次通讯模型。它定义了两个对象——发送数据的叫生产者;消费数据的叫消费者, 提供一个SDK让我们可以定义自己的生产者和消费者实现消息通讯而无视底层通讯协议。
2.MQ的流派
列出功能表来比较MQ差异或者来一场“MQ性能大比武”的做法都是比较扯的,首先要做的事情应该是分类。我理解的MQ分为两个流派
(1).有broker
这个流派通常有一台服务器作为Broker,所有的消息都通过它中转。生产者把消息发送给它就结束自己的任务了,Broker则把消息主动推送给消费者(或者消费者主动轮询)。
- 重Topic流
kafka、JMS就属于这个流派,生产者会发送key和数据到Broker,由Broker比较key之后决定给那个消费者。这种模式是我们最常见的模式,是我们对MQ最多的印象。在这种模式下一个topic往往是一个比较大的概念,甚至一个系统中就可能只有一个topic,topic某种意义上就是queue,生产者发送key相当于说:“hi,把数据放到key的队列中”。
如上图所示,Broker定义了三个队列,key1,key2,key3,生产者发送数据的时候会发送key1和data,Broker在推送数据的时候则推送data(也可能把key带上)。
虽然架构一样但是kafka的性能要比jms的性能不知道高到多少倍,所以基本这种类型的MQ只有kafka一种备选方案。如果你需要一条暴力的数据流(在乎性能而非灵活性)那么kafka是最好的选择。
- 轻Topic流
这种的代表是RabbitMQ(或者说是AMQP)。生产者发送key和数据,消费者定义订阅的队列,Broker收到数据之后会通过一定的逻辑计算出key对应的队列,然后把数据交给队列。
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。
注意到了吗?这种模式下解耦了key和queue,在这种架构中queue是非常轻量级的(在RabbitMQ中它的上限取决于你的内存),消费者关心的只是自己的queue;生产者不必关心数据最终给谁只要指定key就行了,中间的那层映射在AMQP中叫exchange(交换机)。AMQP中有四种种exchange——Direct exchange:key就等于queue;Fanout exchange:无视key,给所有的queue都来一份;Topic exchange:key可以用“宽字符”模糊匹配queue;最后一个厉害了Headers exchange:无视key,通过查看消息的头部元数据来决定发给那个queue(AMQP头部元数据非常丰富而且可以自定义)。
这种结构的架构给通讯带来了很大的灵活性,我们能想到的通讯方式都可以用这四种exchange表达出来。如果你需要一个企业数据总线(在乎灵活性)那么RabbitMQ绝对的值得一用。
(2).无broker
此门派是AMQP的“叛徒”,某位道友嫌弃AMQP太“重”(那是他没看到用Erlang实现的时候是多么的行云流水) 所以设计了zeromq。这位道友非常睿智,他非常敏锐的意识到——MQ是更高级的Socket,它是解决通讯问题的。所以ZeroMQ被设计成了一个“库”而不是一个中间件,这种实现也可以达到——没有broker的目的。
节点之间通讯的消息都是发送到彼此的队列中,每个节点都既是生产者又是消费者。ZeroMQ做的事情就是封装出一套类似于scoket的API可以完成发送数据,读取数据。如果你仔细想一下其实ZeroMQ是这样的
顿悟了吗?Actor模型,ZeroMQ其实就是一个跨语言的、重量级的Actor模型邮箱库。你可以把自己的程序想象成一个actor,zeromq就是提供邮箱功能的库;zeromq可以实现同一台机器的IPC通讯也可以实现不同机器的TCP、UDP通讯。如果你需要一个强大的、灵活、野蛮的通讯能力,别犹豫zeromq。
(3).MQ只能异步吗
答案是否定了,首先ZeroMQ支持请求->应答模式;其次RabbitMQ提供了RPC是地地道道的同步通讯,只有JMS、kafka这种架构才只能做异步。我们很多人第一次接触MQ都是JMS之类的这种所以才会产生这种错觉。
(4).总结
kafka,zeromq,rabbitmq代表了三种完全不同风格的MQ架构;关注点完全不同:
- kafka在乎的是性能,速度
- rabbitmq追求的是灵活
- zeromq追求的是轻量级、分布式
如果你拿zeromq来做大数据量的传输功能,不是生产者的内存“爆掉”就是消费者被“压死”;如果你用kafka做通讯总线那绝对的不会快只能更慢;你想要rabbitmq实现分布式,那真的是难为它。
3.参考消息队列选型首选Kafka
在这种方案中,引入了基于生产消费模型的消息队列后,天然具备了负载均衡能力。同时,队列中的消息是由服务实例自行拉取的,所以不论是接入层,还是消息队列,都不要关心服务实例的部署细节。服务实例也可以处行控制拉取消息的频率,实现并发控制。
但由于引入消息队列后,每一次的请求都不会被直接响应,这对于异步请求没有问题,但对于同步请求,处理起来就有些麻烦了。还好,在一些成熟的消息队列方案中,会提供request-reply模式,用于处理同步请求。这也是后我们进行技术选型的重要考虑因素。
举个例子:假设,A模块通过MQ发布消息给B模块和C模块,B模块与C模块收到消息后,需要通过MQ向A模块发送确认消息。这时MQ需要能够自动的将B和C的应答消息转发给A,而不是其它模块。这个也是实现前面提到的request-reply模式的关健所在。
4.参考爬虫架构 | 消息队列应用场景及ActiveMQ、RabbitMQ、RocketMQ、Kafka对比
- 生产者消费者模式(Producer-Consumer)
ActiveMQ-支持,RabbitMQ-支持,RocketMQ-支持,Kafka-支持。 - 发布订阅模式(Publish-Subscribe)
ActiveMQ-支持,RabbitMQ-支持,RocketMQ-支持,Kafka-支持。 - 请求回应模型(Request-Reply)
ActiveMQ-支持,RabbitMQ-支持,RocketMQ-不支持,Kafka-不支持。 - API完备性
ActiveMQ-高,RabbitMQ-高,RocketMQ-高,Kafka-高。 - 多语言支持
ActiveMQ-支持,RabbitMQ-支持,RocketMQ-只支持JAVA,Kafka-支持。 - 单机吞吐量
ActiveMQ-万级,RabbitMQ-万级,RocketMQ-万级,Kafka-十万级。 - 消息延迟
ActiveMQ-无,RabbitMQ-微秒级,RocketMQ-毫秒级,Kafka-毫秒级。 - 可用性
ActiveMQ-高(主从),RabbitMQ-高(主从),RocketMQ-非常高(分布式),Kafka-非常高(分布式)。 - 消息丢失
ActiveMQ-低,RabbitMQ-低,RocketMQ-理论上不会丢失,Kafka-理论上不会丢失。 - 文档的完备性
ActiveMQ-高,RabbitMQ-高,RocketMQ-高,Kafka-高。 - 提供快速入门
ActiveMQ-有,RabbitMQ-有,RocketMQ-有,Kafka-有。 - 社区活跃度
ActiveMQ-高,RabbitMQ-高,RocketMQ-中,Kafka-高。 - 商业支持
ActiveMQ-无,RabbitMQ-无,RocketMQ-阿里云,Kafka-阿里云。
六、golang mq
1.rabbitmq的streadway/amqp
在https://www.rabbitmq.com/getstarted.html
可以看到go语言版本指向了"github.com/streadway/amqp"
Go语言实现RabbitMQ入门系列的目录,有兴趣的可以参考一下:
原文地址提供了其他语言的实现版本,可参考RabbtMQ GetStarted.
2.golang nsq
NSQ是Go语言编写的,开源的分布式消息队列中间件,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。NSQ 具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征,是一个成熟的、已在大规模生成环境下应用的产品。
参考NSQ深入与实践
1.1 Features
1). Distributed
NSQ提供了分布式的,去中心化,且没有单点故障的拓扑结构,稳定的消息传输发布保障,能够具有高容错和HA(高可用)特性。
2). Scalable易于扩展
NSQ支持水平扩展,没有中心化的brokers。内置的发现服务简化了在集群中增加节点。同时支持pub-sub和load-balanced 的消息分发。
3). Ops Friendly
NSQ非常容易配置和部署,生来就绑定了一个管理界面。二进制包没有运行时依赖。官方有Docker image。
4).Integrated高度集成
官方的 Go 和 Python库都有提供。而且为大多数语言提供了库。