RabbitMQ消息中间件从入门到高级(二)

一、消息如何保证 100% 的投递成功?

投递主要针对生产端,什么是生产端的可靠性投递?

  • 保障消息成功的发出去
  • 保证MQ节点成功收到消息
  • 发送端收到MQ的确认应答
  • 完善的消息补偿机制,只做前三步的时候,也许生产端就失败了

BAT/TMD 互联网大厂解决方案,看具体业务和并发量

  • 消息落库,对消息状态进行打标
  • 消息的延迟投递,做二次检查,回调检查

消息落库步骤:

流程的示意图如上所示,比如我下单成功了,这是进行step1,对我的业务数据进行入库,业务数据入库完毕(这里要特别注意一定要保证业务数据入库)再对要发送的消息进行入库,图中采用了两个数据库,可以根据实际业务场景来确定是否采用两个数据库,如果采用了两个数据库,有人可能就像到了采用分布式事务来保证数据的一致性,但是在大型互联网中,基本很少采用事务,都是采用补偿机制。

对业务数据和消息入库完毕就进入setp2,发送消息到MQ服务上,按照正常的流程就是消费者监听到该消息,就根据唯一id修改该消息的状态为已消费,并给一个确认应答ack到Listener。如果出现意外情况,消费者未接收到或者Listener接收确认时发生网络闪断,接收不到,这时候就需要用到我们的分布式定时任务来从msg数据库抓取那些超时了还未被消费的消息,重新发送一遍。重试机制里面要设置重试次数限制,因为一些外部的原因导致一直发送失败的,不能重试太多次,要不然会拖垮整个服务。例如重试三次还是失败的,就把消息的status设置成2,然后通过补偿机制,人工去处理。实际生产中,这种情况还是比较少的,但是你不能没有这个补偿机制,要不然就做不到可靠性了。

数据库库表结构:订单表和消息记录表

-- 表 order 订单结构
CREATE TABLE IF NOT EXISTS `t_order` (
  `id` varchar(128) NOT NULL, -- 订单ID
  `name` varchar(128), -- 订单名称 其他业务熟悉忽略
  `message_id` varchar(128) NOT NULL, -- 消息唯一ID
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- 表 broker_message_log 消息记录结构
CREATE TABLE IF NOT EXISTS `broker_message_log` (
  `message_id` varchar(128) NOT NULL, -- 消息唯一ID
  `message` varchar(4000) DEFAULT NULL, -- 消息内容
  `try_count` int(4) DEFAULT '0', -- 重试次数
  `status` varchar(10) DEFAULT '', -- 消息投递状态  0 投递中 1 投递成功   2 投递失败
  `next_retry` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',  -- 下一次重试时间 或 超时时间
  `create_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', -- 创建时间
  `update_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', -- 更新时间
  PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

延迟投递:

回想第一种方案,生产端既要对业务数据入库,又要对消息数据入库,这种设计在高并发场景下,真的合适吗?这时候需要我们的第二种方案了,流程图如下。

upstream Server就是我们的上游服务,也就是生产者,生产者将业务数据入库成功后,生成两条消息,一条是立即发送出去给到下游服务 downstream Server的,一条是延迟消息给到 补偿服务 callback Server的。

正常情况下,下游服务监听到这个即时的消息,会发送一条消息给到callback Server,注意这里不是采用第一种方案里面的返回ack方式,而是发送了一条消息给回去。

callback Server监听到这个消息,知道了刚才有一条消息消费成功了,然后把这个持久化到数据库中,当上游服务发送的延迟消息到达callback Server时,callback Server就会去数据库查询,刚才下游服务是否有处理过这个对应的消息,如果其msg DB里面有这个记录就说明这条消息是已经被消费了,如果不存在这个记录,那么callback Server就会发起一个RPC请求给到上游服务,告诉上游服务,你刚才这个消息没发送成功,需要重新发送一遍,上游服务就重新发送即时和延迟的两条消息出去,按照之前的流程继续走一遍。

虽然第二种方案也是无法做到100%的可靠传递,在特别极端的情况,还是需要定时任务和补偿机制进行辅助的。但是第二种方案的核心是减少数据库操作,这个点很重要!

在高并发场景下,我考虑的不是百分百的可靠性了,而是考虑可用性,性能能否扛得住这个流量,所以我能减少一次数据库操作就减少一次。我上游服务减少了一次数据库操作,我的服务性能相对而言就提高了一些,而且又能把异步callback Server补偿服务解耦出来。

结论

这两种方案都是可行的,需要根据实际业务来进行选择,大型的超高并发的场景会选择第二种方案,普通的就采用第一种即可。

二、幂等性概念及业界主流解决方案

幂等就是一个操作,不论执行多少次,产生的效果和返回的结果都是一样的。

消费端-幂等性保障

在海里订单产生的业务高峰期,如何避免消息的重复消费问题?

消费端实现幂等性,就意味着,我们的消息永远不会消费多次,即使我们收到多条一样的消息。

业界主流的幂等性操作:

  • select + insert 机制
  • 利用 Redis 的原子性实现

select + insert机制
并发不高的后台系统,或者一些任务JOB,为了支持幂等,支持重复执行,简单的处理方法是,先查询下一些关键数据,判断是否已经执行过,再进行业务处理,就可以了。注意:核心高并发流程不要用这种方法;

利用 Redis 的原子性实现
Redis的操作之所以是原子性的,是因为Redis是单线程的。
如果存在相同的key会,将旧数据覆盖掉

使用Redis进行幂等,需要考虑的问题?
第一:我们是否要进行数据落地,如果落地的话,关键解决的问题是数据库和Redis缓存如何做到原子性?
第二:如果不进行落地,那么都存储在缓存中,如何设置定时同步策略?

三、Cofirm 确认消息

理解Confirm 消息确认机制:

  • 消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们生产者一个应答。
  • 生产者进行接收应答,用来确认这条消息是否正常发送到Broker,这种方式也是消息的可靠性投递的核心保障!

如何实现Confirm 确认消息?

  • 第一步:在channel 上开启确认模式: channele.confirmSelect()
  • 第二步:在channel 上添加监听: addConfirmListener, 监听成功和失败的返回结果,根据具体的结果在发送端对消息进行重新发送、或记录日志等后续处理!

代码实例:
https://blog.csdn.net/ctwy291314/article/details/80534604

四、Return返回消息

  • Return Listener 用于处理一些不可路由的消息!
  • 我们的消息生产者,通过指定一个Exchange 和 Routingkey, 把消息送达到某一个队列中去,然后我们的消费者监听队列,进行消费处理操作!
  • 但是在某些情况下,如果我们在发送消息时候,当前的exchange 不存在或者指定的路由key路由不到,这个时候如果我们需要监听这中不可达的消息,就要使用Return Listener!

在基础API中有一个关键项配置:

  • Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续的处理(执行return操作),如果为false,那么broker端自动删除该消息!

代码实例
https://blog.csdn.net/m0_37743948/article/details/82864452

五、消费端的限流策略

  • 什么是消费端限流?
    假设一个场景,首先,我们 rabbitmq 服务端有上万个未处理的消息,我们随便打开一个消费者客户端,会出现下面情况:
    巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多的数据!

  • 如何实现消费端限流?
    RabbitMQ 提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consumer或者channel设置Qos的值)未被确认(ACK)前,不进行消费新的消息。

  • void BasicQos(uint prefetchSize, ushort prefetchCount, bool global)
    prefetchSize:单条消息的大小限制,0代表不限制
    prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack
    global:true\false 是否将上面应用于channel,简单点说,就是上面限制是channel级别还是consumer级别

// 1 限流方式 第一件事就是autoAck设置为false
// channel.basicQos(perfetchSize, prefetchCount, global);
channel.basicQos(0, 1, false);// 一条一条处理

channel.basicConsume(queueName, false, new MyConsumer(channel);// 手动签收

六、消费端ACK与重回队列机制

  • 消费端的手工ACK和NACK
    消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿!
    如果由于服务宕机等严重问题,那么我们就需要手工进行ACK保障消费端消费成功!

  • 消费端的重回队列
    消费端重回队列是为了对没有处理成功的消息,把消息重新传递给 !
    一般我们在实际应用中,都会关闭重回队列,也就是设置为false

七、TTL消息详解

  • TTL是Time To Live的缩写,也就是生存时间
  • RabbitMQ支持消息的过期时间,在消息发送时可以进行指定
  • RabbitMQ支持队列的过期时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动的清除。

八、死信队列

  • 死信队列:DLX,Dead-Letter-Exchange
    利用DLX,当消息在一个队列中变成死信(dead message,没有任何消费者消费)之后,它能被重新publish到另一个Exchange,这个exchange就是DLX

  • 消息变成死信有以下几种情况

    1. 消息被拒绝(basic.reject/basic.nack)并且requeue=false(不重回队列)
    2. 消息TTL过期
    3. 队列达到最大长度

    DLX也是一个正常的exchange和一般的Exchange没有区别,它能在任何队列上被指定,实际上就是设置某个队列的属性。

    当这个队列有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。

    可以监听这个队列中的消息做相应的处理,这个特性可以弥补RabbitMQ3.0以前的immediate参数的功能。

  • 死信队列设置:
    首先需要设置死信队列的exchange和queue,然后进行绑定:
    Exchange: dlx.exchange
    Queue: dlx.queue
    RoutingKey: #

    然后我们进行正常的声明交换机、队列、绑定,只不过我们需要在队列上加上一个参数即可:arguments.put("x-dead-letter-exchange","dlx.exchange");

    这样消息在过期、requeue、队列在到达最大长度时,消息就可以直接路由到死信队列!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 198,082评论 5 464
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,231评论 2 375
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 145,047评论 0 327
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,977评论 1 268
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,893评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 47,014评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,976评论 3 388
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,605评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,888评论 1 293
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,906评论 2 314
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,732评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,513评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,980评论 3 301
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,132评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,447评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,027评论 2 343
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,232评论 2 339

推荐阅读更多精彩内容