1 简介
RocketMQ是一款分布式、队列模型的消息中间件,由阿里巴巴自主研发的一款适用于高并发、高可靠性、海量数据场景的消息中间件。早期开源2.x版本名为Metaq,迭代3.x版本,更名为RocketMQ,16年开始贡献到Apache。
经过1年多的孵化,最终成为Apache顶级的开源项目,更新非常频繁,社区活跃度也非常高;目前4.8.0-release版本。RocketMQ参考借鉴了优秀的开源消息中间件Apache - Kafka,其消息的路由集群划分都借鉴了Kafka优秀的设计思路,并结合自身的 “双十一” 场景进行了合理的扩展和API丰富。
附上官网地址: http://rocketmq.apache.org/
2 特点
2.1 核心特点
- 支持集群模型、负载均衡、水平扩展能力
- 亿级别的消息堆积能力
- 采用零拷贝的原理、顺序写盘、随机读(索引文件)
- 丰富的API使用
- 代码优秀,底层通信框架采用Netty NIO框架
- NameServer 代替 Zookeeper
- 强调集群无单点,可扩展,任意一点高可用,水平可扩展
- 消息失败重试机制、消息可查询
- 开源社区活跃度、是否足够成熟(经过双十一考验)
2.2 专业术语
- Producer:消息生产者,负责产生消息,一般由业务系统负责产生消息。
- Consumer:消息消费者,负责消费消息,一般是后台系统负责异步消费。
- Push Consumer:Consumer的一种,需要向Consumer对象注册监听。
- Pull Consumer:Consumer的一种,需要主动请求Broker拉取消息。
- Producer Group:生产者集合,一般用于发送一类消息。
- Consumer Group:消费者集合,一般用于接受一类消息进行消费。
- Broker : MQ消息服务(中转角色,用于消息存储与生产消费转发)。
3 核心原理
3.1 三种模型
3.1.1 队列模型
就像我们理解队列一样,消息中间件的队列模型就真的只是一个队列。
如果使用 广播模式
的话,我们需要将一个消息发送给多个消费者(比如此时我需要将信息发送给短信系统和邮件系统),这个时候单个队列即不能满足需求了。
当然你可以让 Producer 生产消息放入多个队列中,然后每个队列去对应每一个消费者。问题是可以解决,创建多个队列并且复制多份消息是会很影响资源和性能的。而且,这样子就会导致生产者需要知道具体消费者个数然后去复制对应数量的消息队列,这就违背我们消息中间件的解耦这一原则。
3.1.2 主题模型
也可以称为发布订阅模型。
在主题模型中,消息的生产者称为发布者(Publisher),消息的消费者称为订阅者(Subscriber),存放消息的容器称为主题(Topic)。
其中,发布者将消息发送到指定主题中,订阅者需要提前订阅主题才能接受特定主题的消息。
3.1.3 消息模型
RockerMQ 中的消息模型就是按照主题模型所实现的。
我们可以看到在整个图中有 Producer Group、Topic、Consumer Group 三个角色,我来分别介绍一下他们。
- Producer Group 生产者组:代表某一类的生产者,比如我们有多个秒杀系统作为生产者,这多个合在一起就是一个 Producer Group 生产者组,它们一般生产相同的消息。
- Consumer Group 消费者组:代表某一类的消费者,比如我们有多个短信系统作为消费者,这多个合在一起就是一个 Consumer Group 消费者组,它们一般消费相同的消息。
- Topic 主题:代表一类消息,比如订单消息,物流消息等等。
你可以看到图中生产者组中的生产者会向主题发送消息,而主题中存在多个队列,生产者每次生产消息之后是指定主题中的某个队列发送消息的。
每个主题中都有多个队列(这里还不涉及到 Broker),集群消费模式下,一个消费者集群多台机器共同消费一个 topic 的多个队列,一个队列只会被一个消费者消费。如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。就像上图中 Consumer1 和 Consumer2 分别对应着两个队列,而 Consuer3 是没有队列对应的,所以一般来讲要控制消费者组中的消费者个数和主题中队列个数相同。
所以总结来说,RocketMQ 通过使用在一个 Topic 中配置多个队列并且每个队列维护每个消费者组的消费位置实现了主题模式/发布订阅模式。
3.2 架构
RocketMQ 技术架构中有四大角色 NameServer、Broker、Producer、Consumer。我来向大家分别解释一下这四个角色是干啥的。
-
Broker:主要负责消息的存储、投递和查询以及服务高可用保证。说白了就是消息队列服务器嘛,生产者生产消息到 Broker,消费者从 Broker 拉取消息并消费。
这里,我还得普及一下关于 Broker、Topic 和队列的关系。上面我讲解了 Topic 和队列的关系——一个 Topic 中存在多个队列,那么这个 Topic 和队列存放在哪呢?
一个 Topic 分布在多个 Broker 上,一个 Broker 可以配置多个 Topic,它们是多对多的关系。
如果某个 Topic 消息量很大,应该给它多配置几个队列(上文中提到了提高并发能力),并且尽量多分布在不同 Broker 上,以减轻某个 Broker 的压力。
Topic 消息量都比较均匀的情况下,如果某个 broker 上的队列越多,则该 broker 压力越大。
所以说我们需要配置多个 Broker。
- NameServer:不知道你们有没有接触过 ZooKeeper 和 Spring Cloud 中的 Eureka,它其实也是一个注册中心,主要提供两个功能:Broker 管理和路由信息管理。说白了就是 Broker 会将自己的信息注册到 NameServer 中,此时 NameServer 就存放了很多 Broker 的信息(Broker的路由表),消费者和生产者就从 NameServer 中获取路由表然后照着路由表的信息和对应的 Broker 进行通信(生产者和消费者定期会向 NameServer 去查询相关的 Broker 的信息)。
- Producer:消息发布的角色,支持分布式集群方式部署。说白了就是生产者。
- Consumer:消息消费的角色,支持分布式集群方式部署。支持以 push 推,pull 拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制。说白了就是消费者。
简单化的架构图:
官网的架构图:
第一、我们的 Broker 做了集群并且还进行了主从部署,由于消息分布在各个 Broker 上,一旦某个 Broker 宕机,则该 Broker 上的消息读写都会受到影响。所以 RocketMQ 提供了 master/slave 的结构,salve 定时从 master 同步数据(同步刷盘或者异步刷盘),如果 master 宕机,则 slave 提供消费服务,但是不能写入消息(后面我还会提到)。
第二、为了保证 HA,我们的 NameServer 也做了集群部署,但是请注意它是去中心化的。也就意味着它没有主节点,你可以很明显地看出 NameServer 的所有节点是没有进行 Info Replicate 的,在 RocketMQ 中是通过单个 Broker 和所有 NameServer 保持长连接,并且在每隔 30 秒 Broker 会向所有 Nameserver 发送心跳,心跳包含了自身的 Topic 配置信息,这个步骤就对应这上面的 Routing Info。
第三、在生产者需要向 Broker 发送消息的时候,需要先从 NameServer 获取关于 Broker 的路由信息,然后通过轮询的方法去向每个队列中生产数据以达到负载均衡的效果。
第四、消费者通过 NameServer 获取所有 Broker 的路由信息后,向 Broker 发送 Pull 请求来获取消息数据。Consumer 可以以两种模式启动—— 广播(Broadcast)和集群(Cluster)。广播模式下,一条消息会发送给同一个消费组中的所有消费者,集群模式下消息只会发送给一个消费者。
3.3 存储机制
- CommitLog:消息主体以及元数据的存储主体,存储 Producer 端写入的消息主体内容,消息内容不是定长的。单个文件大小默认 1G ,文件名长度为 20 位,左边补零,剩余为起始偏移量,比如 00000000000000000000 代表了第一个文件,起始偏移量为 0,文件大小为 1G = 1073741824;当第一个文件写满了,第二个文件为 00000000001073741824,起始偏移量为 1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件。
- ConsumeQueue:消息消费队列,引入的目的主要是提高消息消费的性能(我们再前面也讲了),由于 RocketMQ 是基于主题 Topic 的订阅模式,消息消费是针对主题进行的,如果要遍历 commitlog 文件中根据 Topic 检索消息是非常低效的。Consumer 即可根据 ConsumeQueue 来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定 Topic 下的队列消息在 CommitLog 中的起始物理偏移量 offset,消息大小 size 和消息 Tag 的 HashCode 值。consumequeue 文件可以看成是基于 topic 的 commitlog 索引文件,故 consumequeue 文件夹的组织方式如下:topic/queue/file 三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样 consumequeue 文件采取定长设计,每一个条目共 20 个字节,分别为 8 字节的 commitlog 物理偏移量、4 字节的消息长度、8 字节 tag hashcode,单个文件由 30W 个条目组成,可以像数组一样随机访问每一个条目,每个 ConsumeQueue 文件大小约 5.72M;
- IndexFile:IndexFile(索引文件)提供了一种可以通过 key 或时间区间来查询消息的方法。这里只做科普不做详细介绍。
总结来说,整个消息存储的结构,最主要的就是 CommitLoq 和 ConsumeQueue。而 ConsumeQueue 你可以大概理解为 Topic 中的队列。
RocketMQ 采用的是混合型的存储结构,即为 Broker 单个实例下所有的队列共用一个日志数据文件来存储消息。有意思的是在同样高并发的 Kafka 中会为每个 Topic 分配一个存储文件。这就有点类似于我们有一大堆书需要装上书架,RockeMQ 是不分书的种类直接成批的塞上去的,而 Kafka 是将书本放入指定的分类区域的。
而 RocketMQ 为什么要这么做呢?原因是提高数据的写入效率,不分 Topic 意味着我们有更大的几率获取成批的消息进行数据写入,但也会带来一个麻烦就是读取消息的时候需要遍历整个大文件,这是非常耗时的。
所以,在 RocketMQ 中又使用了 ConsumeQueue 作为每个队列的索引文件来提升读取消息的效率。我们可以直接根据队列的消息序号,计算出索引的全局位置(索引序号*索引固定⻓度20),然后直接读取这条索引,再根据索引中记录的消息的全局位置,找到消息。
4 相关信息
- 博文不易,辛苦各位猿友点个关注和赞,感谢