Pulsar 架构的的疑问和改进
本文会讨论 Pulsar 整体架构的一些疑问和可能的改进,一家之言。当前的 Pulsar 版本是2.9.1
,使用的 BookKeeper 和 Zookeeper 版本分别是 4.14.2
和3.6.3
。
实际上,BookKeeper 和 Zookeeper 也是单独的 Apache 项目,Pulsar 项目其实主要就是 Pulsar Broker。为了便于理解,本文用 Broker 代指 Pulsar Broker,用 Pulsar 代指包括 Broker、BookKeeper 和 Zookeeper 三个模块在内的整个系统。
关于无状态(Stateless)的疑问
其实,Pulsar 的每一个模块都不是无状态的。
Zookeeper 和 BookKeeper 是无状态的,很好理解。因为,这两个模块都存有数据,Zookeeper 的各个节点虽然是有最终一致性保证,但是 Leader、Follower 等节点角色的存在,导致了节点处理流程的不同,是有状态的;BookKeeper 就更好解释了,因为 BookKeeper 节点本地存储不同 Ledger 的数据,访问不同 Ledger 要路由到不同的节点,因此也是有状态的。
Broker 本身并不持久化任何数据,其运行所需要的数据都存储在 BookKeeper 和 Zookeeper 上。那么我们可以说 Broker 是无状态的吗?
仍然不可以,无状态的定义最早是来自微服务领域,参考 Microsoft Azure 的定义,
A stateless service is one where there is no state maintained within the service across calls. Any state that is present is entirely disposable and doesn't require synchronization, replication, persistence, or high availability.
指的是,除了服务调用带来的状态,即参数,本身没有任何状态。这个定义之后,Azure 还举例 Calculator 服务是无状态的,还做了一些补充说明,
Not storing any internal state makes this example calculator simple. But most services aren't truly stateless. Instead, they externalize their state to some other store. (For example, any web app that relies on keeping session state in a backing store or cache is not stateless.)
进一步说明,不存储“内部状态”的服务才可以算是无状态服务,对于一些存储了“内部状态”的服务,例如,Session 信息,内存缓存,这些都不能算是无状态服务。除非这些内部状态不存储在服务内,而是由其他的“有状态”服务来完成。
那么很显然,按上面的定义,Broker 肯定不是无状态的。
首先,Broker 是 Topic 的所有者,所有某个特定 Topic 的访问,都需要经过特定的 Broker 来完成。这就是有状态的,内部存储了 Topic Owner 信息。
还有,Broker 是有缓存的,内存缓存了 Entry 信息,按上面的定义,这也是有状态的。
再有,当 Broker 故障的时候,需要有特别的 Fencing 机制来启动新的 Broker 以提供服务。而无状态服务是不需要特别的高可用策略的,只需要提供服务节点即可。
综上,Broker 并不是“无状态”的。
当然,中间件的优秀与否,并不是由是否“无状态”来判定的,厘清这个概念会有助于更好的理解 Pulsar。
关于多 Ledger 存储
Pulsar 中有 Topic、Ledger 的概念,BookKeeper 中有 Ledger、Fragment、Entry 的概念。
对比 Kafka 的设计,Kafka 中只有 Topic、Ledger 的概念,而且 Topic 和 Ledger 是一对一的,而且 Ledger 和物理存储也是一对一的。也就是说,Kafka 中,同一个 Topic 的存储是存储在一起的,放在一个 Ledger 中。这带来的问题就是,随着topic中的数据越来越多,Kafka 的存储会变的很大。如果单个节点发生异常,恢复节点的时候,复制的数据也会非常多,而且必须要等待数据都恢复完了才可以提供服务,影响恢复时间。如果需要增加 Topic 的副本数,也需要复制整个 Topic 的数据,也非常耗时。通常情况下,历史的消息往往是不重要的,我们更关注的是未来的消息,但在 Kafka 的机制下,我们别无选择,只能复制全部数据。
Pulsar 的多 Ledger 存储机制解决了这个问题,是消息中间件的一次成功尝试。
首先,Pulsar 中的 Topic 和 Ledger 是一对多的,一个Topic有多个 Ledger。不同的 Ledger 可以存储在不同的节点上,每个节点上也可以存储多个 Ledger 的数据。不同的 Ledger 也可以有不同的副本数配置。当发生节点故障,需要数据恢复的时候,恢复的流程会相对复杂一点,需要复制多个 Ledger 的数据。虽然需要复制的数据量仍然很大,但是一个一个 Ledger 的复制,先复制完的 Ledger 就可以正常提供服务了。当然,多个 Ledger 的设计也会有副作用,当一个 Bookie 节点故障的时候,可能会影响多个 Topic 提供服务,这需要我们在设计 Ledger 副本数的时候,对每个 Ledger 都有灾备的考虑。
还有,对于增加副本的场景,如果不溯及过往的话,在 Pulsar 中是非常简单的,只要将当前 Topic 新增一个 Ledger,并在新增的 Ledger 中设置期望的副本数即可。这个应该是 Pulsar 的杀手级应用场景。
再有,Bookie 的 Ledger 是混杂存储的,同一个存储内的数据可能来自多个 Ledger,这个存储的设计有点类似于 RocketMQ。Bookie 为了利用磁盘顺序读缓存 Entry,在每次将 Write Cache 写入磁盘的时候,将 Entry 按 Ledger id 和 Entry id 进行排序,结果是同一个 Ledger 的数据基本上得到了聚合。这个机制非常依赖于 Write Cache 的大小和当前 Bookie 打开的 Ledger 数量。如果 Write Cache 太小了,或者打开的 Ledger 数量太多了,起到的聚合作用也很弱。推荐 Write Cache 尽量大一些。可以做一个估算,假设预读取(readahead)1000个 Entry,平均每个 Entry 大小5KB,一次预读取,大约需要读5MB。假设 Bookie 打开了5个 Ledger,同时,消息的大小,产生消息的速度,和预读取的参数也都基本一致,那么,理想情况下,Write Cache 需要至少有5MB*5=25MB,预读取才会比较有效,假设预读取的起始 Cursor 位置是平均分布的,那么每次预读的数量大概在500条。如果增大 Write Cache 的大小到250MB(10倍),同样进行上面的计算,得到每次预录取的消息数量是950条。这就很接近于我们预设的预读取参数了,而且预读取消息数量的方差也比原先要小很多,会降低很多系统性能的抖动。实际上,可以对 Entry 进行重新整理,使 Entry 进一步聚合(但不一定真的需要)。这个可以在 Pulsar 做GC,或者分层存储,把数据刷到长期存储的时候实现。
此外,BookKeeper 的存储把 Journal 和 Entry Log 分开了,可以存储在不同的磁盘上,这个相对于 Kafka 的确是一个进步。因为这两者的存储特性是不一样的。Journal 要求的是事务性,速度快,不要求大容量;Entry Log 要求的是读取高效,写入可以慢,容量要大。分开存储可以利用好不同硬件设备的硬件特性。
关于共识机制和存储的分离
Pulsar 的另一个显著的特性是共识机制(Consensus)和存储的分离。例如,BookKeeper 在整个系统中只是一个单纯的存储功能,每个 Bookie 节点的功能是相同的,不会有角色的差异,虽然存储的数据有不同。Leader 的身份由 Broker 来担当,而 Broker 又不存储数据,于是就达到了共识机制和存储相分离的架构。这种架构有个好处,可以比较容易的变更集群的节点成员,例如增加存储节点,也就是动态成员管理功能。当然,由于分离会带来一些代码实现上的复杂性,这个Pulsar 和 BookKeeper 已经克服。
那么,共识机制和存储的分离,是动态成员管理的先决条件吗?动态成员管理是一个很好的特性,但我们可以不要共识机制和存储分离,只要动态成员管理吗?
类比 Kafka 的设计,Kafka Broker 节点有 Leader 和 Follower 的区别,数据同步的时候,由 Leader 流向 Follower。但 Kafka 是静态成员管理,Kafka Leader 故障的时候,需要选举出新的 Leader;如果有新的节点加入,也需要复制全部的数据。如此这般的特性和 Raft 非常类似,Raft 的成员管理就非常麻烦,设计了一整套机制(joint consensus)来应对成员变更的场景。
诚然,每一个分布式算法中,成员变更的场景都是非常复杂的,甚至有些分布式算法还不能很好支持成员变更的场景,Raft 的 Joint Consensus 方法来处理成员变更,已经是相对简单,但其复杂程度也是无法一眼就看明白的。Pulsar 的动态成员管理非常的简易,是如何跳出这么复杂的算法陷阱的?
实际上,Pulsar 并没有什么独到的办法,只是将增加数据节点(Bookie),解释成了成员管理而已。Pulsar 的动态成员管理,并不是分布式算法中的分布式集群成员管理。Pulsar 的共识机制的来源是 Zookeeper(使用 ZAB 分布式协议,与 Raft 相似),其增加的 Bookie 节点,并不是 Zookeeper 的成员,而只是数据存储的一个地址而已。如果增加了一个 Bookie 节点,只需要在 Zookeeper 中增加该节点的地址,于是,复杂的分布式集群成员管理问题,被取代为分布式集群中增加一条数据的问题,问题的复杂度被大大降低了。
同样的道理,Pulsar 的 Broker 节点也可以增加,利用 Zookeeper 增加多个备选的 Broker。当 Leader Broker 故障的时候,选取新的 Leader 也不是分布式算法中选举 Leader 的过程,而是多个 Broker 去“抢占”某个 Topic 的所有权。整个过程类似于竞争分布式锁,哪个 Broker 获得了这个锁,就相当于持有了 Topic 的所有权,成为了 Leader。选举问题也被大大简化了。
想明白了 Pulsar 动态成员管理功能的实现,就可以类比 Raft 和 Kafka 了。Raft 是一个分布式算法,Raft 集群如果需要成员变更就需要按算法的机制进行,即使复杂也没办法。Kafka 不是一个分布式算法,其共识机制的来源是 Zookeeper,Kafka Broker 集群如果要变更 Broker 节点,也是向 Zookeeper 写入数据;Kafka Broker 如果要重新选取 Leader,也是要去 Zookeeper 中抢占一个锁。所以 Kafka 共识机制的实现原理和 Pulsar 是一致的,那么,为什么 Kafka 没有动态成员管理呢?
如果按照 Pulsar 对成员管理的解释,Kafka 也是有动态成员管理功能的,但是这个功能实在有些糟糕。比如,加入一个节点,需要复制 Topic 所有已知历史的数据,性能上并不可观,这样的功能无法作为一个功能点来宣传,
反观 Pulsar,加入节点不需要溯及历史,也没有数据复制,加入之后只需要等待 Topic 下一次创建 Ledger 的时候,选到新的 Bookie 即可;即使成员的数量有变化,例如从原先 3 个 Bookie 节点,变为 5 个 Bookie 节点,也就是在创建 Ledger 的时候多选几个节点,代码的参数发生了变化而已。
因此,Pulsar 动态成员管理的功能,并不是来自于共识机制与存储的分离,在 Pulsar 的不溯及历史、单 Topic 多 Ledger 等特性的前提下,动态成员管理是一个非常“自然”能想到的功能。Kafka 是吃了需要复制历史数据的亏。
Pulsar 的这种分离架构也不是尽善尽美,这个设计导致了 Pulsar 过度依赖 Zookeeper,在 Zookeeper 中存储了太多元数据信息,如果 Zookeeper 故障不可用,Pulsar 集群将几乎完全宕机。而 Kafka 在 Zookeeper 中的元数据较少,在 Zookeeper 宕机的情况下,依然可以保持基本的服务。当然,综合来看,Pulsar 的这个设计还是利大于弊的。
关于脑裂问题
Pulsar 为了解决 Leader Broker 节点故障切换的问题,使用了一个叫 Fencing 的机制,解决了 Leader 节点故障时候的脑裂问题。
方案本身不详细描述了,Fencing 方案和 Raft Leader 的故障恢复机制实际上是没有什么差别的,应该是有所借鉴。至于解决了脑裂,这个也不是真正解决,也是由于消息系统的特性导致的直接结果。为什么这么说呢?
Raft 中也有类似的概念,叫 committed index(Pulsar与之对应的是LAC),只有在收到多数节点写入 Entry 返回成功之后,才可以更新 committed index,再更新 Entry 到状态机中,并返回给客户端。对尚未更新 committed index 的 Entry,Raft 也是不可读的。可以发现,Pulsar 的 Fencing 和 Raft 的机制几乎一致,但是 Raft 有脑裂问题。
先回顾一下 Raft 的脑裂问题。当 Raft Leader 节点故障发生时,例如 Raft Leader 网络断开,其他节点已经发现当前 Leader 超时,并发起下一轮选举投票,快速选举出新的 Leader,但是老 Raft Leader 的 Follower 无响应超时时间尚未到达,导致老 Leader 仍然认为自己是真正的 Leader,并响应客户端的请求,因此导致客户端读取到了旧的数据。而与此同时,部分客户端连接到了新 Raft Leader,写入并读取到新的数据,造成不一致,这是 Raft 发生脑裂的原因。Raft 发生脑裂不会持续很长时间,当老 Leader 发现长时间没有收到 Follower 响应而超时(主要取决于超时参数的配置),或者发现有新 Leader 产生时,老 Leader 就会将自己重置为 Follower。
那使用了同样机制的 Pulsar 为什么就没有脑裂问题呢?那是因为,Pulsar 是个消息系统,写入的消息类似 WAL,是不可变的(immutable),追加的。当发生和 Raft 一样的故障的时候,老的 Pulsar Broker 也会读到老的数据,但老的数据仍然合法,因为对同样的 Cursor,在新的 Broker 上也是读到相同的数据,只要读取 Entry 不超过 LAC 就没问题,最多只是无法获取到最新的消息而已,获取的消息并不会错。而 Raft 的存储是偏向于通用存储场景,因此就会有新旧数据版本不一致的问题。
脑裂一般都是指读取数据发生的不一致,如果是写入数据的脑裂,那可能是分布式算法有问题,成熟的算法一般不会有这个问题。
关于和Kafka的性能对比
老实说,目前的性能测试报告都显示,Pulsar 的性能高于 Kafka 很多,其实有些费解。尤其是简化的测试场景,比如单个 Topic,副本3个,写入集2个,这样的场景说实话,Kafka 的实现和 Pulsar 几乎就是一样的,为什么会有如此大的性能差异呢?
我们假定性能差异确实是存在的(很遗憾我手边没有环境做这样的性能测试),Pulsar 的性能高出 Kafka 一截,这样的性能差异是如何造成的呢?
我们先排除代码实现层面的差异,两个中间件的开发者应该都是相当资深的工程师,本身代码写法引起的性能差异应该并不显著,也太过于细节。
首先,比较一下简单场景的测试场景是否公平。单个 Topic,副本3个,写入集2个,一边写入,另一边消费,这个场景应该没有任何问题,也是我们绝大多数情况下遇到的场景。由于几乎在写入完成后的同时,就会有消息消费,因此,这些消息都会有内存缓存。简单来说,这个场景就是一边磁盘写,一边内存读的场景。
测试场景没有问题,就需要细节分析两者处理流程的差异了。先分析一下写入的性能,Pulsar 的写入是先到 Broker,再到 Bookie,然后返回;Kafka 的写入是先到 Leader Broker,再同步到 insync 的 Broker,然后返回。从网络调用的路径上来说,两者都有一层网络调用。在写入磁盘的时候,两者都是以追加写的方式把消息写入文件,如果写入的存储没有硬件上的差异,那么这一块应该也没有什么差异。总之,并不觉得写入的部分造成了性能的差异。
再看一下写入的性能差异。其实,两者都是内存读,性能差异应该很小,实现层面上来看,Pulsar 的内存缓存是自己实现的,Broker 和 BookKeeper 有2层缓存,这个场景下,基本都会命中 Broker 缓存,也就是第一层缓存;Kafka 的内存缓存是借用的 Linux Page Cache 机制,相当于是操作系统提供的,只有1层缓存,在这个场景下也基本都能命中。如果说,读取的性能有差异,那么就是内存缓存和 Page Cache 的差异,毕竟 Page Cache 是内核态的东西,系统调用的确是有损耗的。但 Kafka 用了零拷贝的 sendfile()
系统调用,应该也赚回来了一次系统调用的消耗,Pulsar 在发送的时候,需要调用 Socket 的 send()
将缓存的数据发送除去,也有一次内存从用户态到内核态的消耗。如果说这里真的有差异的话,也就是说,“内存缓存+send()
”胜过了“Page Cache+sendfile()
”,硬要找个理由的话,Pulsar 的实现只需要1次系统调用,而 Kafka 需要2次(虽然有一次是零拷贝)。
以上差异,不足以构成显著的性能差异,最多5%以内的性能差别。
但有一个场景,也是比较常见的场景,Pulsar 是远胜于 Kafka 的,就是追赶读(catch-up read)场景。Pulsar 的 BookKeeper 缓存管理是分为 Write Cache 和 Read Cache 的,写入数据与读取数据的缓存分开管理,Write Cache 用于追尾读(tail read)场景,Read Cache 用于追赶读场景,互不干扰。而 Kafka 的缓存是操作系统 Page Cache 提供的,在追赶读场景,会擦除掉写入数据的缓存,两者在竞争 Page Cache 的使用,导致了很多加载到 Cache 的数据被擦除后,反复加载,增大 Cache Mis。尤其是追尾读和追赶读同时存在的时候,例如比例是一半一半的时候,Kafka 的缓存竞争会更加激烈,性能会下降比较厉害。
关于缓存的机制
BookKeeper 的缓存分为 Write Cache 和 Read Cache,但 Broker 的缓存设计却不一样,没有做这样的区分。这样的设计是非常难以理解的,同样的技术场景为什么要采用两种不同的缓存设计策略?
BookKeeper 的读写缓存分离,是为了优化追赶读和追尾读共存的场景,减少对缓存的竞争,以达到更高的 Cache Hit。对 Broker 来说,这个场景依然存在,是完全一样的,如果 Broker 没有采用读写缓存分离,也会有缓存竞争,会有很多的请求落到 BookKeeper 上,白白多了一次网络IO的消耗。
另外,Bookie 上有多个 Ledger 的数据,缓存要按 Ledger 分多份,Broker 是单个 Topic 的 Owner,如果按 Topic 维度来进行缓存,应该也不会涉及到内存的过多消耗。
因此,我觉得 Pulsar Broker 也应该采用 Write Cache 和 Read Cache 分离的机制,以进一步提升性能。
改进点1:集成 Broker 和 BookKeeper
因为共识机制和存储的分离并不会带来明显的收益,Pulsar 的动态成员管理是来自于单 Topic 多 Ledger 特性的自然推论,所以,把 Broker 和 BookKeeper 分离成两个服务,恐怕不是一个好的选择。如果将共识机制和存储重新合并在同一个节点,称为新 Bookie 节点,性能将势必提升。
首先,之前 Pulsar 中需要直接访问 BookKeeper 的场景将少去一次网络调用,也就是 Broker 在 Cache Mis 的时候,也不会有网络调用的消耗,这样明显会提升性能。这样的设计带来了另一个直觉的结果,Leader 节点上有当前 Topic 的最新 Entry,因此,Leader 的 Write Cache 数据就是最新的,可以很快响应追尾读。
其次,功能方面,并没有减少原先的功能,尤其在动态成员管理这里。如果是非 Leader 节点故障,那么恢复方案和原先一样,新增 Bookie 加入到成员中,进行成员更新,并数据复制;如果是 Leader 节点发生故障,那么恢复方案会相对复杂一点,可以分几步来描述。如下,
- 在剩下的
E-1
(E 表示集群数量)个 Bookie 节点发起选举,产生新的 Leader; - 新的 Leader 先同步存活的各个节点的数据到 Commit Index,此时的场景和非 Leader 节点故障的情况就是一样的了;
- 新增 Bookie 加入到成员中,进行成员更新,并数据复制
在选举 Leader 和同步数据到 Commit Index 的时候,细节上仍然可以采用类似 Fencing 的机制。这样也不会减少原有的 Pulsar 灵活的特性。
还有,Pulsar 原先的设计造成了,对于单个 Topic 来说 Broker 就是实际上的单点,扩展性受限。所有的写入读取,都需要通过单点的 Broker 来完成,造成系统整体压力的不平均。当新 Bookie 同时有 Broker 的功能的时候,还可以分散单点 Leader 的压力,进行另一项优化,“追赶读优化”。
改进点2:追赶读优化
当共识机制和存储重新结合,成为新 Bookie 节点的时候,可以引入新的追赶读优化,设计原则如下,
- 所有的追赶读由非 Leader 节点完成,Leader 节点要提示客户端可供访问的非 Leader 节点列表;
- 所有节点增加限流机制,Leader 节点达到限流的时候(指追尾读达到上限),提示客户端访问非 Leader 节点;
- 非 Leader 节点也达到限流的时候,拒绝访问;
在这样的设计下,再加上 Write Cache 和 Read Cache 分离的机制,Leader 节点的压力被平分到了非 Leader 节点上,从根本上解决原有的 Broker 单点的问题,充分发挥了集群中每个节点的处理能力。