消息队列之RabbitMQ-消息确认机制

RabbitMQ的消息确认有两种:

  • 对生产者发送消息的确认:发送消息时,由于网络等原因可能导致消息发送失败。所以,RabbitMQ必须有机制确保消息能准确到达mq,如果不能到达,必须反馈给生产端进行重发。

  • 对消费者消费消息的确认:消息的消费过程中,也可能发生很多异常,比如消息格式不合法、处理超时等,RabbitMQ需要提供消费回执,消费成功的消息可以删除,消费失败的消息则继续存储,择机再次投递。

1、投递确认

RabbitMQ对生产者发送消息的确认有两种实现方式:

  • 通过AMQP提供的事务机制实现
  • 使用发送者confirm模式实现

1.1 confirm

生产者投递消息后,如果 Broker 收到消息,则会给生产者一个应答。生产者进行接收应答,用来确定这条消息是否正常的发送到 Broker ,这种方式也是消息的可靠性投递的核心保障。

1.png

一旦 channel 处于 confirm 模式,broker 和 client 都将启动消息计数(以 confirm.select 为基础从 1 开始计数)。broker 会在处理完消息后,在当前 channel 上通过发送 basic.ack 的方式对其进行 confirm 。delivery-tag 域的值标识了被 confirm 消息的序列号。broker 也可以通过设置 basic.ack 中的 multiple 域来表明到指定序列号为止的所有消息都已被 broker 正确的处理了。

confirm在代码实现上分为两步:

  • 在 channel 上开启确认模式: channel.confirmSelect()
  • 在 channel 上添加监听: channel.addConfirmListener(ConfirmListener listener), 监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理。
import com.rabbitmq.client.*;
import java.io.IOException;

public class ReturnListeningProducer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
      
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "test_return_exchange";
        String routingKey = "item.update";
        String errRoutingKey = "error.update";

        //指定消息的投递模式:confirm 确认模式
        channel.confirmSelect();

        //发送
        for (int i = 0; i < 3 ; i++) {
            String msg = "this is return——listening msg ";
            //@param mandatory 设置为 true 路由不可达的消息会被监听到,不会被自动删除
            if (i == 0) {
                channel.basicPublish(exchangeName, errRoutingKey, true,null, msg.getBytes());
            } else {
                channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());
            }
            System.out.println("Send message : " + msg);
        }

        //添加一个确认监听, 这里就不关闭连接了,为了能保证能收到监听消息
        channel.addConfirmListener(new ConfirmListener() {
            /**
             * 返回成功的回调函数
             */
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("succuss ack");
            }
            /**
             * 返回失败的回调函数
             */
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.printf("defeat ack");
            }
        });

        //添加一个 return 监听
        channel.addReturnListener(new ReturnListener() {
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("return body: " + new String(body));
            }
        });

    }
}

以上采用的异步 confirm 模式,除此之外还有单条同步 confirm 模式、批量同步 confirm 模式,但是现实场景中很少使用。

而后面的Return Listener 用于处理些不可路由的消息。在某些情况下,如果我们在发送消息的时候,当前的 exchange 不存在或者指定的路由 key 路由不到,如果需要监听这种不可达的消息,就要使用 Return Listener。

在基础API中有一个关键的配置项Mandatory:如果为 true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为 false,那么 broker 端自动删除该消息。

1.2 事务

事务的实现主要是对信道(Channel)的设置,主要的方法有三个:

  • channel.txSelect()声明启动事务模式
  • channel.txComment()提交事务
  • channel.txRollback()回滚事务
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wj.transation.config.MqConfig;
 
public class TxSend {
 
    private static final String QUEUE_NAME = "test_queue_tx";
 
    public static void send() throws Exception {
        Connection connection = MqConfig.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String msg = "hello tx!";
        try {
            // 用户将当前channel设置成transaction
            channel.txSelect();
 
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
 
            // 异常,使回滚
            int x=1/0;
 
            System.out.println("【发送端】消息:"+msg);
            //提交事务
            channel.txCommit();
        } catch (Exception e) {
            channel.txRollback();
            System.out.println("发生异常,回滚");
        }
        channel.close();
        connection.close();
    }
}

如果RabbitMQ返回ack失败,生产端也无法确认消息是否真的发送成功,也会造成数据丢失。最好的办法是使用RabbitMQ的事务机制,但是RabbitMQ的事务机制效率极低,每秒钟处理的消息仅几百条,不适合并发量大的场景。

为了达到消息的可靠投递,还可以借助外部工具实现,比如redis:

  • 生产端在发送消息之前,生成ack唯一确认的id
  • 以ackId为键,消息为value,保存进redis缓存,设置超时时间
  • redis实现超时触发接口,当key过期时,重发消息并再次执行第2步
  • 生产端实现ConfirmCallback接口
  • ConfirmCallback接口触发时,若ack为true,则直接删除此次ackId对应的msg;若ack为false,则将该ackId对应的msg取出重发

再比如利用队列与定时任务:

不通过设置redis超时时间触发超时事件进行重发,而是取出消息放入一个ackFailList中,然后开启定时任务,扫描ackFailList,重发失败的msg。

如果把List保存在内存中,不具备持久化的功能,并不安全,可以考虑保存到数据库中,防止消息丢失。

2、消费确认

为了保证消息能可靠到达消费端,RabbitMQ也提供了消费端的消息确认机制。消费者在声明队列时,可以指定noAck参数,当noAck=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ会在队列中消息被消费后立即删除它。

采用消息确认机制后,只要令noAck=false,消费者就有足够的时间处理消息,不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ会一直持有消息直到消费者显式调用basicAck为止。

import com.rabbitmq.client.*;
import java.io.IOException;

public class AckAndNackConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);

        Connection connection = factory.newConnection();

        final Channel channel = connection.createChannel();

        String exchangeName = "test_ack_exchange";
        String queueName = "test_ack_queue";
        String routingKey = "item.#";
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, false, false, false, null);

        //一般不用代码绑定,在管理界面手动绑定
        channel.queueBind(queueName, exchangeName, routingKey);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {

                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");

                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                if ((Integer) properties.getHeaders().get("num") == 0) {
                    channel.basicNack(envelope.getDeliveryTag(), false, true);
                } else {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        //6. 设置 Channel 消费者绑定队列,设为非自动ack
        channel.basicConsume(queueName, false, consumer);

    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,290评论 6 491
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,107评论 2 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,872评论 0 347
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,415评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,453评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,784评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,927评论 3 406
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,691评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,137评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,472评论 2 326
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,622评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,289评论 4 329
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,887评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,741评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,977评论 1 265
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,316评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,490评论 2 348