RocketMQ与Kafka的一些对比与源码分析

  • 一、架构图对比
    • 1.1 Kafka Architecture
    • 1.2 RocketMQ Architecture
    • 1.3 总结
  • 二、高可用对比
    • 2.1 Kafka
    • 2.2 RocketMQ
    • 2.3 总结
  • 三、文件存储结构对比
    • 3.1 Kafka
    • 3.2 RocketMQ
    • 3.3 总结
  • 四、发送数据可靠性对比
  • 五、功能丰富性对比
    • 5.1 对比
    • 5.2 关于有序消息
  • 六、生产者分区分配策略对比
    • 6.1 Kafka
    • 6.2 RocketMQ
    • 6.3 总结
  • 七、消费者分区分配策略对比
    • 7.1 Kafka
    • 7.2 RocketMQ
    • 7.3 总结
  • 八、RocketMQ中的部分源码分析
    • 8.1 异步、同步、单向生产消息
      • 8.1.1 同步消息发送流程
      • 8.1.2 单向消息发送流程
      • 8.1.3 异步消息发送流程
    • 8.2 Tag消息过滤
    • 8.3 顺序消息的保证

Kafka:Linkedin研发并捐赠给Apache软件基金会的开源分布式消息系统
RocketMQ:阿里研发并捐赠给Apache软件基金会的开源分布式消息系统

一、架构图对比

1.1 Kafka Architecture
0.Kafka架构图.png
  • Zookeeper集群:存储Kafka集群的元数据,Kafka早期版本Consumer Offset也被存储在此
  • Broker集群:对外提供消息服务、消息存储的地方
  • Producer Client:数据生产者
  • Consumer Client:数据消费者
1.2 RocketMQ Architecture
1.RocketMQ架构图.png
  • NameServer集群:存储RocketMQ集群的路由信息,譬如BrokerData、TopicData,角色可类比到Kafka中的Zookeeper集群,但NameServer集群是无中心化集群,所有机器地位相等,且不同之间NameServer无任何数据交互,相比于ZK集群更加轻量。
  • Broker集群:对外提供服务、消息存储
  • Producer Client:数据生产者
  • Consume Client:数据消费者
1.3 总结

大体看来,两款MQ的组成部分相似,有负责消息存储与检索的服务、有集群注册中心、还有业务侧必须用的两个Client。
这里大致说一下NameServer与Zookeeper的对比:
在分布式环境下多台机器肯定需要互相协调以应对各类复杂场景包括:对外提供服务、内部容错、HA。Kafka与RocketMQ也是一样,他们分别引入Zookeeper与NameServer作为自身集群的协调工作者,其实更像是一个元数据存储地与注册中心,它们都存储着集群下机器信息与Topic信息等,为多机之间的协调工作提供数据支持。
Zookeeper是一个比较重量级的中间件,在集群模式下,Zookeeper通过ZAB协议来保证Leaderfollower之间的数据一致,这里说一个疑惑:网上很多资料会说ZK是强一致的,其实不是,ZAB协议中在数据同步时,采用的也是过半机制,也就是说只要半数followerLeader进行ACK后,Leader会被认为数据已经同步成功,因此严格来说,ZK集群并不能保证数据强一致。
NameServer相比于ZK是一个非常轻量级的组件,是无中心化的集群模式,集群下各节点是无差异的,节点与节点之间不会相互进行信息通讯,因此NameServer可以看成是无状态,那么自然就不需要引入各类复杂的分布式协议以保证集群的数据一致性与容错性。

二、高可用对比

这里说的高可用主要针对Broker HA进行说明,注意:对于Rocket与Kafka来说,Broker的高可用有着较大的差异,个人觉得是两者对比的重点。

对于MQ产品而言保证消息的安全一定是重中之重的任务,RocketMQ与Kafka都引入了数据冗余策略来保证HA

2.1 Kafka
2.Kafka副本.png

上图为执行以下命令得到的一段帮助文档:

cd ${KAFKA_HOME}/bin
sh kafka-topics.sh -help

Kafka在创建Topic时,可指定--replication-factor参数,该参数表示为每个partition创建多少个副本,而这些副本不能在同台机器上,以保证HA,如下图:

3.Kafka分区副本.png

分区follower会主动向leader进行数据同步以维护自身消息,当leader所在机器发生不可用时,只要还有follower可用则依然可以对外提供服务(这里针对主、副本切换可能导致部署数据丢失情况不作讨论)。

总体来看,Topic下的partition有几个replication具体是在创建该Topic时指定的,是以Topic为维度设置的。

2.2 RocketMQ

RocketMQ中每个Topic MessageQueue(可以类比到Kafka中的Partition角色)可以有几个副本并不是在创建Topic时指定,而是由Broker集群创建之初就决定了,下图列举了1m-1s的集群结构。

4.RocketMQ副本.png

${ROCKET_HOME}/conf目录下就内置了四种Broker部署模式:

  • 单机:当前Broker掉线则会导致整个RocketMQ不可用,数据没有任何冗余,无法保证HA
  • 2m-noslave:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响
  • 2m-2s-async:每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级)
  • 2m-2s-sync:每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,但是会导致单条消息的RT会略高
2.3 总结

因此Kafka的副本机制更加灵活,不会受限于Kafka Cluster的部署模式,而是可以根据业务需要细粒度的控制自身Topic-partition的副本,而RocketMQ则是受限于Cluster部署模式,其副本可以看成是Master-Broker的冗余,无法单独制定某个Topic下MessageQueue的副本。
简而言之,副本的维度不同,Kafka是Partition级别,RocketMQ是Broker级别。

三、文件存储结构对比

3.1 Kafka
sh kafka-topics.sh --create --topic multi-test --zookeeper zk:2181 --partitions 2 --replication-factor 1

创建完该Topic之后可以在${KAFKA_HOME}/data下看到新建的两个目录:multi-test-0multi-test-1分别对应着multi-test下的两个partition分区,而每个文件夹下都有两个非常重要的文件:

  • index
    index文件主要用于对log文件的索引,内部每条记录大小固定,这可以非常快速的根据Offset找到log文件中对应记录。
    然后再根据记录中消息在log文件中的偏移量找出具体消息
  • log
    所有的消息都存在此文件中,根据配置定期压缩或删除
3.2 RocketMQ

RocketMQ中的文件较多,下面列举最核心的三类文件:

  • commitLog
    消息存储的地方,但是并不区分当前broker的不同Topic,也就是说,不论当前Broker承载了多少Topic,所有的数据全部顺序写入该文件夹下的文件中。
  • consumeQueue
    是commitLog文件的索引文件,每条消息定长,方便根据偏移量快速计算查找。每条记录包含三个字段:
    1)消息在commitLog偏移量
    2)消息长度
    3)消息Tag HashCode:用于消息过滤
  • config
    记录元信息,主要包含两类:
    • Topic元数据
    • 各个consume消费进度
      当然RocketMQ除了上述两类文件,还有abort、index文件,再次不再赘述。
3.3 总结

Kafka与RocketMQ的文件结构大体相似,但因为RocketMQ所支持的消息类型更加丰富一些,因此文件也会稍稍多一些。
Kafka的数据日志文件是按Topic-Partition为维度内部再按日志大小进行切分,而RocketMQ日志文件是按Broker为维度内部再按日志大小进行切分,这是最大的不同点。

四、发送数据可靠性对比

Kafka与RocketMQ都支持数据发送失败重试机制,其中RocketMQ支持的重试种类更加丰富:

  • retryTimesWhenSendFailed
    同步发送失败重试次数
  • retryTimesWhenSendAsyncFailed
    异步发送失败重试次数
  • retryAnotherBrokerWhenNotStoreOK
    消息刷盘失败或Slave不可用

但需要注意的是:RocketMQ Broadcasting Message不支持重试

五、功能丰富性对比

5.1 对比

RocketMQ较Kafka而言,功能更加丰富,RocketMQ主要的消息功能有:

  • 有序消息
  • 广播消息
  • 延迟消息
  • 消息过滤
  • 事务消息
  • 消费者支持PULL/PUSH模型

而Kafka只支持简单消息的流转,并不支持上述功能。

5.2 关于有序消息

这里需要先明确一个前提,即RocketMQ Consumer使用的是PUSH模式,因为POLL模式下是否存在消费者消费乱序完全取决于自己程序的编写,因此PULL模式不作考虑。

关于有序消息,RocketMQ的官网中单独把它当作一个功能,也是网上其他帖子提到RocketMQ就一定会提到的功能。

有序消息需要保证三个地方的有序:

  • Producer数据发送有序
    顺序在前的消息一定要先发网Broker,若失败则需要不断进行重试,才能发下一条消息
  • Broker中有序
    Broker中队列天然符合FIFO策略,因此这一环可以暂时不用考虑
  • Consumer消费有序
    Consume Pull下来的消息,处理要保证有序

通过上述三个环节的分析可以得出:

需要保证有序的消息需要被发送到Broker中同一个partition或者messageQueue中,然后利用队列本身的FIFO特性保证前两步消息有序,剩下的只要保证消费者端能够有序的进行消息消费即可。
而在第三步中,两个MQ具有不同的处理方式。Kafka Consumer中单个分区是单线程处理,而Rocket Consumer是线程池方式处理。
因此对于Kafka而言,Producer只要保证相关联消息进入同一个Partition即可保证消息顺序消费。
对于Rocket而言,由于拉取到的一批数据是并行处理的,加上CPU的调度不确定性,因此这一环节如果不加手段处理,则无法保证有序性。
在RocketMQ是通过对当前处理队列加锁的方式保证了消费有序,即:同一个队列只有一个线程进行处理,具体的代码分析请移步本文8.3顺序消息的保证小节,某种意义上来说,相当于把消息处理线程池设置为单线程模式。

六、生产者分区分配策略对比

6.1 Kafka
  • Round-robin:消息以轮训方式发送到不同Partition
  • 指定Partition:Kafka Producer API中支持消息指定某Partition进行发送
  • 指定Key:发送消息时指定Key,Kafka Producer内部会将Key进行Hash并对Partition Num进行取模操作以选出具体的Partition
6.2 RocketMQ
  • Round-robin:消息以轮训方式发送到不同MessageQueue
  • MessageSelector:消息发送时,自定义MessageSelector函数
6.3 总结

Kafka与RocketMQ的多Partition/MessageQueue机制本身都是为了提升生产效率。

生产者的分区分配是在Producer Client中完成的,而不是消息发送到Broker之后再去做负载,这是为了减少Broker的压力。

七、消费者分区分配策略对比

7.1 Kafka
  • Round-robin
    以轮询方式将Partition分发给同一个消费者组下的消费者

  • Range
    Partition按照分区号进行排序,再将ConsumerGruop下的Consume进行排序,然后以近乎平均分的方式将Partition分配出去

    若某个ConsumerGroup同时以Range模式订阅多个Topic时,可能会造成消费者的浪费,这一点SpringDataKafka官网有描述,如下图:

5.Range分配.png
7.2 RocketMQ
  • 平均分配策略
    MessageQueue按ID排序,然后按照AVG算法进行平均分配给Consumer
  • 环形平均策略
    其效果等同于Kafka轮询方式分配
  • 一致性Hash策略
    该算法会将Consumer的Hash值作为Node节点存放到Hash环上,然后将MessageQueue也按照Hash值放入该环中,通过顺时针方向,距离MessageQueue最近的Consumer就是该MessageQueue要分配的Consumer
  • 同机房策略
    该算法会根据MessageQueue的部署机房位置与Consumer的位置,过滤出当前Consumer相同机房的MessageQueue,然后再按照平均分配 或 环形策略对同机房的MessageQueue二次分配
7.3 总结

消费者分区分配同生产者分区分配一致,依然是在Consumer Client中通过对应API指定,其中RocketMQ的分区策略更加丰富一些。

八、RocketMQ中的部分源码分析

8.1 异步、同步、单向生产消息
8.1.1 同步消息发送流程
6.消息同步发送.png

源码笔记如下:

由于RocketMQ底层使用Netty完成网络通讯,而上述方法的入参ChannelNetty核心组件之一,首先Producer Client API会先在自身完成MessageQueue的负载,在选出MessageQueue之后,便根据MessageQueue找出Producer Client端维护的通向各个MessageQueue的长链接,并获取对应的Channel对象。

第412行代码首先会为本次请求构造出一个全局ID,然后开始构造ResponseFuture对象,该对象内部有一个核心的CountDownLatch,以完成Future特性,然后会将opaqueResponseFuture的关联关系保存至一个全局ConcurrentHashMap中;

将封装好的Request写入到Channel中发送出去,在其会调函数中,对发送失败的数据进行去除一些状态,这里不做过多的描述。

在第436行,调用了ResponseFuturewaitResponse方法,该方法源码如下:

7.wait.png

可以看到就是调用了内部CountDownLatch让当前线程进行阻塞等待状态,当该线程阻塞完毕之后,接着往下就是对外返回Response相应了,因此就能达到Sync发送的效果。

那么接下来只要思考一个问题即可:CountDownLatch是在何处触发了countDown?看ResponseFuture源码中另一个方法:

8.put.png

那么该方法是在何处被调用呢?

其实就在Broker的响应发送到Producer Client时调用的,还记得一开始的opaque变量吗?当Producer Client发送消息时,也会将该参数一并发送出去,当Broker端接受并处理完毕后,依然会将opaque变量放入Response一起发回到Producer Client,此时Producer Client只要在接受到响应的地方解析出opaque并在opaqueResponseFuture的关联关系ConcurrentHashMap中找出ResponseFuture并调用其putResponse方法即可。

上述过程可能有些复杂,需要童鞋们先对Netty编程、网络协议有一个大致了解。毕竟直接基于TCP协议层开发难度肯定要大于直接使用市面上已有的应用层协议

8.1.2 单向消息发送流程

单向消息其实核心处理逻辑与8.1.1同步消息发送流程一样,唯一不同的是:单向消息不会把ResponseCommand对象返回给业务方

8.1.3 异步消息发送流程
9.Async.png

源码大致与同步发送类似,最大的不同就是方法如参会有一个CallBack函数,该函数就是业务编写的回调函数,该函数会被纳入到RespinseFuture对象中,然后数据发送之后也没有调用waitResponse对线程进行阻塞。

10.callback.png

上图为ResponseFuture对象中执行callback函数的源码,这样就可以理清楚Async方式的实现逻辑了。

11.handle-response.png

上述源码为Producer Client接受到Broker的响应之后对数据进行处理的方法,同样根据消息中的opaque变量在opaqueRespinseFuture关系表中找到对应的ResponseFuture对象,然后执行executrInvokeCallBack方法,该方法源码如下:

12.execute-callback.png

这样就完成了Async逻辑。

8.2 Tag消息过滤

首先要明确一个前提:

3.2 Rocket文件结构中描述了consumeQueue中每条记录都包含TagHashCode,并且Consumer在获取消息时,会经过一次Tag HashCode比较,然后才会从commitLog中取出具体数据那是不是意味着消息过滤任务已经完成了?其实不是,考虑如下情况:某些情况下不同的TagHashCode可能相同,至于Broker端为什么不再consumeQueue中直接存储Tag字符串,然后使用equals方式对比呢?主要是因为HashCode为整型数据对比效率高。

在有了上述背景之后,也就了解了消息过滤不仅要在Broker侧过滤也要在Consumer Client侧对Tag字符串进行过滤,源码如下:

13.Tag-Filter.png

Consumer获取到Message之后(不论是PULL模型还是PUSH模型),都会遍历消息将Tag相符的消息拿出。

8.3 顺序消息的保证
14.有序保证.png

对于有序与无序方式消费,这里讨论的前提都是基于PUSH模型下:

对于有序消息,先会根据MessageQueue对象获取到其对应的锁,源码如下:

15.细粒度锁.png

在获取锁完毕后先进行上锁再进行具体的消费业务逻辑处理。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,793评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,567评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,342评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,825评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,814评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,680评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,033评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,687评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,175评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,668评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,775评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,419评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,020评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,206评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,092评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,510评论 2 343

推荐阅读更多精彩内容