优点
- 增加了column family,这样有利于多个不相关的数据集存储在同一个db中,因为不同column family的数据是存储在不同的sst和memtable中,所以一定程度上起到了隔离的作用。
- 采用了多线程同时进行compaction的方法,优化了compact的速度。
- 增加了merge operator,优化了modify的效率
- 将flush和compaction分开不同的线程池,能有效的加快flush,防止stall。
- 增加了对write ahead log(WAL)的特殊管理机制,这样就能方便管理WAL文件,因为WAL是binlog文件。
- RocksDB典型的做法是Level 0-2不压缩,最后一层使用zlib(慢,压缩比很高),而其它各层采用snappy
rocksdb的文件类型
主要有以下几种类型sst文件,CURRENT文件,manifest文件,log文件,LOG文件和LOCK文件
- sst文件存储的是落地的数据;
- CURRENT文件存储的是当前最新的是哪个manifest文件;
- manifest文件存储的是Version的变化;
- log文件是rocksdb的write ahead log,就是在写db之前写的数据日志文件;
- LOG文件是一些日志信息,是供调试用的;
- LOCK是打开db锁,只允许同时有一个进程打开db。
配置信息(TODO)
ColumnFamilyOptions
这些option都是column family相关的,可以对不同的column family赋不同的值。
- inplace_update_support: 字面含义是是否支持在原位置更新,如果支持的话,那么原来的数据就被擦除了,所以snapshot和iterator保留当时的数据的逻辑就没法实现了
- num_levels: 记录的是version的level的数目,默认是7,即0~6
- target_file_size_base: level1的sst文件的大小,默认为2MB
- target_file_size_multiplier: level1以上的sst文件大小,乘数因子默认是1,即所有level的文件大小都是2MB
- level0的文件大小是由
write_buffer_size
决定的,level1的文件大小是由target_file_size_base
决定的,level2及以上,size = target_file_size_base * (target_file_size_multiplier ^ (L - 1))
- level0的文件大小是由
- max_bytes_for_level_base: level1的sst总的文件总和大小,默认是10MB
- max_bytes_for_level_multiplier: level2及以上的level的sst文件总和大小的乘数因子,默认是10,
- level0的sst文件总和大小是
level0_stop_writes_trigger * write_buffer_size
,因为level0的文件数目达到level0_stop_writes_trigger时候就会停止write。 - level1及以上的文件总和大小是max_bytes_for_level_base * (max_bytes_for_level_multiplier ^ (L - 1)),默认的level0是4MB * 24 = 96MB,level1是10MB,level2是100MB,level3是1G,level4是10G。。
- level0的sst文件总和大小是
RocksDB Flush
Flush是指将memtable的数据导入到sst中,变成持久化存储,就不怕数据丢失了。
触发Flush的代码入口:
Status DBImpl::ScheduleFlushes(WriteContext* context) {
autovector<ColumnFamilyData*> cfds;
if (immutable_db_options_.atomic_flush) {
SelectColumnFamiliesForAtomicFlush(&cfds);
for (auto cfd : cfds) {
cfd->Ref();
}
flush_scheduler_.Clear();
} else {
ColumnFamilyData* tmp_cfd;
while ((tmp_cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
cfds.push_back(tmp_cfd);
}
MaybeFlushStatsCF(&cfds);
}
Status status;
for (auto& cfd : cfds) {
if (!cfd->mem()->IsEmpty()) {
status = SwitchMemtable(cfd, context);
}
if (cfd->Unref()) {
delete cfd;
cfd = nullptr;
}
if (!status.ok()) {
break;
}
}
if (status.ok()) {
if (immutable_db_options_.atomic_flush) {
AssignAtomicFlushSeq(cfds);
}
FlushRequest flush_req;
GenerateFlushRequest(cfds, &flush_req);
SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
MaybeScheduleFlushOrCompaction();
}
return status;
- 首先在memtable的add的时候,会检测是否memtable的大小达到了max write buffer,如果是就将should_flush_置为true(
CheckMemtableFull
还有其他情况触发),并会在WriteBatch的Handler里面调用CheckMemtableFull,将当前column family加入flush_scheduler;-
CheckMemtableFull
调用的FlushScheduler::ScheduleWork
方法只是将cfd添加到checking_set_
队列中,并未真正地执行Flush调度;
-
- 在Write的时候,调用ScheduleFlushes,将需要flush的column family的memtable切换一个新的,同时将原来的memtable加入cfd的imm中;
- 由于真正的Flush过程是在另一个线程完成的,所以这个地方并不会block写过程;
-
Write
中调用PreprocessWrite
做些预先处理的工作; - 如果发现
checking_set_
不为空,会调用DBImpl::ScheduleFlushes
方法,然后调用SwitchMemtable
切换新的memtable;DBImpl::SwitchMemtable
执行流程:- 如果开启
two_write_queues_
: 等待没有并发的wal写入线程; -
WriteRecoverableState
在memtable中写入recoverable_state状态; - 如果开启
enable_pipelined_write
: 等待所有的memtable写入线程完毕; - 如果需要创建新的wal,则调用
CreateWAL
创建wal writer; - 调用
cfd->ConstructNewMemtable
,创建新的memtable; -
cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_)
,将原来的memtable加入到imm中;
- 如果开启
- 当mem切换imm切换成功,会触发
MaybeScheduleFlushOrCompaction
,尝试flush或者compaction;- 当然也有其他case触发flush/compaction: 如果这个column family data的imm数量大于min_write_buffer_number_to_merge,并启动一个新的线程调用BGWorkFlush;
- BGWorkFlush->BackgroundCallFlush->BackgroundFlush->FlushJob
- FlushJob::PickMemTable选择需要Flush的imm
- 由于cfd中可能包含多个imm,从cfd获取一个可以进行flush的memtable的list:待合并、flush的imm结合;
- 从memtable列表中获取第一个memtable,使用其edit结构来保存本次flush的元信息: 该次flush的版本信息通过第一个imm设定;
- 调用version_set的NewFileNumber接口为新的文件生成一个filenumber(同时可以指定对应level的路径, level=0)
- FlushJob::Run, 执行flush逻辑
- WriteLevel0Table: 将imm写入level=0的sst文件中
- 遍历待合并的Imm集合:
- 待flush的数据:构造InternalIterator迭代器数组;
- 待删除的数据:构造FragmentedRangeTombstoneIterator迭代器数组;
- 基于InternalIterator构造NewMergingIterator归并迭代器,基于最小堆实现多路归并算法;
- BuildTable:将数据写入sst中:
- TableFileName: 构造flush的文件名;
- NewWritableFile: 创建新的文件;
- WritableFileWriter: 构造writer;
- NewTableBuilder: 构建table builder;
- CompactionIterator: 构建合并迭代器;
- 遍历迭代器,调用
BlockBasedTableBuilder.Add
方法逐一添加k/v数据,中间可能触发flush;
- 处理完成 之后如果output_file_directory不为空则同步该目录(output_file_directory_->Fsync())
- 调用edit_->AddFile,将生成的文件添加到L0
- 记录本次Flush的状态
- 遍历待合并的Imm集合:
- WriteLevel0Table: 将imm写入level=0的sst文件中
RocksDB Compaction
- 通过minor compaction,内存中的数据不断地写入的磁盘,保证有足够的内存来应对新的写入;
- 而通过major compaction,多层之间的SST文件的重复数据和无用的数据可以迅速减少,进而减少sst文件占用的磁盘空间。
Compaction的触发条件是两类:文件个数和文件大小。
- 对于level0,触发条件是:
- sst文件个数,通过参数level0_file_num_compaction_trigger控制;
- score通过sst文件数目与level0_file_num_compaction_trigger的比值得到。
- level1-levelN触发条件是:
- sst文件的大小,通过参数max_bytes_for_level_base和max_bytes_for_level_multiplier来控制每一层最大的容量;
- score是本层当前的总容量与能存放的最大容量的比值
Compaction的主要流程如下:
- 首先找score最高的level,如果level的score>1,则选择从这个level进行compaction
- 根据一定的策略,从level中选择一个sst文件进行compact,对于level0,由于sst文件之间(minkey,maxkey)有重叠,所以可能有多个。
- 从level中选出的文件,我们能计算出(minkey,maxkey)
- 从level+1中选出与(minkey,maxkey)有重叠的sst文件
- 多个sst文件进行归并排序,合并写出到sst文件
- 根据压缩策略,对写出的sst文件进行压缩
- 合并结束后,利用VersionEdit更新VersionSet,更新统计信息
触发Compaction的方式:
- DBImpl::RunManualCompaction: 手动触发Compaction
- 判断触发MannulCompaction条件、变量;
- 确保没有非mannul compaction执行,这样的话mannual compaction可以执行任意range的compaction;
- 调用
BGWorkCompaction
线程开启调度;
- 判断触发MannulCompaction条件、变量;
- 自动Compaction:
- DBImpl::MaybeScheduleFlushOrCompaction: 在每次触发mem的flush的时,会判定是否进行flush/compaction
- DBImpl::BackgroundCallFlush: 包含了mem的flush、compaction的判定执行逻辑;
- DBImpl::MaybeScheduleFlushOrCompaction
- 调用
BGWorkCompaction
线程开启调度
- 调用
-
BGWorkCompaction
的执行逻辑:可以发现不论是手动、自动触发的模式,的最终都会调用Compaction线程进行处理:- DBImpl::BackgroundCallCompaction
- DBImpl::BackgroundCompaction(真正的执行逻辑,这个函数巨长)
- 如果是mannul compaction:
- 调用
EnoughRoomForCompaction
判定是否有足够的Compaction空间,没有空间的话直接返回CompactionTooLarge
异常;
- 调用
- 如果是auto compaction:
- 调用PickCompactionFromQueue,从queue选择需要执行的cfd,如果为空,直接返回;
- 调用
EnoughRoomForCompaction
判定是否有足够的Compaction空间,没有空间的话,更新统计信息不返回异常;
- 在进行完准备工作之后,判定需要合并的compcation(c)不为空的话,
- 如果c为deletion_compaction:
- 删除c执行的fd和edit信息;
- 调用VersionSet::LogAndApply进行更新manifest操作;
- 调用
DBImpl::InstallSuperVersionAndScheduleWork
更新SuperVersion;
- 如果c为IsTrivialMove:
- 类似于上述操作,先进性fileMeta变更;
- 然后调用VersionSet::LogAndApply进行更新manifest操作;
- 再调用
DBImpl::InstallSuperVersionAndScheduleWork
更新SuperVersion;
- 如果c是BottomCompaction(最开始引入是为了universal-compaction,后来也对level-compaction进行适配,主要用于长时间(long running)合并,以避免同short-live上层合并逻辑的冲突):
- 调用
DBImpl::BGWorkBottomCompaction
执行;
- 调用
- 否则,执行通用Compaction逻辑:
- 构造并提交
CompactionJob
;- Prepare:
- 构造边界值和统计信息;
- Run:
- 构造合并迭代器;
- Install:
- 调用VersionSet::LogAndApply变更edit/fileMeta信息;
- Prepare:
- 构造并提交
- 如果c为deletion_compaction:
- 如果是mannul compaction:
在ColumnFamilyData
构造信息中会根据配置信息初始化,如下变量用于compaction的统计信息更新、并确定下一次compaction的判断:
std::unique_ptr<CompactionPicker> compaction_picker_;
CompactionPicker
提供的主要接口有:
- NeedsCompaction: 是否进行合并;
- MaxOutputLevel: 最大output level;
- PickCompaction: 根据level和inputs文件产生新的compaction;
- CompactRange: 根据在指定level的
[begin,end]
信息构造compaction信息;
在RocksDB中,compaction的CompactionPicker
实现有如下几种:
enum CompactionStyle : char {
// level based compaction style
kCompactionStyleLevel = 0x0,
// Universal compaction style
// Not supported in ROCKSDB_LITE.
kCompactionStyleUniversal = 0x1,
// FIFO compaction style
// Not supported in ROCKSDB_LITE
kCompactionStyleFIFO = 0x2,
// Disable background compaction. Compaction jobs are submitted
// via CompactFiles().
// Not supported in ROCKSDB_LITE
kCompactionStyleNone = 0x3,
};
Level Compaction
某个level的sst文件与level+1中存在重叠的sst文件进行合并,然后将合并后的文件写入到level+1层的过程。
- 通过判断每个level的score是否大于1,确定level是否需要compact
- 默认是选择文件size较大,包含delete记录较多的sst文件,这种文件尽快合并有利于缩小空间。
- 每次会从level中选取一个sst文件与下层compact,但由于level0中可能会有多个sst文件存在重叠的范围,因此一次compaction可能有多个level0的sst文件参与。
在Level-Based的Compaction中,决定从一个level到下一个level进行合并的方法有(参考VersionStorageInfo::UpdateFilesByCompactionPri
方法):
- kByCompensatedSize: 根据sst文件的
compensated_file_size
补偿文件大小排序选择;-
compensated_file_size
大致可以理解为:file_meta->fd.GetFileSize() + (file_meta->num_deletions * 2 - file_meta->num_entries) * average_value_size * kDeletionWeightOnCompaction
,同文件大小与删除文件数量有关系(参考VersionStorageInfo::ComputeCompensatedSizes
);
-
- kOldestLargestSeqFirst: 根据sst文件的largest_seqno序列号排序选择(大者优先);
- kOldestSmallestSeqFirst: 根据sst文件的smallest_seqno序列号排序选择(小者优先);
- kMinOverlappingRatio: 根据sst文件的overlapping大小/file_size排序;
Universal Compaction
相对于level compaction,Univeral compaction由于每一次合并的文件较多,相对于level compaction的多层合并,写放大较小,付出的代价是空间放大较大。
- Univeral模式中,所有的sst文件都可能存在重叠的key范围。对于R1,R2,R3,...,Rn,每个R是一个sst文件,R1中包含了最新的数据,而Rn包含了最老的数据;
- 合并的前提条件是sst文件数目大于level0_file_num_compaction_trigger,如果没有达到这个阀值,则不会触发合并。在满足前置条件的情况下,按优先级顺序触发以下合并。
- 如果空间放大超过一定的比例,则所有sst进行一次compaction,所谓的full compaction,通过参数max_size_amplification_percent控制。
- 如果前size(R1)小于size(R2)在一定比例,默认1%,则与R1与R2一起进行compaction,如果(R1+R2)*(100+ratio)%100<R3,则将R3也加入到compaction任务中,依次顺序加入sst文件
- 如果第1和第2种情况都没有compaction,则强制选择前N个文件进行合并。
FIFO Compaction
FIFO顾名思义就是先进先出,这种模式周期性地删除旧数据。在FIFO模式下,所有文件都在level0,当sst文件总大小超过阀值max_table_files_size,则删除最老的sst文件。
参考
- RocksDB简介://www.greatytc.com/p/8d09a7190dda
- RocksDB Version管理概述: https://www.cnblogs.com/coguin/p/11405082.html
- https://segmentfault.com/a/1190000018967058
- 这里讲了memtable并发写入的过程,利用了InlineSkipList,它是支持多读多写的,节点插入的时候会使用 每层CAS 判断节点的 next域是否发生了改变,这个 CAS 操作使用默认的memory_order_seq_cst:
- RocksDB线程管理: //www.greatytc.com/p/abf15e5e306b
- 知乎RocksDB学习好文章: https://www.zhihu.com/question/270732348/answer/356254676
- RocksDB系列文章: https://www.cnblogs.com/cchust/category/895428.html
- RocksDB Flush: //www.greatytc.com/p/38a38134491b
- MemTable存储结构:
- RocksDB BlockBasedTable:
- RocksDB PrefixSeek:
- RocksDB Manifest:
- RocksDB BloomFilter:
- RocksDB Compaction:
- RocksDB 笔记: //www.greatytc.com/p/0d4bea498a91
- RocksDB 写入流程:
- RocksDB CuckooTable: 针对点差进行优化了的format格式
- RocksDB SnapShot: