- 一、架构图对比
- 1.1 Kafka Architecture
- 1.2 RocketMQ Architecture
- 1.3 总结
- 二、高可用对比
- 2.1 Kafka
- 2.2 RocketMQ
- 2.3 总结
- 三、文件存储结构对比
- 3.1 Kafka
- 3.2 RocketMQ
- 3.3 总结
- 四、发送数据可靠性对比
- 五、功能丰富性对比
- 5.1 对比
- 5.2 关于有序消息
- 六、生产者分区分配策略对比
- 6.1 Kafka
- 6.2 RocketMQ
- 6.3 总结
- 七、消费者分区分配策略对比
- 7.1 Kafka
- 7.2 RocketMQ
- 7.3 总结
- 八、RocketMQ中的部分源码分析
- 8.1 异步、同步、单向生产消息
- 8.1.1 同步消息发送流程
- 8.1.2 单向消息发送流程
- 8.1.3 异步消息发送流程
- 8.2 Tag消息过滤
- 8.3 顺序消息的保证
- 8.1 异步、同步、单向生产消息
Kafka:Linkedin研发并捐赠给Apache软件基金会的开源分布式消息系统
RocketMQ:阿里研发并捐赠给Apache软件基金会的开源分布式消息系统
一、架构图对比
1.1 Kafka Architecture
- Zookeeper集群:存储Kafka集群的元数据,Kafka早期版本Consumer Offset也被存储在此
- Broker集群:对外提供消息服务、消息存储的地方
- Producer Client:数据生产者
- Consumer Client:数据消费者
1.2 RocketMQ Architecture
- NameServer集群:存储RocketMQ集群的路由信息,譬如BrokerData、TopicData,角色可类比到Kafka中的Zookeeper集群,但NameServer集群是无中心化集群,所有机器地位相等,且不同之间NameServer无任何数据交互,相比于ZK集群更加轻量。
- Broker集群:对外提供服务、消息存储
- Producer Client:数据生产者
- Consume Client:数据消费者
1.3 总结
大体看来,两款MQ的组成部分相似,有负责消息存储与检索的服务、有集群注册中心、还有业务侧必须用的两个Client。
这里大致说一下NameServer与Zookeeper的对比:
在分布式环境下多台机器肯定需要互相协调以应对各类复杂场景包括:对外提供服务、内部容错、HA。Kafka与RocketMQ也是一样,他们分别引入Zookeeper与NameServer作为自身集群的协调工作者,其实更像是一个元数据存储地与注册中心,它们都存储着集群下机器信息与Topic信息等,为多机之间的协调工作提供数据支持。
Zookeeper是一个比较重量级的中间件,在集群模式下,Zookeeper通过ZAB协议来保证Leader
与follower
之间的数据一致,这里说一个疑惑:网上很多资料会说ZK是强一致的,其实不是,ZAB协议中在数据同步时,采用的也是过半机制,也就是说只要半数follower
向Leader
进行ACK后,Leader
会被认为数据已经同步成功,因此严格来说,ZK集群并不能保证数据强一致。
NameServer相比于ZK是一个非常轻量级的组件,是无中心化的集群模式,集群下各节点是无差异的,节点与节点之间不会相互进行信息通讯,因此NameServer可以看成是无状态,那么自然就不需要引入各类复杂的分布式协议以保证集群的数据一致性与容错性。
二、高可用对比
这里说的高可用主要针对Broker HA进行说明,注意:对于Rocket与Kafka来说,Broker的高可用有着较大的差异,个人觉得是两者对比的重点。
对于MQ产品而言保证消息的安全一定是重中之重的任务,RocketMQ与Kafka都引入了数据冗余策略来保证HA
2.1 Kafka
上图为执行以下命令得到的一段帮助文档:
cd ${KAFKA_HOME}/bin
sh kafka-topics.sh -help
Kafka在创建Topic时,可指定--replication-factor
参数,该参数表示为每个partition
创建多少个副本,而这些副本不能在同台机器上,以保证HA,如下图:
分区follower
会主动向leader
进行数据同步以维护自身消息,当leader
所在机器发生不可用时,只要还有follower
可用则依然可以对外提供服务(这里针对主、副本切换可能导致部署数据丢失情况不作讨论)。
总体来看,Topic下的partition
有几个replication
具体是在创建该Topic时指定的,是以Topic为维度设置的。
2.2 RocketMQ
RocketMQ中每个Topic MessageQueue(可以类比到Kafka中的Partition角色)
可以有几个副本并不是在创建Topic时指定,而是由Broker集群创建之初就决定了,下图列举了1m-1s的集群结构。
在${ROCKET_HOME}/conf
目录下就内置了四种Broker部署模式:
- 单机:当前Broker掉线则会导致整个RocketMQ不可用,数据没有任何冗余,无法保证HA
- 2m-noslave:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响
- 2m-2s-async:每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级)
- 2m-2s-sync:每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,但是会导致单条消息的RT会略高
2.3 总结
因此Kafka的副本机制更加灵活,不会受限于Kafka Cluster的部署模式,而是可以根据业务需要细粒度的控制自身Topic-partition的副本,而RocketMQ则是受限于Cluster部署模式,其副本可以看成是Master-Broker的冗余,无法单独制定某个Topic下MessageQueue的副本。
简而言之,副本的维度不同,Kafka是Partition级别,RocketMQ是Broker级别。
三、文件存储结构对比
3.1 Kafka
sh kafka-topics.sh --create --topic multi-test --zookeeper zk:2181 --partitions 2 --replication-factor 1
创建完该Topic之后可以在${KAFKA_HOME}/data
下看到新建的两个目录:multi-test-0
、multi-test-1
分别对应着multi-test
下的两个partition分区,而每个文件夹下都有两个非常重要的文件:
- index
index文件主要用于对log
文件的索引,内部每条记录大小固定,这可以非常快速的根据Offset找到log
文件中对应记录。
然后再根据记录中消息在log
文件中的偏移量找出具体消息 - log
所有的消息都存在此文件中,根据配置定期压缩或删除
3.2 RocketMQ
RocketMQ中的文件较多,下面列举最核心的三类文件:
- commitLog
消息存储的地方,但是并不区分当前broker的不同Topic,也就是说,不论当前Broker承载了多少Topic,所有的数据全部顺序写入该文件夹下的文件中。 - consumeQueue
是commitLog文件的索引文件,每条消息定长,方便根据偏移量快速计算查找。每条记录包含三个字段:
1)消息在commitLog偏移量
2)消息长度
3)消息Tag HashCode:用于消息过滤 - config
记录元信息,主要包含两类:- Topic元数据
- 各个consume消费进度
当然RocketMQ除了上述两类文件,还有abort、index
文件,再次不再赘述。
3.3 总结
Kafka与RocketMQ的文件结构大体相似,但因为RocketMQ所支持的消息类型更加丰富一些,因此文件也会稍稍多一些。
Kafka的数据日志文件是按Topic-Partition为维度内部再按日志大小进行切分,而RocketMQ日志文件是按Broker为维度内部再按日志大小进行切分,这是最大的不同点。
四、发送数据可靠性对比
Kafka与RocketMQ都支持数据发送失败重试机制,其中RocketMQ支持的重试种类更加丰富:
- retryTimesWhenSendFailed
同步发送失败重试次数 - retryTimesWhenSendAsyncFailed
异步发送失败重试次数 - retryAnotherBrokerWhenNotStoreOK
消息刷盘失败或Slave不可用
但需要注意的是:RocketMQ Broadcasting Message不支持重试
五、功能丰富性对比
5.1 对比
RocketMQ较Kafka而言,功能更加丰富,RocketMQ主要的消息功能有:
- 有序消息
- 广播消息
- 延迟消息
- 消息过滤
- 事务消息
- 消费者支持PULL/PUSH模型
而Kafka只支持简单消息的流转,并不支持上述功能。
5.2 关于有序消息
这里需要先明确一个前提,即RocketMQ Consumer使用的是PUSH模式,因为POLL模式下是否存在消费者消费乱序完全取决于自己程序的编写,因此PULL模式不作考虑。
关于有序消息,RocketMQ的官网中单独把它当作一个功能,也是网上其他帖子提到RocketMQ就一定会提到的功能。
有序消息需要保证三个地方的有序:
- Producer数据发送有序
顺序在前的消息一定要先发网Broker,若失败则需要不断进行重试,才能发下一条消息 - Broker中有序
Broker中队列天然符合FIFO
策略,因此这一环可以暂时不用考虑 - Consumer消费有序
Consume Pull下来的消息,处理要保证有序
通过上述三个环节的分析可以得出:
需要保证有序的消息需要被发送到Broker中同一个partition
或者messageQueue
中,然后利用队列本身的FIFO
特性保证前两步消息有序,剩下的只要保证消费者端能够有序的进行消息消费即可。
而在第三步中,两个MQ具有不同的处理方式。Kafka Consumer中单个分区是单线程处理,而Rocket Consumer是线程池方式处理。
因此对于Kafka而言,Producer只要保证相关联消息进入同一个Partition
即可保证消息顺序消费。
对于Rocket而言,由于拉取到的一批数据是并行处理的,加上CPU的调度不确定性,因此这一环节如果不加手段处理,则无法保证有序性。
在RocketMQ是通过对当前处理队列加锁的方式保证了消费有序,即:同一个队列只有一个线程进行处理,具体的代码分析请移步本文8.3顺序消息的保证
小节,某种意义上来说,相当于把消息处理线程池设置为单线程模式。
六、生产者分区分配策略对比
6.1 Kafka
- Round-robin:消息以轮训方式发送到不同
Partition
- 指定Partition:Kafka Producer API中支持消息指定某
Partition
进行发送 - 指定Key:发送消息时指定Key,Kafka Producer内部会将Key进行Hash并对
Partition Num
进行取模操作以选出具体的Partition
6.2 RocketMQ
- Round-robin:消息以轮训方式发送到不同
MessageQueue
- MessageSelector:消息发送时,自定义MessageSelector函数
6.3 总结
Kafka与RocketMQ的多Partition/MessageQueue
机制本身都是为了提升生产效率。
生产者的分区分配是在Producer Client中完成的,而不是消息发送到Broker之后再去做负载,这是为了减少Broker的压力。
七、消费者分区分配策略对比
7.1 Kafka
Round-robin
以轮询方式将Partition
分发给同一个消费者组下的消费者-
Range
将Partition
按照分区号进行排序,再将ConsumerGruop
下的Consume
进行排序,然后以近乎平均分的方式将Partition
分配出去若某个
ConsumerGroup
同时以Range
模式订阅多个Topic
时,可能会造成消费者的浪费,这一点SpringDataKafka
官网有描述,如下图:
7.2 RocketMQ
- 平均分配策略
将MessageQueue
按ID排序,然后按照AVG
算法进行平均分配给Consumer
- 环形平均策略
其效果等同于Kafka
轮询方式分配 - 一致性Hash策略
该算法会将Consumer
的Hash值作为Node
节点存放到Hash环上,然后将MessageQueue
也按照Hash值放入该环中,通过顺时针方向,距离MessageQueue
最近的Consumer
就是该MessageQueue
要分配的Consumer
- 同机房策略
该算法会根据MessageQueue
的部署机房位置与Consumer
的位置,过滤出当前Consumer
相同机房的MessageQueue
,然后再按照平均分配 或 环形策略对同机房的MessageQueue
二次分配
7.3 总结
消费者分区分配同生产者分区分配一致,依然是在Consumer Client
中通过对应API指定,其中RocketMQ的分区策略更加丰富一些。
八、RocketMQ中的部分源码分析
8.1 异步、同步、单向生产消息
8.1.1 同步消息发送流程
源码笔记如下:
由于RocketMQ底层使用Netty
完成网络通讯,而上述方法的入参Channel
是Netty
核心组件之一,首先Producer Client API
会先在自身完成MessageQueue
的负载,在选出MessageQueue
之后,便根据MessageQueue
找出Producer Client
端维护的通向各个MessageQueue
的长链接,并获取对应的Channel
对象。
第412行代码首先会为本次请求构造出一个全局ID,然后开始构造ResponseFuture
对象,该对象内部有一个核心的CountDownLatch
,以完成Future
特性,然后会将opaque
与ResponseFuture
的关联关系保存至一个全局ConcurrentHashMap
中;
将封装好的Request
写入到Channel
中发送出去,在其会调函数中,对发送失败的数据进行去除一些状态,这里不做过多的描述。
在第436行,调用了ResponseFuture
的waitResponse
方法,该方法源码如下:
可以看到就是调用了内部CountDownLatch
让当前线程进行阻塞等待状态,当该线程阻塞完毕之后,接着往下就是对外返回Response
相应了,因此就能达到Sync
发送的效果。
那么接下来只要思考一个问题即可:CountDownLatch
是在何处触发了countDown
?看ResponseFuture
源码中另一个方法:
那么该方法是在何处被调用呢?
其实就在Broker
的响应发送到Producer Client
时调用的,还记得一开始的opaque
变量吗?当Producer Client
发送消息时,也会将该参数一并发送出去,当Broker
端接受并处理完毕后,依然会将opaque
变量放入Response
一起发回到Producer Client
,此时Producer Client
只要在接受到响应的地方解析出opaque
并在opaque
与ResponseFuture
的关联关系ConcurrentHashMap
中找出ResponseFuture
并调用其putResponse
方法即可。
上述过程可能有些复杂,需要童鞋们先对Netty
编程、网络协议
有一个大致了解。毕竟直接基于TCP
协议层开发难度肯定要大于直接使用市面上已有的应用层协议
。
8.1.2 单向消息发送流程
单向消息其实核心处理逻辑与8.1.1同步消息发送流程
一样,唯一不同的是:单向消息不会把ResponseCommand
对象返回给业务方
8.1.3 异步消息发送流程
源码大致与同步发送类似,最大的不同就是方法如参会有一个CallBack
函数,该函数就是业务编写的回调函数,该函数会被纳入到RespinseFuture
对象中,然后数据发送之后也没有调用waitResponse
对线程进行阻塞。
上图为ResponseFuture
对象中执行callback
函数的源码,这样就可以理清楚Async
方式的实现逻辑了。
上述源码为Producer Client
接受到Broker
的响应之后对数据进行处理的方法,同样根据消息中的opaque
变量在opaque
与RespinseFuture
关系表中找到对应的ResponseFuture
对象,然后执行executrInvokeCallBack
方法,该方法源码如下:
这样就完成了Async
逻辑。
8.2 Tag消息过滤
首先要明确一个前提:
在3.2 Rocket文件结构
中描述了consumeQueue
中每条记录都包含Tag
的HashCode
,并且Consumer
在获取消息时,会经过一次Tag HashCode
比较,然后才会从commitLog
中取出具体数据那是不是意味着消息过滤任务已经完成了?其实不是,考虑如下情况:某些情况下不同的Tag
其HashCode
可能相同,至于Broker
端为什么不再consumeQueue
中直接存储Tag
字符串,然后使用equals
方式对比呢?主要是因为HashCode
为整型数据对比效率高。
在有了上述背景之后,也就了解了消息过滤不仅要在Broker
侧过滤也要在Consumer Client
侧对Tag
字符串进行过滤,源码如下:
在Consumer
获取到Message
之后(不论是PULL模型还是PUSH模型),都会遍历消息将Tag
相符的消息拿出。
8.3 顺序消息的保证
对于有序与无序方式消费,这里讨论的前提都是基于PUSH
模型下:
对于有序消息,先会根据MessageQueue
对象获取到其对应的锁,源码如下:
在获取锁完毕后先进行上锁再进行具体的消费业务逻辑处理。