一、为什么要使用消息队列:解耦、异步、削峰
1️⃣解耦
传统模式的缺点:系统间耦合性太强。如图,系统 A 直接调用系统 B 和系统 C,系统 D 接入,系统 A 还得修改代码:2️⃣异步
传统模式的缺点:非必要的业务逻辑以同步的方式运行,太耗费时间。3️⃣削峰
传统模式的缺点:并发量大的时候,所有的请求直接怼到数据库,造成数据库连接异常。二、消息队列的弊端
1️⃣系统可用性降低
系统引入的外部依赖越多,越容易挂掉。本来 A 系统调用 BCD 三个系统的接口就好了,ABCD 四个系统好好的,没啥问题。硬加个 MQ,MQ 挂了,整套系统就崩溃了,风险很大。因此,系统可用性降低。
2️⃣系统复杂性增加
要多考虑很多方面的问题,如何处理消息丢失的情况?如何保证消息传递的顺序性?如何保证消息没有重复消费?
3️⃣一致性问题
A 系统处理完了直接返回成功了,人都以为这个请求就成功了;但要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,这数据就不一致了。所以消息队列实际是一种非常复杂的架构,引入它有很多好处,但是也得针对它带来的坏处做各种额外的技术方案和架构来规避掉,系统复杂度提升了一个数量级,也许是复杂了 10 倍。
三、如何保证消息队列高可用
以 RcoketMQ为例,它的集群就有多 master 模式、多 master 多 slave 异步复制模式、多 master 多 slave 同步双写模式。
多 master 多 slave 模式部署架构图:通信过程如下:
Producer 与 NameServer 集群中的一个节点(随机选择)建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Broker Master 建立长连接,且定时向 Broker 发送心跳。
Producer 只能将消息发送到 Broker master,但是 Consumer 则不一样,它同时和提供 Topic 服务的 Master 和 Slave 建立长连接,既可以从 Broker Master 订阅消息,也可以从 Broker Slave 订阅消息。
那么 kafka 呢,为了对比说明直接上 kafka 的拓补架构图:如图,一个典型的 Kafka 集群中包含若干 Producer(可以是 web 前端产生的 Page View,或者是服务器日志,系统 CPU、Memory 等),若干 broker(Kafka 支持水平扩展,一般 broker 数量越多,集群吞吐率越高),若干 Consumer Group,以及一个 Zookeeper 集群。
Kafka 通过 Zookeeper 管理集群配置,选举 leader,以及在 Consumer Group 发生变化时进行 rebalance。
Producer 使用 push 模式将消息发布到 broker,Consumer 使用 pull 模式从 broker 订阅并消费消息。
四、如何保证消息不被重复消费?即如何保证消息队列的幂等性?
为什么会造成重复消费?
无论哪种消息队列,造成重复消费原因其实都是类似的。通常,消费者在消费消息完毕后,会发送一个确认信息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除。只是不同的消息队列发送的确认信息形式不同。
例如,RabbitMQ 是发送一个 ACK 确认消息,RocketMQ 是返回一个 CONSUME_SUCCESS 成功标志。kafka 实际上有个 offset 的概念。简单说一下,就是每一个消息都有一个 offset,kafka 消费过消息后,需要提交 offset,让消息队列知道自己已经消费过了。
造成重复消费的原因
因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将该消息分发给其他的消费者。
如何解决?这个问题针对业务场景来答分以下几点:
1️⃣如果拿到这个消息做数据库的 insert 操作:给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
2️⃣如果拿到这个消息做 Redis 的 set 的操作:不用解决。因为无论 set 几次结果都是一样的,set 操作本来就算幂等操作。
3️⃣准备一个第三方介质,来做消费记录。以 Redis 为例,给消息分配一个全局 id,只要消费过该消息,将以 K-V 形式写入 Redis。那消费者开始消费前,先去 Redis 中查询有没消费记录即可。
五、如何保证消息的可靠性传输
这个可靠性传输,每种 MQ 都要从三个角度来分析:生产者弄丢数据、消息队列弄丢数据、消费者弄丢数据。
1️⃣RabbitMQ
- 生产者丢数据
从生产者弄丢数据这个角度来看,RabbitMQ 提供 transaction 和 confirm 模式来确保生产者不丢消息。transaction 机制就是说,发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事物就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit())。
缺点就是吞吐量下降了。因此,生产上用 confirm 模式的居多。一旦 channel 进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始)。一旦消息被投递到所有匹配的队列之后,RabbitMQ 就会发送一个 Ack 给生产者(包含消息的唯一 ID)。这就使得生产者知道消息已经正确到达目的队列了。如果 RabiitMQ 没能处理该消息,则会发送一个 Nack 消息给你,你可以进行重试操作。
- 消息队列丢数据
处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和 confirm 机制配合使用,可以在消息持久化磁盘后,再给生产者发送一个 Ack 信号。这样,如果消息持久化磁盘之前,RabbitMQ 阵亡了,那么生产者收不到 Ack 信号,生产者会自动重发。那么如何持久化呢,就下面两步:
- 将 queue 的持久化标识 durable 设置为 true,则代表是一个持久的队列。
- 发送消息的时候将 deliveryMode=2
这样设置以后,RabbitMQ 就算挂了,重启后也能恢复数据。
- 消费者丢数据
消费者丢数据一般是因为采用了自动确认消息模式。这种模式下,消费者会自动确认收到信息。这时 RahbitMQ 会立即将消息删除,这种情况下如果消费者出现异常而没能处理该消息,就会丢失该消息。至于解决方案,采用手动确认消息即可。
2️⃣kafka
Producer 在发布消息到某个 Partition 时,先通过 ZooKeeper 找到该 Partition 的 Leader 然后无论该 Topic 的 Replication Factor 为多少(也即该 Partition 有多少个 Replica),Producer 只将该消息发送到该 Partition 的 Leader。Leader 会将该消息写入其本地 Log。每个 Follower 都从 Leader 中 pull 数据。
针对上述情况,得出如下分析
- 生产者丢数据
在kafka生产中,基本都有一个leader和多个follwer。follwer会去同步leader的信息。因此,为了避免生产者丢数据,做如下两点配置:
- 第一个配置要在producer端设置acks=all。这个配置保证了,follwer同步完成后,才认为消息发送成功。
- 在producer端设置retries=MAX,一旦写入失败,这无限重试
- 消息队列丢数据
针对消息队列丢数据的情况,无外乎就是,数据还没同步,leader就挂了,这时zookpeer会将其他的follwer切换为leader,那数据就丢失了。针对这种情况,应该做两个配置:
- replication.factor参数,这个值必须大于1,即要求每个partition必须有至少2个副本
- min.insync.replicas参数,这个值必须大于1,这个是要求一个leader至少感知到有至少一个follower还跟自己保持联系
这两个配置加上上面生产者的配置联合起来用,基本可确保kafka不丢数据
- 消费者丢数据
这种情况一般是自动提交了offset,然后你处理程序过程中挂了。kafka以为你处理好了。再强调一次offset是干嘛的。offset:指的是kafka的topic中的每个消费组消费的下标。简单的来说就是一条消息对应一个offset下标,每次消费数据的时候如果提交offset,那么下次消费就会从提交的offset加一那里开始消费。比如一个topic中有100条数据,我消费了50条并且提交了,那么此时的kafka服务端记录提交的offset就是49(offset从0开始),那么下次消费的时候offset就从50开始消费。解决方案也很简单,改成手动提交即可。
六、如何保证消息的顺序性?
针对这个问题,通过某种算法,将需要保持先后顺序的消息放到同一个消息队列中(kafka 中就是 partition、RabbitMQ 中就是 queue)。然后只用一个消费者去消费该队列。
如果为了吞吐量,有多个消费者去消费如何
这个问题,没有固定回答的套路。比如关于微博,发微博、写评论、删除微博,这三个异步操作。如果是这样一个业务场景,那只要重试就行。
比如某个消费者先执行了写评论的操作,然而此时微博都还没发,写评论一定是失败的。等另一个消费者,先执行发微博的操作后,再执行,就可以成功。
总之,针对这个问题,保证入队有序就行,出队以后的顺序交给消费者自己去保证,没有固定套路。