背景介绍:
笔者最近研究了下rabbitmq,便很好奇它是怎么保证不丢失消息的呢?于是便整理了这篇文章来跟大家分享下,自己的理解,如有不准确的地方或者不同的意见,还请各位能够给出反馈,我们可以讨论,相互学习,相互成长。
基础知识:
在开始探讨这个问题之前,笔者还是觉得很有必要将rabbitmq的架构等基础知识回顾下,如下所示:
对于使用rabbitmq的服务来说,主要由三部分构成,它们分别是:生产者,rabbitmq,消费者。这三者之间是通过网络来进行通讯的,其中与生产者对应的是exchange,与消费者对应的是connection,而rabbitmq内部又由exchange,queue,connection三部分构成。
消息的流程:消息是由生产者生产了之后,上报给exchange,exchange绑定并存储到queue中,再传递给最终的消费者手里。
如此以来,整个过程就分成了三大场景:
场景1: 生产者与exchange的上报消息,如何保证不丢失?
对于网络通讯来说,解决丢数据最好的办法就是,消息确认机制,而rabbitmq里面是通过两个方式来保证:一种是事务机制,这个是在amqp协议层面保证的,具体操作如下所示:
RabbitMQ中与事务机制有关的方法有三个:txSelect(), txCommit()以及txRollback(), txSelect用于将当前channel设置成transaction模式,txCommit用于提交事务,txRollback用于回滚事务,在通过txSelect开启事务之后,我们便可以发布消息给broker代理服务器了,如果txCommit提交成功了,则消息一定到达了broker了,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了。(备注:采用事务机制实现会降低RabbitMQ的消息吞吐量。)
步骤为:
1.client----发送----->Tx.Select
2.broker----发送----->Tx.Select-Ok(之后publish)
3.client------发送----->Tx.Commit
4.broker------发送---->Tx.Commit-Ok
一种是confrim机制:
原理:消息响应机制,
生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号。
confrim的优势是,它是异步的,在生产者发送完一个消息之后,不必要等这个消息的返回,就可以继续处理另外一个消息,等待消息的ack返回之后,再去处理前面发过的消息,类似于多路复用的做法。rabbitmq在收到之后,会回复ack,如果因为rabbitmq自身的问题导致的,会回复nack消息。
对于生产者来说,为了方便确认消息有没有真正到达rabbitmq端,还需要在生产者端设置超时重发,毕竟网络里面是可能丢失消息的。
confrim方式使用的API:
https://godoc.org/github.com/streadway/amqp#Channel.Confirm
场景2: 消费者从queue中获取消息如何保证不丢失?
........