RabbitMQ学习(五)消费端削峰限流

1.MQ的作用

1)解耦:在项目启动之初是很难预测未来会遇到什么困难的,消息中间件在处理过程中插入了一个隐含的,基于数据的接口层,两边都实现这个接口,这样就允许独立的修改或者扩展两边的处理过程,只要两边遵守相同的接口约束即可。
2)冗余(存储):在某些情况下处理数据的过程中会失败,消息中间件允许把数据持久化知道他们完全被处理
扩展性:消息中间件解耦了应用的过程,所以提供消息入队和处理的效率是很容易的,只需要增加处理流程就可以了。
3)削峰:在访问量剧增的情况下,但是应用仍然需要发挥作用,但是这样的突发流量并不常见。而使用消息中间件采用队列的形式可以减少突发访问压力,不会因为突发的超时负荷要求而崩溃
4)可恢复性:当系统一部分组件失效时,不会影响到整个系统。消息中间件降低了进程间的耦合性,当一个处理消息的进程挂掉后,加入消息中间件的消息仍然可以在系统恢复后重新处理
5)顺序保证:在大多数场景下,处理数据的顺序也很重要,大部分消息中间件支持一定的顺序性
6)缓冲:消息中间件通过一个缓冲层来帮助任务最高效率的执行
7)异步通信:通过把把消息发送给消息中间件,消息中间件并不立即处。

本文只讨论削峰填谷的应用场景:

举个业务场景的栗子,秒杀业务:
上游发起下单操作
下游完成秒杀业务逻辑(库存检查,库存冻结,余额检查,余额冻结,订单生成,余额扣减,库存扣减,生成流水,余额解冻,库存解冻)
上游下单业务简单,每秒发起了10000个请求,下游秒杀业务复杂,每秒只能处理2000个请求,很有可能上游不限速的下单,导致下游系统被压垮,引发雪崩。
为了避免雪崩,常见的优化方案有两种:
1)业务上游队列缓冲,限速发送
2)业务下游队列缓冲,限速执行

本文只讨论下游队列,就是消费端的限速执行

rabbitmq提供了一种服务质量保障功能,即在非自动确认消息的前提下,如果一定数目的消息未被确认,不进行消费新的消息。
使用 basicqos方法:
在消费端进行使用。 0 1 false
prefetSize:0
prefetCount:这个值一般在设置为非自动ack的情况下生效,一般大小为1
global: true是channel级别, false是消费者级别
注意:我们要使用非自动ack
消费者代码:

package com.bfxy.rabbitmq.api.limit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

public class Consumer {

    
    public static void main(String[] args) throws Exception {
        
        
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("10.136.197.244");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123456");
        
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        
        String exchangeName = "test_qos_exchange";
        String queueName = "test_qos_queue";
        String routingKey = "qos.#";
        
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //1 限流方式  第一件事就是 autoAck设置为 false
        
        channel.basicQos(0,3,false);
        channel.basicConsume(queueName,false,new MyConsumer(channel));
    }
}

自定义消费者代码:

package com.bfxy.rabbitmq.api.limit;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class MyConsumer extends DefaultConsumer {


    private Channel channel ;
    
    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        System.err.println("envelope: " + envelope);
        System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
        
        channel.basicAck(envelope.getDeliveryTag(), false);
        
    }


}

生产者代码:

package com.bfxy.rabbitmq.api.limit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {

    
    public static void main(String[] args) throws Exception {
        
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("10.136.197.244");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");


        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        String exchange = "test_qos_exchange";
        String routingKey = "qos.save";
        
        String msg = "Hello RabbitMQ QOS Message";
        
        for(int i =0; i<5; i ++){
            channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
        }
        
    }
}

调试步骤:
1)启动消费者类,效果如图:

消费者启动mq交换机信息.JPG

消费者启动mq队列信息.JPG

2)在自定义消费者类中注释掉channel.basicAck(envelope.getDeliveryTag(), false);
启动生产者类,mq管控台信息
管控台信息.JPG

可以看到1个待确认的,4个准备好的消息,
3)放开代码channel.basicAck(envelope.getDeliveryTag(), false);
启动生产者类,mq管控台信息
管控台信息.JPG

总结:消费者消费成功一个消息后,需要设置成手动确认,当返回确认成功后,再去消费下一个消息,这样可以实现消费端的削峰限流,不至于让消费端服务崩溃。
到这里是不是以为结束了呢,其实还有一个知识点,就是消费端对没有消费成功的消息,可以不进行确认,让其重回队列,再次消费,与上面的代码相比,只需修改自定义的消费者,设置如果满足我们自己设置的条件就认为是没有消费成功,让其重回队列,这个时候broker端会再此发出这条消息。
修改如下:
重回队列.JPG

启动生产者和消费者,消费者控制台信息如下:
重复消费未确认的消息.JPG

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 195,980评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,422评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,130评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,553评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,408评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,326评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,720评论 3 386
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,373评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,678评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,722评论 2 312
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,486评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,335评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,738评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,009评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,283评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,692评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,893评论 2 335

推荐阅读更多精彩内容

  • 概念:微服务就是一些可独立运行、可协同工作的小的服务。微服务是现在特别流行的服务,微服务的字面意思是大家都很好理解...
    程序员技术圈阅读 3,328评论 2 47
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,511评论 18 139
  • IM系统的MQ消息中间件选型:Kafka还是RabbitMQ? 1、前言 在IM这种讲究高并发、高消息吞吐的互联网...
    匆匆岁月阅读 3,147评论 1 111
  • 秋水 红叶与天边 ☆田秀 一声声雁叫转上红叶放飞的长空 (我正枕靠水边的梦乡) 拉开黄昏...
    兴安居士阅读 253评论 0 3
  • 转折点 当每个人跟随神的心意时,往往会面临一个信仰的转折点,有的人,虽有感动,却没有跟随下去。中途放弃了,这样就错...
    玉初辰阅读 811评论 0 51