为什么需要消费端的限流?
- 假设一个场景,我们Rabbitmq服务器有上万条未处理的消息,我们随便打开一个消费者客户端,会出现这种情况:巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据!此时很有可能导致服务器崩溃,严重的可能导致线上的故障。
- 除了这种场景,还有一些其他的场景,比如说单个生产者一分钟生产出了几百条数据,但是单个消费者一分钟可能只能处理60条数据,这个时候生产端和消费端肯定是不平衡的。通常生产端是没办法做限制的。所以消费端肯定需要做一些限流措施,否则如果超出最大负载,可能导致消费端性能下降,服务器卡顿甚至崩溃等一系列严重后果。
消费端限流机制
RabbitMQ提供了一种qos
(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息 (通过基于consume或者channel设置Qos的值) 未被确认前,不进行消费新的消息。
需要注意:
1.不能设置自动签收功能(autoAck = false)
2.如果消息没被确认,就不会到达消费端,目的就是给消费端减压
限流相关API
限流设置 - BasicQos()
void BasicQos(uint prefetchSize, ushort prefetchCount, bool global);
prefetchSize:
单条消息的大小限制,消费端通常设置为0,表示不做限制
prefetchCount:
一次最多能处理多少条消息,通常设置为1
global:
是否将上面设置应用于channel,false代表consumer级别
注意事项
prefetchSize
和global
这两项,rabbitmq没有实现,暂且不研究
prefetchCount
在 autoAck=false
的情况下生效,即在自动应答的情况下这个值是不生效的
手工ACK - basicAck()
void basicAck(Integer deliveryTag,boolean multiple)
手工ACK,调用这个方法就会主动回送给Broker一个应答,表示这条消息我处理完了,你可以给我下一条了。参数multiple
表示是否批量签收,由于我们是一次处理一条消息,所以设置为false
限流演示
生产端
生产端就是正常的逻辑
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchange = "test_qos_exchange";
String routingKey = "qos.save";
String msg = "Hello RabbitMQ QOS Message";
//发送消息
for(int i =0; i<5; i ++){
channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
}
}
}
自定义消费者
在这里可以进行消息的手工ACK
public class MyConsumer extends DefaultConsumer {
//接收channel
private Channel channel ;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
//System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
//手工ACK,参数multiple表示不批量签收
//channel.basicAck(envelope.getDeliveryTag(), false);
}
}
消费端
关闭autoACK,进行限流设置
public class Consumer {
public static void main(String[] args) throws Exception {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.43.157");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 获取Connection
Connection connection = connectionFactory.newConnection();
//3 通过Connection创建一个新的Channel
Channel channel = connection.createChannel();
String exchangeName = "test_qos_exchange";
String queueName = "test_qos_queue";
String routingKey = "qos.#";
//4 声明交换机和队列,然后进行绑定设置路由Key
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//进行参数设置:单条消息的大小限制,一次最多能处理多少条消息,是否将上面设置应用于channel
channel.basicQos(0, 1, false);
//限流: autoAck设置为 false
channel.basicConsume(queueName, false, new MyConsumer(channel));
}
}
运行说明
我们先注释掉手工ACK方法,然后启动消费端和生产端,此时消费端只打印了一条消息
-----------consume message----------
consumerTag: amq.ctag-vtsQsdK17o1Z3BWeGvZKRA
envelope: Envelope(deliveryTag=1, redeliver=false, exchange=test_qos_exchange, routingKey=qos.save)
body: Hello RabbitMQ QOS Message
这是因为我们设置了手工签收,并且设置了一次只处理一条消息,当我们没有回送ack应答时,Broker端就认为消费端还没有处理完这条消息,基于这种限流机制就不会给消费端发送新的消息了,所以消费端只打印了一条消息。
通过管控台也可以看到队列总共收到了5条消息,有一条消息没有ack。
将手工签收代码取消注释,再次运行消费端,此时就会打印5条消息的内容。