转载请注明原文地址//www.greatytc.com/p/dd5c7c222703
TiCDC通过解析TiKV的raft日志来实现抽取数据库的变化数据,也就是大家所熟知的Change Data Capture(CDC)
。如果没有TiKV侧的配合,是不能完成这一操作的,本文记录的就是TiKV如何把变化数据推送给TiCDC的。
Raft coprocessor
TiKV通过coprocessor扩展,实现了类似存储过程的高级功能。对于coprocessor不需要了解太多,如有兴趣请参阅:TiKV 源码解析系列文章(十四)Coprocessor 概览
本文所述的raft coprocessor作用于raft日志apply的时候,本质上是3个钩子函数,在TiKV准备apply raft日志之前、apply之后、raft日志成功写入存储层这3个阶段,插入一段逻辑,这3段逻辑会将这些raft日志做一系列额外的处理发给TiCDC。
如果不了解TiKV的raft工作原理,建议先通过官方经典博文了解一下:TiKV 是如何存取数据的
raft coprocessor在TiKV进程启动时就会被加载,而以上3段逻辑只有在TiCDC“订阅”了某个region之后才会真正开始工作。apply raft日志之前、apply之后只是把raft日志给buffer起来,raft成功写入存储层之后,做以下处理:
- 对rocksdb做快照
- 在“推流”阶段,对buffer起来的这一批日志进行解析、过滤,发往TiCDC
我们先不急着追究为什么要做快照,本文后面解析“获取历史值”时,会回顾这一步。
可以看到raft coprocessor只参与了“推流”。“推流”这个说法来自于TiCDC的设计文档,因为raft日志经过解析之后,由TiKV通过推的方式,源源不断地流入TiCDC,即所谓“推流”。推流贯穿于整个TiCDC连接生命周期,是两者间通信的主要阶段。
如何从raft日志中解析出事务
我们先回顾一下,TiKV是如何将事务commit和rollback的?
Percolator提供三个 column family (CF),Lock,Data 和 Write,当写入一个 key-value 的时候,会将这个 key 的 lock 放到 Lock CF 里面,会将实际的 value 放到 Data CF 里面,如果这次写入 commit 成功,则会将对应的 commit 信息放到入 Write CF 里面。
Key 在 Data CF 和 Write CF 里面存放的时候,会把对应的时间戳给加到 Key 的后面。在 Data CF 里面,添加的是 startTS,而在 Write CF 里面,则是 commitTS。
TiKV的实现中,也含有Lock列和Write列,论文中Data列对应的是TiKV的default列。
如果一个raft PUT操作中,操作的是Lock列的数据,Lock的类型为Put
或Delete
(Lock总共4种类型: Put
代表insert或update操作, Delete
代表delete操作, Lock
, Pessimistic
,后面2种与TiCDC无关),那么这是一条Prewrite记录,这条日志在大部分场景下只含有key值(表的主键值),没有表的其它字段值,因此在大部分场景下,只有等到后续相同key值的default列数据到来时,才能拼成一条完整的操作记录。
如果一个raft PUT操作中,操作的是Write列的数据,Write的类型为Put
或Delete
(Write也有4种类型: Put
代表insert或update操作提交成功, Delete
代表delete操作提交成功, Rollback
表示事务回滚, Lock
与TiCDC无关),那么这是一条Commit或Rollback记录,在大部分场景下,和Lock列一样也是只有key没有其它字段的,需要default列来拼。TiCDC收到这种记录后,会按事务的开始时间startTs
来收集其对应所有Prewrite记录,进而组装成一个完整的事务。
raft PUT最后一种就是操作default列的数据了,上文已经说明,default列含有一条表记录的全字段,但是没有操作类型等元信息,需要和Lock或Write拼成完整的记录。
至此,我们大概了解了raft日志是如何被解析的,也就了解了TiKV侧的CDC接口的核心工作原理。但是事实并不简单,没有到此为止。
Region的订阅
本文的“Raft coprocessor”小节中提到,只有在TiCDC“订阅”了某个region之后才会真正开始工作。为什么需要订阅?
第一,TiKV采用Multi-raft的设计,顾名思义,多组raft,不同raft组之间通信是以region为单元并行的,对于每一组raft,就需要有1个raft coprocessor注册在该组leader节点之上进行监听。如果leaer节点down掉,leader会切换到健康节点,那么coprocessor也要相应地切过去。
第二,region它不是固定不变的。region是一段key值从小到大的区间,它可能分裂为多个region,也可能合并为一个。
事实上,region发生改变时,TiKV会反馈一个EpochNotMatch或RegionNotFound异常;leader发生改变时,会反馈一个NotLeader异常。TiCDC收到异常后,需要去PD重新获取region信息,对新region发起订阅。
增量扫(Incremental scan)
本文的“Raft coprocessor”小节中提到,推流贯穿于整个TiCDC连接生命周期,是两者间通信的主要阶段。但是除了推流之外,TiCDC启动时,还有与推流并行执行的“增量扫”阶段,为何会有这一阶段?
首先回顾一下上文“如何从raft日志中解析出事务”。
上文中提到,TiCDC收到Commit记录后,会按事务的开始时间startTs来收集其对应所有Prewrite记录,进而组装成一个完整的事务。试想一下这个场景:TiCDC订阅region的这个时间点之前,有事务未提交,订阅之后才提交。如果只有推流的话,那么这个时间点之前的部分Prewrite记录就没有被TiCDC接收到,是不能拼成一个完整的事务的。所以,应该要先扫下还没提交的数据,把这部分数据发出去。
其次,如果TiCDC down掉,那么它对region的订阅也会失效。TiKV可不会因为你没有订阅这个region就不处理raft了,明显不可能。那么TiCDC重新连上来的时候,由于coprocessor是apply raft日志时的钩子,down掉的期间这些日志已经被处理过,不会再处理了,从TiCDC这边看就是不会再收到这些日志,这明显是不行的。所以,这些缺失的日志是无法通过coprocessor获取的,需要直接从存储层获取,扫描从down掉的时间点(checkpointTs)到当前时间点的所有增量数据(注意delete对于KV存储也是增量数据),也就是所谓的“增量扫”。
增量扫具体的做法非常简单,就是先对rocksdb做快照,确保数据固定,再对快照执行前缀搜索遍历,解析出符合条件的Prewrite和Commit记录,解析方法和解析raft日志的方法大同小异。
由于region是TiKV的逻辑概念,rocksdb内部并没有所谓的region划分,故执行遍历的时候,如果region较多可能会消耗一些额外的资源。
增量扫完成后,会给TiCDC一个Initialized通知。
ResolvedTs
对CDC技术接触较多的同事都知道,我们从db抽取日志时,除了普通的一行一行数据外,最好有个心跳消息,通过心跳我们可以知道,db没把数据推给我,不是因为连接断了,而是因为确实没有数据变化,这段时间没有insert update delete等操作。
ResolvedTs确实有这种作用,不过它的作用可不止心跳。TiCDC可是严格按照ResolvedTs来控制它的数据加工管道的,这不在本文的讨论范围,我会在其它文章中详细说明。
ResolvedTs:为了数据还原的一致性,只有当所有 region 都保证在某个 ts 之前的所有数据都已经被 TiCDC 获取到,TiCDC 才会对 ts 前的 kv change event 进行排序并向下游进行事务还原。 因此对于一定时间没有任何数据写入的 region,需要提供某种机制推进该 ts,以降低 TiCDC 还原事务的延迟。ResolvedTs 类型 event 就是解决这个问题。
这段设计文档对于resolvedTs的作用已经描述得非常清楚了,在这里我就只说一下resolvedTs在TiKV侧是怎么生成的。
TiKV启动后,对于CDC这块,会周期性地去PD拿tso(不清楚tso的可以理解为分布式系统的当前时间戳),之后走一次raft确保本节点的region信息正确无误,之后更新本节点每个region的minTs
为这个时间戳。
TiKV会追踪每个事务的开始时间startTs和提交时间commitTs,也就是对于每个region,追踪raft日志中Prewrite记录的Lock列和Commit记录的Write列。如果没有未完成的事务,那么resolvedTs就是minTs
;如果有些事务未提交,用最早开始的事务的startTs和minTs
比较,取更小者。
(跨region的事务有待研究)
获取历史值
做CDC有时候我们需要获取某一行记录的历史值,如修改前是张三,修改后是李四,我想获取“张三”这个值。
对于增量扫阶段,这个需求十分简单,因为KV存储的特性,key是按字节从小到大排好序的,TiKV设计的key中又包含时间戳,所以历史值一定是排在当前值的前面。在遍历的时候,只需要将指针往前移动一格或几格,就能找到历史值,不需要点查。
推流阶段,就不可避免要去点查了,因此性能相对增量扫阶段肯定是有下降的,查询之前先做快照的目的就是防止旧值被GC回收,当然不可能每条记录都做一次快照,所以如本文开头所说,只有raft日志成功写入存储层时,才做。