1.MQ的作用
1)解耦:在项目启动之初是很难预测未来会遇到什么困难的,消息中间件在处理过程中插入了一个隐含的,基于数据的接口层,两边都实现这个接口,这样就允许独立的修改或者扩展两边的处理过程,只要两边遵守相同的接口约束即可。
2)冗余(存储):在某些情况下处理数据的过程中会失败,消息中间件允许把数据持久化知道他们完全被处理
扩展性:消息中间件解耦了应用的过程,所以提供消息入队和处理的效率是很容易的,只需要增加处理流程就可以了。
3)削峰:在访问量剧增的情况下,但是应用仍然需要发挥作用,但是这样的突发流量并不常见。而使用消息中间件采用队列的形式可以减少突发访问压力,不会因为突发的超时负荷要求而崩溃
4)可恢复性:当系统一部分组件失效时,不会影响到整个系统。消息中间件降低了进程间的耦合性,当一个处理消息的进程挂掉后,加入消息中间件的消息仍然可以在系统恢复后重新处理
5)顺序保证:在大多数场景下,处理数据的顺序也很重要,大部分消息中间件支持一定的顺序性
6)缓冲:消息中间件通过一个缓冲层来帮助任务最高效率的执行
7)异步通信:通过把把消息发送给消息中间件,消息中间件并不立即处。
本文只讨论削峰填谷的应用场景:
举个业务场景的栗子,秒杀业务:
上游发起下单操作
下游完成秒杀业务逻辑(库存检查,库存冻结,余额检查,余额冻结,订单生成,余额扣减,库存扣减,生成流水,余额解冻,库存解冻)
上游下单业务简单,每秒发起了10000个请求,下游秒杀业务复杂,每秒只能处理2000个请求,很有可能上游不限速的下单,导致下游系统被压垮,引发雪崩。
为了避免雪崩,常见的优化方案有两种:
1)业务上游队列缓冲,限速发送
2)业务下游队列缓冲,限速执行
本文只讨论下游队列,就是消费端的限速执行
rabbitmq提供了一种服务质量保障功能,即在非自动确认消息的前提下,如果一定数目的消息未被确认,不进行消费新的消息。
使用 basicqos方法:
在消费端进行使用。 0 1 false
prefetSize:0
prefetCount:这个值一般在设置为非自动ack的情况下生效,一般大小为1
global: true是channel级别, false是消费者级别
注意:我们要使用非自动ack
消费者代码:
package com.bfxy.rabbitmq.api.limit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("10.136.197.244");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_qos_exchange";
String queueName = "test_qos_queue";
String routingKey = "qos.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//1 限流方式 第一件事就是 autoAck设置为 false
channel.basicQos(0,3,false);
channel.basicConsume(queueName,false,new MyConsumer(channel));
}
}
自定义消费者代码:
package com.bfxy.rabbitmq.api.limit;
import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class MyConsumer extends DefaultConsumer {
private Channel channel ;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
生产者代码:
package com.bfxy.rabbitmq.api.limit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("10.136.197.244");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchange = "test_qos_exchange";
String routingKey = "qos.save";
String msg = "Hello RabbitMQ QOS Message";
for(int i =0; i<5; i ++){
channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
}
}
}
调试步骤:
1)启动消费者类,效果如图:
2)在自定义消费者类中注释掉channel.basicAck(envelope.getDeliveryTag(), false);
启动生产者类,mq管控台信息
可以看到1个待确认的,4个准备好的消息,
3)放开代码channel.basicAck(envelope.getDeliveryTag(), false);
启动生产者类,mq管控台信息
总结:消费者消费成功一个消息后,需要设置成手动确认,当返回确认成功后,再去消费下一个消息,这样可以实现消费端的削峰限流,不至于让消费端服务崩溃。
到这里是不是以为结束了呢,其实还有一个知识点,就是消费端对没有消费成功的消息,可以不进行确认,让其重回队列,再次消费,与上面的代码相比,只需修改自定义的消费者,设置如果满足我们自己设置的条件就认为是没有消费成功,让其重回队列,这个时候broker端会再此发出这条消息。
修改如下:
启动生产者和消费者,消费者控制台信息如下: