RabbitMQ 消息确认机制

之前的文章我们已经介绍了 RabbitMQ 的基本使用,但是在默认情况下 RabbitMQ 并不能保证消息是否发送成功、以及是否被成功消费掉。消息在传递过程中存在丢失的可能。基于这样的现状,就有了消息的确认机制,来提高消息传递过程中的可靠性。

RabbitMQ 中,消息的确认机制包含以下两个方面:

  • 消息发送确认,生产者发送消息的确认包含两部分:
    1、生产者发送的消息是否成功到达交换机
    2、消息是否成功的从交换机投放到目标队列
  • 消息接收确认,消费者接收消息有三种不同的确认模式:
    1、AcknowledgeMode.NONE:不确认,这是默认的模式,默认所有消息都被成功消费了,直接从队列删除消息。存在消息被消费过程中由于异常未被成功消费而掉丢失的风险。
    2、AcknowledgeMode.AUTO:自动确认,根据消息被消费过程中是否发生异常来发送确认收到消息拒绝消息的指令到 RabbitMQ 服务。这个确认时机开发人员是不可控的,同样存在消息丢失的风险。
    3、AcknowledgeMode.MANUAL:手动确认,开发人员可以根据实际的业务,在合适的时机手动发送确认收到消息拒绝消息指令到 RabbitMQ 服务,整个过程开发人是可控的。这种模式也是我们要重点介绍的。

一、准备环境

创建 SpringBoot 项目,添加 RabbitMQ 依赖。

这里将生产者和消费者放在一个项目。

application.properties中添加连接 RabbitMQ 服务的配置,以及开启消息确认机制需要的配置:

server.port=8080
# rabbitmq 相关配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/
# 开启消息是否已经发送到交换机的确认机制
spring.rabbitmq.publisher-confirm-type=correlated
# 开启消息未成功投递到目标队列时将消息返回
spring.rabbitmq.publisher-returns=true
# 设置消费者需要手动确认消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.direct.acknowledge-mode=manual

创建交换机、队列,并完成绑定:

@Configuration
public class AckRabbitMQConfig {
    // Fanout交换机
    @Bean
    FanoutExchange ackExchange() {
        return new FanoutExchange("ack.exchange", true, false);
    }

    // 消息队列
    @Bean
    Queue ackQueue() {
        return new Queue("ack.queue", true);
    }

    // 绑定队列和交换机
    @Bean
    Binding ackBinding() {
        return BindingBuilder.bind(ackQueue()).to(ackExchange());
    }
}

二、消息发送确认

消息发送确认的第一部分,是确认消息是否已经成功发送到交换机,我们需要实现RabbitTemplate.ConfirmCallback接口:

@Service
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
    /**
     * @param correlationData
     * @param ack true 表示消息成功发送到交换机,false 则发送失败
     * @param cause 消息发送失败的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            System.out.println("消息已经发送到交换机!");
        } else {
            System.out.println("消息发送到交换机失败:" + cause);
        }
    }
}

消息无论是否成功到达交换机都会调用confirm方法。

消息发送确认的第二部分,就是消息是否成功的从交换机投放到目标队列,需要实现RabbitTemplate.ReturnsCallback接口:

@Service
public class ReturnCallbackService implements RabbitTemplate.ReturnsCallback {
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        System.out.println("未成功投递到队列的消息:"+ returned.toString());
    }
}

returnedMessage方法只会在消息未成功投递到目标队列时被调用ReturnedMessage就是投递失败的消息基本信息。

定义好了两种消息发送确认服务,接下来就是配置消息发送确认服务,可以放在 RabbitMQ 配置类里进行全局配置:

@Configuration
public class AckRabbitMQConfig {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Autowired
    ConfirmCallbackService confirmCallbackService;

    @Autowired
    ReturnCallbackService returnCallbackService;

    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(confirmCallbackService);
        rabbitTemplate.setReturnsCallback(returnCallbackService);
    }
    ......
    ......
}

也可以在发送消息时单独配置:

@Service
public class SendMessageService {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Autowired
    ConfirmCallbackService confirmCallbackService;

    @Autowired
    ReturnCallbackService returnCallbackService;

    public void send(String message) {
        rabbitTemplate.setConfirmCallback(confirmCallbackService);
        rabbitTemplate.setReturnsCallback(returnCallbackService);
        rabbitTemplate.convertAndSend("ack.exchange", "", message);
        System.out.println("生产者发送的消息:" + message);
    }
}

三、消息接收确认

消息接收确认的实现就相对简单一些:

@Service
public class ReceiveMessageService {
    @RabbitListener(queues = "ack.queue")
    public void receive(String msg, Channel channel, Message message) {
        try {
            // int i = 1/0;
            // 确认收到消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            System.out.println("消费者确认收到消息:" + msg);
        } catch (Exception e) {
            try {
                // 拒绝消息
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
                System.out.println("消费者拒绝消息:" + msg);
            } catch (IOException ioException) {
                ioException.printStackTrace();
            }
        }
    }
}

使用消息接收的手动确认模式时,接收消息的方法需要额外添加ChannelMessage两个类型的参数。

Channel就是信道,在学习 Java client 操作 RabbitMQ 时,就是用它来发送接收消息的,不了解的可以复习一下。Message是 RabbitMQ 封装的消息类,里边包含了消息体、消息序号、以及交换机、队列等一些相关的信息。

这样我们就可以根据实际的业务需求,在适当的时机告诉 RabbitMQ 服务,消息已经成功消费,或者被拒绝消费。

这就涉及如下几个方法了:

  • basicAck,确认收到消息,即消息消费成功,执行该方法后,消息会被从队列删除。该方法的参数含义如下:
    1、deliveryTag:消息投递的序号,就是1、2、3、4这样的递增整数。
    2、multiple:是否批量确认消息,false 表示只确认当前 deliveryTag 对应的消息,true 表示会确认小于当前 deliveryTag 但还未被确认的消息。
  • basicNack,拒绝消息,由于发生异常等原因,消息没有被成功消费。和 basicAck 方法相比多了一个参数:
    1、requeue:true 表示被拒绝的消息会重新进入队列头部。
  • basicReject,和 basicNack 方法的作用类似,但是少了 multiple 参数。

这里有两个问题需要注意:

1、

如果拒绝消息时,设置requeuetrue,由于消息会重新进入队列头部,接下来又会被消费者处理,这样很可能陷入死循环,耗尽服务器资源,很危险的。所以在设置requeuetrue时,需要慎重考虑。

拒绝消息时一般都是由于发生异常、或者业务上的错误,导致消费流程不能正常进行下去,可以考虑将此时的消息发送到死信队列,后续再单独处理。具体怎么实现,后期会有专门的文章介绍,目前先了解即可。

2、

如果开启了消息接收的手动确认模式,但是消费消息时却没有做任何消息确认成功或拒绝的应答操作,则对应的消息会变成Unacked状态:

如果消费者客户端不重启,则Unacked状态的消息会一直堆积,不会被删除,也不会被重新消费。

如果消费者客户端重启,则消息会自动变为Ready状态,这样又会被重新消费一次。

三、效果测试

可以通过如下接口来发送消息:

@RestController
public class SendMessageController {
    @Autowired
    private SendMessageService sendMessageService;

    @GetMapping("/send/{msg}")
    public void send(@PathVariable("msg") String msg) {
        sendMessageService.send(msg);
    }
}

要测试消息不能成功发送到交换机的情况,只需要发送消息时指定一个不存在的交换机即可。

由于RabbitTemplate.ReturnsCallbackreturnedMessage方法只会在消息未成功投递到目标队列时被调用,所以要测试消息是否成功的从交换机投放到目标队列,可以注释掉AckRabbitMQConfig中交换机和队列绑定的代码,或者在后台进行交换机和队列的解绑:

这样消息自然不能成功的从交换机投放到队列。

至于消息接收确认,可以自行模拟不同的业务场景测试。

本文完!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,290评论 6 491
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,107评论 2 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,872评论 0 347
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,415评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,453评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,784评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,927评论 3 406
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,691评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,137评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,472评论 2 326
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,622评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,289评论 4 329
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,887评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,741评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,977评论 1 265
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,316评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,490评论 2 348

推荐阅读更多精彩内容