一个完整的 Nebula 部署集群包含三个服务,即 Query Service,Storage Service 和 Meta Service。每个服务都有其各自的可执行二进制文件,这些二进制文件既可以部署在同一组节点上,也可以部署在不同的节点上。
一、元数据服务层 Meta Service
Meta Service集群采用 leader / follower 架构,Leader 由集群中所有的 Meta Service 节点选出后对外提供服务,Followers 处于待命状态并从 leader 复制更新的数据,一旦 leader 节点 down 掉,再选举其中一个 follower 成为新的 leader。
Meta Service 不仅负责存储和提供图数据的 meta 信息,如 schema、partition 信息等,还同时负责指挥数据迁移及 leader 的变更等运维操作。
Metaservice 对应的进程是 nebula-metad ,其主要的功能有:
1. 用户管理:Nebula Graph 的用户体系包括 Goduser , Admin , User , Guest 四种。每种用户的操作权限不一。
2. 集群配置管理:支持上线、下线新的服务器。
3.图空间管理:增持增加、删除图空间,修改图空间配置(Raft副本数)
4. Schema 管理:通过 Metaservice 记录 Tag 和 Edge 的属性的各字段类型。支持的类型有:整型 int, 双精度类型 double, 时间数据类型 timestamp, 列表类型 list等; 多版本管理,支持增加、修改和删除 schema,并记录其版本号;TTL 管理,通过标识到期回收 time-to-live 字段,支持数据的自动删除和空间回收
MetaService 层为有状态的服务,其状态持久化方法与 Storage 层一样通过 KVStore 方式存储。
在 KVStore 的接口上,Nebula 也同时封装了一套 meta 相关的接口。Meta Service 不但提供了图 schema 的增删查改的功能,还提供了集群的管理功能以及用户鉴权相关的功能。Meta Service 支持单独部署,也支持使用多副本来保证数据的安全。
二、存储计算分离-存储层
Nebula 采用存储与计算分离的架构,计算层和存储层可以根据各自的情况弹性扩容、缩容、水平扩展。另外,存储计算分离使得 Storage Service 可以为多种类型的计算层或者计算引擎提供服务。当前 Query Service 是一个高优先级的计算层,而各种迭代计算框架会是另外一个计算层。
Shared-nothing 分布式存储
Storage Service 采用 shared-nothing 的分布式架构设计,每个存储节点都有多个本地 KV 存储实例作为物理存储。Nebula 采用多数派协议 Raft 来保证这些 KV 存储之间的一致性,在 KVStore 之上是图语义层,用于将图操作转换为下层 KV 操作。
图数据(点和边)是通过 Hash 的方式存储在不同 Partition 中。这里用的 Hash 函数实现很直接,即 vertex_id 取余 Partition 数。在 Nebula Graph 中,Partition 表示一个虚拟的数据集,这些 Partition 分布在所有的存储节点,分布信息存储在 Meta Service 中(因此所有的存储节点和计算节点都能获取到这个分布信息)。
Nebula 的 Storage 包含两个部分, 一是 meta 相关的存储, 我们称之为 Meta Service ,另一个是 data 相关的存储, 我们称之为 Storage Service。 这两个服务是两个独立的进程,数据也完全隔离,分别部署, 不过两者整体架构相差不大。
Storage Service共三层,最底层是 Store Engine,是一个单机版 local store engine,提供了对本地数据的 get / put / scan / delete 操作,相关的接口放在 KVStore / KVEngine.h 文件里面,可以根据自己的需求定制开发相关 local store plugin,目前 Nebula 提供了基于 RocksDB 实现的 Store Engine。在 local store engine 之上是 Consensus 层,实现了 Multi Group Raft,每一个 Partition 都对应了一组 Raft Group, Partition即数据分片。目前 Nebula 的分片策略采用了 静态 Hash 的方式,用户在创建 SPACE 时需指定 Partition 数,Partition 数量一旦设置便不可更改,一般来讲,Partition 数目要能满足业务将来的扩容需求。
在 Consensus 层上是 Storage interfaces,定义了一系列和图相关的 API。 这些 API 请求会在该层被翻译成一组针对相应 Partition 的 kv 操作。正是这一层的存在,使得我们的存储服务变成了真正的图存储, Nebula 没把 kv 作为一个服务单独提出,其最主要的原因便是图查询过程中会涉及到大量计算,这些计算往往需要使用图的 schema,而 kv 层是没有数据 schema 概念,这样设计会比较容易实现计算下推。
Schema & Partition
图存储的主要数据是点和边,但 Nebula 存储的数据是一张属性图,也就是说除了点和边以外,Nebula 还存储了它们对应的属性,以便更高效地使用属性过滤。对于点来说,使用不同的 Tag 表示不同类型的点,同一个 VertexID 可以关联多个 Tag,每一个 Tag 都有自己对应的属性。对应到 kv 存储里面,我们使用 vertexID + TagID 来表示 key, 把相关的属性编码后放在 value 里面,vertex key 的 format 如图所示:
Type : 1 个字节,用来表示 key 类型,当前的类型有 data, index, system 等
Part ID : 3 个字节,用来表示数据分片 Partition,此字段主要用于Partition 重新分布(balance) 时方便根据前缀扫描整个 Partition 数据
Vertex ID : 4 个字节, 用来表示点的 ID
Tag ID : 4 个字节, 用来表示关联的某个 tag
Timestamp : 8 个字节,对用户不可见,未来实现分布式事务 ( MVCC ) 时使用
在一个图中,每一条逻辑意义上的边,在 Nebula Graph 中会建模成两个独立的 key-value,分别称 out-key 和in-key。out-key 与这条边所对应的起点存储在同一partition ,in-key 与这条边所对应的终点存储在同一partition 。一般out-key 和 in-key 会分布在两个不同的 Partition 中。 两个点之间可能存在多种类型的边,Nebula 用 Edge Type 来表示边类型。而同一类型的边可能存在多条,如定义一个 edge type “转账”,用户 A 可能多次转账给 B, 所以 Nebula 又增加一个 Rank 字段来做区分,表示 A 到 B 之间多次转账记录。 Edge key 的 format 如图所示:
Type : 1 个字节,用来表示 key 的类型,当前的类型有 data, index, system 等。
Part ID : 3 个字节,用来表示数据分片 Partition,此字段主要用于Partition 重新分布(balance) 时方便根据前缀扫描整个 Partition 数据
Vertex ID : 4 个字节, 出边里用来表示源点的 ID, 入边里表示目标点的 ID。
Edge Type : 4 个字节, 用来表示这条边的类型,如果大于 0 表示出边,小于 0 表示入边。
Rank : 8 个字节,用来处理同一种类型的边存在多条的情况。用户可以根据自己的需求进行设置,这个字段可存放交易时间、交易流水号、或某个排序权重
Vertex ID : 4 个字节, 出边里用来表示目标点的 ID, 入边里表示源点的 ID。
Timestamp : 8 个字节,对用户不可见,未来实现分布式做事务的时候使用。
针对 Edge Type 的值,若如果大于 0 表示出边,则对应的 edge key format 如图;
若 Edge Type 的值小于 0,则对应的 edge key format 如图所示
对于点或边的属性信息,有对应的一组 kv pairs,Nebula 将它们编码后存在对应的 value 里。由于 Nebula 使用强类型 schema,所以在解码之前,需先去 Meta Service 中取具体的 schema 信息。另外,为了支持在线变更 schema,在编码属性时,会加入对应的 schema 版本信息。
数据的分片是对 Vertex ID 取模 ,同一个点的所有_出边_,_入边_以及这个点上所有关联的 _Tag 信息_都会被分到同一个 Partition,这种方式大大地提升了查询效率。 对于在线图查询来讲,最常见的操作便是从一个点开始向外 BFS(广度优先)拓展,于是拿一个点的出边或者入边是最基本的操作,而这个操作的性能也决定了整个遍历的性能。实际的场景中大部分情况都是属性图,并且实际中的 BFS 也需要进行大量的按照属性剪枝操作,Nebula 通过将属性与点边存在一起来保证整个操作的高效。
KVStore
Nebula 的需要高性能 pure kv,以 library 的形式提供:对于强 schema 的 Nebula 来讲,计算下推需要 schema 信息,而计算下推实现的好坏,是 Nebula 是否高效的关键;数据强一致:这是分布式系统决定的;使用 C++实现
Nebula也提供了整个KVStore 层的 plugin,直接将 Storage Service 搭建在第三方的 KVStore 上面,目前官方提供的是 HBase 的 plugin。
Nebula KVStore主要采用 RocksDB 作为本地的存储引擎,为了充分利用多硬盘的并发能力,Nebula 支持自己管理多块盘,用户只需配置多个不同的数据目录即可。分布式 KVStore 的管理由 Meta Service 来统一调度,它记录了所有 Partition 的分布情况及当前机器的状态,当用户增减机器时,只需要通过 console 输入相应的指令,Meta Service 便能够生成整个 balance plan 并执行, balance 的时机由用户自己控制。
为了方便对于 WAL (预写日志)进行定制,Nebula KVStore 实现了自己的 WAL 模块,每个 partition 都有自己的 WAL,这样在追数据时,不需要进行 wal split 操作, 更加高效。 另外,为了实现一些特殊的操作,专门定义 Command Log 这个类别,这些 log 只为了使用 Raft 来通知所有 replica 执行某一个特定操作,并没有真正的数据。除了 Command Log 外,Nebula 还提供了一类日志来实现针对某个 Partition 的 atomic operation,例如 CAS,read-modify-write, 充分利用了Raft 串行的特性。
关于多图空间(space)的支持:一个 Nebula KVStore 集群可以支持多个 space,每个 space 可设置自己的 partition 数和 replica 数。不同 space 在物理上是完全隔离的,而且在同一个集群上的不同 space 可支持不同的 store engine 及分片策略。
Raft
作为一个分布式系统,KVStore 的 replication,scale out 等功能需 Raft 的支持。
1.Multi Raft Group
由于 Raft 的日志不允许空洞,几乎所有的实现都会采用 Multi Raft Group 来缓解这个问题,因此 partition 的数目几乎决定了整个 Raft Group 的性能。但这也并不是说 Partition 的数目越多越好:每一个 Raft Group 内部都要存储一系列的状态信息,并且每一个 Raft Group 有自己的 WAL 文件,因此 Partition 数目太多会增加开销。此外,当 Partition 太多时, 如果负载没有足够高,batch 操作是没有意义的。如一个有 1w tps 的线上系统单机,它的单机 partition 的数目超过 1w,可能每个 Partition 每秒的 tps 只有 1,这样 batch 操作就失去了意义,还增加了 CPU 开销。 实现 Multi Raft Group 的最关键之处有两点,第一是共享 Transport 层,因为每一个 Raft Group 内部都需要向对应的 peer 发送消息,如果不能共享 Transport 层,连接的开销巨大;第二是线程模型,Mutli Raft Group 一定要共享一组线程池,否则会造成系统的线程数目过多,导致大量的 context switch 开销。
2.Batch
对于每个 Partition来说,由于串行写 WAL,为了提高吞吐,做 batch 是十分必要的。 Nebula 利用每个 part 串行的特点,做了一些特殊类型的 WAL。如Nebula 利用 WAL 实现了无锁的 CAS 操作,而每个 CAS 操作需要之前的 WAL 全部 commit 之后才能执行,所以对于一个 batch,如果中间夹杂了几条 CAS 类型的 WAL, 我们还需要把这个 batch 分成粒度更小的几个 group,group 之间保证串行。还有command 类型的 WAL 需要它后面的 WAL 在其 commit 之后才能执行。
3.Learner
Learner 这个角色的存在主要是为了 应对扩容 时,新机器需要"追"相当长一段时间的数据,而这段时间有可能会发生意外。如果直接以 follower 的身份开始追数据,就会使得整个集群的 HA 能力下降。 Nebula 里面 learner 的实现就是采用上面提到的 command wal,leader 在写 wal 时如果碰到 add learner 的 command, 就会将 learner 加入自己的 peers,并把它标记为 learner,这样在统计多数派的时候,就不会算上 learner,但是日志还是会照常发送给它们,learner 也不会主动发起选举。
4.Transfer Leadership
该操作对于 balance 来讲至关重要,当我们把某个 Paritition 从一台机器挪到另一台机器时,首先便会检查 source 是不是 leader,如果是的话,需要先把他挪到另外的 peer 上面;在搬迁数据完毕之后,通常还要把 leader 进行一次 balance,这样每台机器承担的负载也能保证均衡。 实现 transfer leadership, 需要注意的是 leader 放弃自己的 leadership,和 follower 开始进行 leader election 的时机。对于 leader 来讲,当 transfer leadership command 在 commit 的时候,它放弃 leadership;而对于 follower 来讲,当收到此 command 的时候就要开始进行 leader election, 这套实现要和 Raft 本身的 leader election 走一套路径,否则很容易出现一些难以处理的 corner case。
5.Membership change
为了避免脑裂,当一个 Raft Group 的成员发生变化时,需要有一个中间状态, 这个状态下 old group 的多数派与 new group 的多数派总是有 overlap,这样就防止了 old group 或者新 group 单方面做出决定,这就是joint consensus 。为了更加简化,Diego Ongaro 在自己的博士论文中提出每次增减一个 peer 的方式,以保证 old group 的多数派总是与 new group 的多数派有 overlap。 Nebula 的实现也采用了这个方式,只不过 add member 与 remove member 的实现有所区别。
Storage Service
在 KVStore 的接口上Nebula 封装有图语义接口,主要的接口如下:
getNeighbors : 查询一批点的出边或者入边,返回边以及对应的属性,并且需要支持条件过滤;
Insert vertex/edge : 插入一点或者边及其属性;
getProps : 获取一个点或者一条边的属性;
这一层会将图语义接口转化成 kv 操作。为了提高遍历的性能,还要做并发操作。
在 Nebula Graph 中存储层对应进程是 nebula-storaged ,其核心为基于 Raft(用来管理日志复制的一致性算法) 协议的分布式 Key-value Storage 。目前支持的主要存储引擎为「Rocksdb」和「HBase」。Raft 协议通过 leader/follower 的方式,来保持数据之间的一致性。Nebula Storage 主要增加了以下功能和优化:
(1). Parallel Raft:允许多台机器上的相同 partiton-id 组成一个 Raft group 。通过多组 Raft group 实现并发操作。
(2). Write Path & batch:Raft 协议的多机器间同步依赖于日志 id 顺序性,这样的吞吐量 throughput 较低。通过批量和乱序提交的方式可以实现更高的吞吐量。
(3). Learner:基于异步复制的 learner。当集群中增加新的机器时,可以将其先标记为 learner,并异步从 leader/follower 拉取数据。当该 learner 追上 leader 后,再标记为 follower,参与 Raft 协议。
(4). Load-balance:对于访问压力较大的机器,将其所服务的 partition 迁移到较冷的机器上,以实现更好的负载均衡。
三、无状态计算层Graph Service
计算层每个计算节点都运行着一个无状态的查询计算引擎,节点彼此间无任何通信关系。计算节点仅从 Meta Service 读取 meta 信息,以及和 Storage Service 进行交互。这样设计使得计算层集群更容易使用 K8s 管理或部署在云上。计算层的负载均衡有两种形式,最常见的方式是在计算层上加一个负载均衡(balance),第二种方法是将计算层所有节点的 IP 地址配置在客户端中,这样客户端可以随机选取计算节点进行连接。 每个查询计算引擎都能接收客户端的请求,解析查询语句,生成抽象语法树(AST)并将 AST 传递给执行计划器和优化器,最后再交由执行器执行。
在 Nebula 中,Query Engine 是用来处理 Nebula 查询语言语句(nGQL)。Query Engine 架构图和现代 SQL 的执行引擎类似。
Session Manager
Nebula 权限管理采用基于角色的权限控制(Role Based Access Control)。客户端第一次连接到 Query Engine 时需作认证,当认证成功之后 Query Engine 会创建一个新 session,并将该 session ID 返回给客户端。所有的 session 统一由 Session Manger 管理。session 会记录当前 graph space 信息及对该 space 的权限。session 还会记录一些会话相关的配置信息,并临时保存同一 session 内的跨多个请求的一些信息。
客户端连接结束之后 session 会关闭,或者如果长时间没通信会切为空闲状态。这个空闲时长是可以配置的。
客户端的每个请求都必须带上此 session ID,否则 Query Engine 会拒绝此请求。
Storage Engine 不管理 session,Query Engine 在访问存储引擎时,会带上 session 信息。
Parser
Query Engine 解析来自客户端的 nGQL 语句,分析器(parser)主要基于著名的 flex / bison 工具集。设计上,nGQL 的语法非常接近 SQL。Parser 构建产出的抽象语法树(Abstrac Syntax Tree,简称 AST)会交给下一模块:Execution Planner。
Execution Planner
执行计划器(Execution Planner)负责将抽象树 AST 解析成一系列执行动作 action(可执行计划,最小可执行单元)。例如,典型的 action 可以是获取某个节点的所有邻节点,或者获得某条边的属性,或基于特定过滤条件筛选节点或边。当抽象树 AST 被转换成执行计划时,所有 ID 信息会被抽取出来以便执行计划的复用。这些 ID 信息会放置在当前请求 context 中,context 也会保存变量和中间结果。
Optimization
经由 Execution Planner 产生的执行计划会交给执行优化框架 Optimization,优化框架中注册有多个 Optimizer。Optimizer 会依次被调用对执行计划进行优化,这样每个 Optimizer都有机会修改(优化)执行计划。最后,优化过的执行计划可能和原始执行计划完全不一样,但是优化后的执行结果必须和原始执行计划的结果一样的。
Execution
Query Engine 最后一步是去执行优化后的执行计划,这步是执行框架(Execution Framework)完成的。执行层的每个执行器一次只处理一个执行计划,计划中的 action 会挨个一一执行。执行器也会一些有针对性的局部优化,比如:决定是否并发执行。针对不同的 action所需数据和信息,执行器需要经由 meta service 与storage engine的客户端与他们通信。
计算层对应的进程是 nebula-graphd ,它由完全对等无状态无关联的计算节点组成,计算节点之间相互无通信。Query Engine层的主要功能,是解析客户端发送 nGQL 文本,通过词法解析 Lexer 和语法解析 Parser 生成执行计划,并通过优化后将执行计划交由执行引擎,执行引擎通过 MetaService 获取图点和边的 schema,并通过存储引擎层获取点和边的数据。
Query Engine层的主要优化有:
异步和并发执行:由于 IO 和网络均为长时延操作,需采用异步及并发操作。此外,为避免单个长 query 影响后续 query,Query Engine 为每个 query 设置单独的资源池以保证服务质量 QoS。
计算下沉:为避免存储层将过多数据回传到计算层占用宝贵的带宽,条件过滤 where 等算子会随查询条件一同下发到存储层节点。
执行计划优化:包括执行计划缓存和上下文无关语句并发执行。
客户端 API & Console
Nebula Graph 提供 C++、Java、Golang 三种语言的客户端,与服务器之间的通信方式为 RPC,采用的通信协议为 Facebook-Thrift。
四、Nebula Graph的数据模型
Nebula Graph 采用有向属性图 DirectedPropertyGraph来建模,逻辑上图由两种图元素构成:顶点和边。
顶点 Vertex
顶点由标签 tag 和对应 tag 的属性组构成, tag 代表顶点的类型,属性组代表 tag 拥有的一种或多种属性。一个顶点必须至少有一种类型(标签),也可以有多种类型。每种标签有一组相对应的属性,我们称之为 schema 。Nebula Graph 是一种强 schema 的数据库,属性的名称和数据类型都是在数据写入前确定的。
边 Edge
边由类型和边属性构成, Nebula Graph 中边均是有向边,有向边表明一个顶点( 起点 src )指向另一个顶点( 终点 dst )的关联关系。边类型称为 edgetype ,每条边只有一种edgetype ,每种 edgetype 相应定义这种边上属性的 schema 。
图分割 GraphPartition
由于超大规模关系网络的节点数量高达百亿到千亿,而边的数量更会高达万亿,即使仅存储点和边两者也远大于一般服务器的容量。因此需要有方法将图元素切割,并存储在不同逻辑分片 partition 上。Nebula Graph 采用边分割的方式,默认的分片策略为哈希散列,partition 数量为静态设置并不可更改。
数据模型 DataModel
每个顶点被建模为一个 key-value ,根据其 vertexID(或简称 vid)哈希散列后,存储到对应的 partition 上。
一条逻辑意义上的边,在 Nebula Graph 中将会被建模为两个独立的 key-value ,分别称为 out-key 和 in-key 。out-key 与这条边所对应的起点存储在同一个 partition 上,in-key 与这条边所对应的终点存储在同一个 partition 上。