RocksDB——Put
涉及的数据结构概览
相关class以及对应的源文件
DB | db/db.h |
---|---|
DB | include/rocksdb/db.h |
DB_impl | db/db_impl.cc, db/db_impl_write.cc |
WriteBatch | include/rocksdb/write_batch.h, db/write_batch.cc |
WriteThread | db/write_thread.h, db/write_thread.cc |
WriteBatchInternal | db/write_batch_internal.h, db/write_batch.cc |
WriteOptions | include/rocksdb/options.h |
MemtableInserter | db/write_batch.cc |
调用关系图
默认配置下的put流程
外部调用流程
RocksDB从调用Put接口到真正开始执行Put操作之间还有几层函数调用,在这几层函数调用中主要进行数据的封装操作,最后进入DB_Impl::WriteImpl执行写操作过程。
首先是外部的Put接口,RocksDB提供了两个Put接口,分别是指定了column_family以及没有指定column_family的接口,其中没有指定column_family的Put是调用指定column_family的Put并指定默认的column family(default)封装实现的。
DBImpl的Put调用DB的Put实现,两者定义一样,参数直接传递
DB的Put会将传入的column_family、key以及value封装到WriteBatch中,然后调用Write函数,传入WriteOption以及WriteBatch
Write函数直接调用WriteImple函数进入写流程
WriteImpl
传入WriteImpl的参数为:
const WriteOptions& write_options
WriteBatch* my_batch
WriteCallback* callback
uint64_t* log_used
uint64_t log_ref
bool disable_memtable
uint64_t* seq_used
size_t batch_cnt
PreReleaseCallback* pre_release_callback
(由Write调用的时候只提供了WriteOptions、WriteBatch*以及WriteCallback*(TODO:关于传入参数的个数)
初步处理
进入WriteImpl之后首先判断几个设置参数并根据设置参数执行不同的操作:
- tracer_:如果为true,则调用tracer_->Write,传入my_batch(TODO:tracer是干啥的)
-
sync
与disableWAL
设置冲突,返回NotSurpport -
two_write_queues_
以及enable_pipelined_write
设置冲突,返回NotSurpport -
seq_per_batch
与enable_pipelined_write
暂时不支持(TODO:seq_per_batch) - 如果WriteOptions设置了
low_pri
选项,则调用函数ThrottleLowPriWritesIfNeeded
- 如果
two_write_queue_
以及disableMemtable
同时设置,则进入WriteImplWALOnly
函数(也就是说禁用memtable,后面的过程就不同了。只需要写WAL) - 如果
enable_pipelined_write
同时设置,则进入PipelinedWriteImpl
writer以及Write_Group
传入WriteImpl函数获得的所有参数构造WriteThread::Writer结构w,然后调用write_thread_.JoinBatchGroup,这个函数会将当前这个Writer w加入到WriteThread的Writer链表中,当w通过JoinBatchGroup之后会自动被设置一个状态state,如果当前writer是第一个进入WriteThread的writer,则成为当前Group的leader,状态被设置为WriteThread::STATE_GROUP_LEADER
,否则说明已经有了一个leader,则等待leader为当前的writer设置状态(AwaitState)
PreProcessWrite
当程序执行到此处说明当前writer是Group_Leader,当two_write_queues_为false并且disable_memtable为false的时候,进入PreProcessWrite函数进行预处理,该过程需要mutex加锁,传入参数write_options、need_log_sync、write_contex。
根据官方文档,memtable被flush有三个条件,满足其中之一则触发memtable的flush操作:
- 单个memtable的size超过
writer_buffer_size
- 总memtable size超过
db_write_buffer_size
或者由write_bufer_namager发起了一次flush,此时会flush最大的那个memtable - WAL的总size超过
max_total_wal_size
,此时会将包含最老的数据的memtable给flush,这样让包含这部分数据的WAL可以释放
PreProcessWrite主要过程为一系列判断,并根据判断执行对应的操作。
-
!single_column_family_mode_ && total_log_size > GetMaxTotalSize()
:表明总的log size超过了额定的阈值,此时需要更换WAL,调用函数SwitchWAL
,传入write_context(满足flush条件3) -
write_buffer_manager_->ShouldFlush()
:WriteBufferManager判断当前memtable需要dump(memtable达到了设定的阈值大小),调用函数HandleWriteBufferFull
,传入write_context(满足条件2中总size超出阈值) -
!flush_scheduler.Empty()
:调用ScheduleFlushes
,传入write_context(满足条件2中的wbm触发) -
write_controller_.IsStopped() || write_controller_.NeedsDelay()
:与write_controler相关,调用DelayWrite函数,传入last_batch_group_size以及write_options -
need_log_sync
:等待log同步完成
UNLIKELY以及LIKELY
DB_Impl::PreProcessWrite中大量使用了UNLIKELY以及LIKELY两个宏,其定义在源文件port/likely.h
中。
实现上主要封装了函数__builtin_expect(long exp, long c)
,这是一个编译上的优化,expect函数告诉编译器表达式exp值为c的几率比较大,希望可以针对此做优化,返回值为exp的值。
宏LIKELY实现为buildtin_expect((x), 1)
,也就是说LIKELY中x表达式为真的概率比较大,对于一个if语句:if(LIKELY(x))
,其等价于if(x)
,只不过这里告诉编译器x的值为true的可能更大,则可以根据此来进行汇编上的优化。
UNLIKELY同理。
Insert过程
insert前的准备
首先调用write_thread_的EnterAsBatchGroupLeader,传入参数w以及write_group,这一步的作用主要是尽可能将能够一块写入memtable的writer都加入write group。
然后判断是否能并行插入memtable,parallel为ture的条件为设置中allow_concurrent_memtable_write选项为真并且write_group中writer个数大于1(默认情况下只有一个所以这个时候parallel为false)
记录各种本次write相关的状态:NUMBER_KEYS_WRITTEN
、BYTES_WRITETEN
、WRITE_DONE_BY_SELF
、WRITE_DONE_BY_OTHER
如果开启了disableWAL选项,则将has_unpersisted_data_这个flag设置为true。
然后将数据写入WAL中,此时有两种情况,一是当two_write_queue_为false的时候,直接调用WriteToWAL
函数写入WAL,传入参数write_group、log_writer、log_used、need_log_sync、need_log_dir_sync、last_sequence+1;否则调用ConcurrentWriteToWAL
。默认情况为前者。
insert
如果parallel为false,则执行普通的插入,调用函数WriteBatchInterl::InsertInto
将当前write_group中的数据写入memtable,传入参数:
write_group
current_sequence
column_family_memtables.get()
flush_scheduler
write_options.ignore_missing_column_families
recover_log_number
this
parallel
seq_per_batch
batch_per_txn_
否则执行并发的插入操作。
后续处理
如果need_log_sync为真,即WriteOptions中sync参数为true,则需要对log进行同步操作。(默认没有开启two_write_queues_的情况下只需要调用MarkLogsSynced,否则才是调用FlushWAL或者SyncWAL)
如果当前writer在并行的write_group中则需要进行并行writer相关的处理,默认情况为false。
最后调用所有的writer的callback函数并更新version的LastSequence,最后执行write_thread_.ExitAsBatchGroupLeader将writer的状态设置为STATE_COMPLETE并退出(ExitAsBatchGroupLeader在pipeiline write的时候操作比较复杂,普通情况下只是设置Writer状态为COMPLETE)。
其他写流程分支的实现
Concurrent Memtable Insert
在leader writer进入WriteThread的EnterAsWriteGroupLeader函数之后会将符合条件的别的writer加入到write_group中,此时如果能够进行并行的memtable插入,则会由leader发起一次parallel memtable insert,各个writer共同完成插入memtable的过程。
在完成writer的选择之后判断是否能够进行并行的插入,此时的条件为:
immutable_db_options_.allow_concurrent_memtable_write && write_group.size > 1
字面来看就是write_group中writer至少大于1个(这样才有并行的意义),并且memtable的实现要支持并行的插入(目前只有skiplist才支持),rocksdb开发者在代码注释中写了三条规则,只有满足这三条规则的情况下才能够执行并行插入:
- memtable支持
- 非inplace update
- 非merge(需要检查每个batch)(具体体现为遍历write_group中的每个writer,当batch的HasMerge标志位为true的时候设置parallel为false)
对于WAL,write_gorup中所有writer会由leader统一写入WAL
进入插入流程,此时如果parallel为true,则进入并行插入的流程(注意此时除了leader到达了这里之外,其他的writer还在JoinBatchGroup
的AwaiteState
阶段)。首先遍历所有的writer并设置其sequence number,完成之后调用WriteThread::LaunchParallelMemtableWriters()
,通过该函数唤醒等待的其他writer,设置状态成为STATE_PARALLEL_MEMTABLE_WRITER
,并开始继续执行。同时leader也将自己的数据写入memtable中。
Parallel Memtable Writer的写流程
这就回到了代码前段调用JoinBatchGroup之后,此时有个判断:
if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
// we are a non-leader in a parallel group
...
}
此处就是并行memtable插入时被leader唤醒的其他writer要执行的操作。
写入过程比较简单,调用WriteBatchInternal::InsertInto
就完成写入
完成写入之后需要自行退出,首先调用WriteThread::CompleteParallelMemtableWriter
函数判断是否还有别的writer没有结束,如果不是最后一个完成的writer则等待别的writer完成写操作;否则就需要为所有的writer执行退出前的后续工作。这些后续工作主要就是挨个对write group中的所有writer调用callback函数,然后设置version的last_sequence,最后调用WriteThread::ExistAsBatchGroupFollower
将其他等待的writer状态设置成COMPLETED并退出。
WriteImplWALOnly
- 触发条件:
two_write_queues_
&disable_memtable
PipelineWriteImpl
- 触发条件:
enable_pipielined_write
= true
重要数据结构分析
WriteThread
相关源代码文件:db/write_thread.h, db/write_thread.cc
WriteThread主要负责管理封装了Put操作的Writer
数据成员
// See AwaitState.
const uint64_t max_yield_usec_;
const uint64_t slow_yield_usec_;
// 并发memtable插入操作是否允许
const bool allow_concurrent_memtable_write_;
// 针对memtable以及WAL的pipeline write是否允许
const bool enable_pipelined_write_;
// Points to the newest pending writer. Only leader can remove
// elements, adding can be done lock-free by anybody.
std::atomic<Writer*> newest_writer_;
// Points to the newest pending memtable writer. Used only when pipelined
// write is enabled.
std::atomic<Writer*> newest_memtable_writer_;
// The last sequence that have been consumed by a writer. The sequence
// is not necessary visible to reads because the writer can be ongoing.
SequenceNumber last_sequence_;
主要API
-
void JoinBatchGroup(Writer* w);
JoinBatchGroup实现的功能是将一个Writer插入到WriteThread的Writer链表中,WriteThread中通过一个atomic的指针newest_writer来指向最新的writer,并且这个writer连接了链表中其他的writer。
w进入该函数之后首先调用LinkOne函数,本质上LinkOne函数做的事就是把传入的w插入到链表中,实现这一操作的语句:
Writer* writers = newest_writer->load(std::memory_order_relaxed); while (true) { w->link_older = writers; if (newest_writer->compare_exchange_weak(writers, w)) { return (writers == nullptr); } }
对于这个compacre_exchange_weak,简单的理解是一个原子的替换操作,原子地将w替换到newest_writer里面,当w替换成功的时候返回true,进入return语句,这个时候如果writer为nullptr说明当前writer是插入的第一个writer,那么当前writer就成为leader,否则替换失败重复执行(这个时候应该是别的线程的writer插入了)直到插入成功
回到JoinBatchGroup,如果LinkOne返回值为true则设置当前writer状态为STATE_GROUP_LEADER,其余调 用AwaitState等待状态改变。
-
size_t EnterAsBatchGroupLeader(Writer* leader, WriteGroup* write_group);
该函数将leader writer加入到Write Group中,并且选择符合条件的其他writer加入到同一个group中
-
void ExitAsBatchGroupLeader(WriteGroup& write_group, Status status);
这个函数主要执行退出的时候的状态设置,先不考虑pipeline write的情况,最简单的功能就是从last_writer开始遍历group中的所有writer,然后设置他们的状态为STATE_COMPLETE
-
void LaunchParallelMemTableWriters(WriteGroup* write_group);
将write group中所有的writer的状态设置成
STATE_PARALLEL_MEMTABLE_WRITER
以唤醒等待的writer执行并发的插入操作 -
bool CompleteParallelMemTableWriter(Writer* w)
在每个并发memtable插入的writer执行过程最后调用,判断如果当前writer不是最后一个writer(write group中还有正在执行的writer)则等待其他writer完成写操作(
AwaitState(w, STATE_COMPLETED, &cpmtw_ctx);
),否则返回true -
void ExitAsBatchGroupFollower(Writer* w)
对Group Leader调用
ExitAsBatchGroupLeader
以及将leader的状态设置为STATE_COMPLETED
Writer
在Put的时候对一个写操作的封装
数据成员
//writebatch以及write_options相关的数据
WriteBatch* batch;
bool sync;
bool no_slowdown;
bool disable_wal;
bool disable_memtable;
size_t batch_cnt; // if non-zero, number of sub-batches in the write batch
//如果不为0表示batch中还有其他的子batch
//从write函数中传入的数据
PreReleaseCallback* pre_release_callback;
uint64_t log_used; // log number that this batch was inserted into
uint64_t log_ref; // log number that memtable insert should reference
WriteCallback* callback;
bool made_waitable; // records lazy construction of mutex and cv
std::atomic<uint8_t> state; // write under StateMutex() or pre-link
WriteGroup* write_group; //所属的write_group
SequenceNumber sequence; // the sequence number to use for the first key
Status status; // status of memtable inserter
Status callback_status; // status returned by callback->Callback()
std::aligned_storage<sizeof(std::mutex)>::type state_mutex_bytes;
std::aligned_storage<sizeof(std::condition_variable)>::type state_cv_bytes;
//write_group中的链表指针
Writer* link_older; // read/write only before linking, or as leader
Writer* link_newer; // lazy, read/write only before linking, or as leader
其中state初始为STATE_INIT,其余参数通过调用者传入
writer结构中包含了writer指针link_older以及link_newer,也就是说多个writer在 write group中是以链表的形式组织,并且每个write携带者其对应batch的数据
主要API
-
bool CallbackFailed()
:当callback不为空,并且callback_status为不为OK的时候返回true,表示回调函数调用出问题 -
bool ShouldWriteToMemtable()
:当status没有问题,Callback函数调用正常以及disable_memtable为false的时候返回true -
bool ShouldWriteToWAL()
:同上,并且当disableWAL为false的时候为true -
bool CheckCallback(DB* db)
:调用callback函数并将返回的状态存储到callback_status
WriteGroup
WriteGroup是一个将多个writer统一起来的结构,类似链表的顶层结构,其中包含两个Writer指针分别为leader以及last_writer,类似链表中的头指针和尾指针
数据成员
// Writer指针
Writer* leader = nullptr;
Writer* last_writer = nullptr;
SequenceNumber last_sequence;
// before running goes to zero, status needs leader->StateMutex()
// 状态相关的变量
Status status;
std::atomic<size_t> running;
size_t size = 0;
State
state是write_thread中定义的writer的不同状态,不同状态下的writer有着不同的操作方式
enum State : uint8_t {
// The initial state of a writer. This is a Writer that is
// waiting in JoinBatchGroup. This state can be left when another
// thread informs the waiter that it has become a group leader
// (-> STATE_GROUP_LEADER), when a leader that has chosen to be
// non-parallel informs a follower that its writes have been committed
// (-> STATE_COMPLETED), or when a leader that has chosen to perform
// updates in parallel and needs this Writer to apply its batch (->
// STATE_PARALLEL_FOLLOWER).
// writer的初始状态,等待JoinBatchGroup,后续可能变成其他状态
STATE_INIT = 1,
// The state used to inform a waiting Writer that it has become the
// leader, and it should now build a write batch group. Tricky:
// this state is not used if newest_writer_ is empty when a writer
// enqueues itself, because there is no need to wait (or even to
// create the mutex and condvar used to wait) in that case. This is
// a terminal state unless the leader chooses to make this a parallel
// batch, in which case the last parallel worker to finish will move
// the leader to STATE_COMPLETED.
// 通知一个writer现在变成了leader并且他需要创建一个write batch gorup
STATE_GROUP_LEADER = 2,
// The state used to inform a waiting writer that it has become the
// leader of memtable writer group. The leader will either write
// memtable for the whole group, or launch a parallel group write
// to memtable by calling LaunchParallelMemTableWrite.
// 通知一个writer变成了一个memtable writer group的leader
// leader要么将整个group写到memtable
// 要么调用LaunchParallelMemtableWrite发起一次并行写memtable
STATE_MEMTABLE_WRITER_LEADER = 4,
// The state used to inform a waiting writer that it has become a
// parallel memtable writer. It can be the group leader who launch the
// parallel writer group, or one of the followers. The writer should then
// apply its batch to the memtable concurrently and call
// CompleteParallelMemTableWriter.
// 告知一个writer变成了一个parallel memtable writer
// writer应该将其batch同步地应用到memtable并调用CompleteParallelMemtableWriter
STATE_PARALLEL_MEMTABLE_WRITER = 8,
// A follower whose writes have been applied, or a parallel leader
// whose followers have all finished their work. This is a terminal
// state.
// 已经成功写入的writer
STATE_COMPLETED = 16,
// A state indicating that the thread may be waiting using StateMutex()
// and StateCondVar()
// 告知thread需要等待StateMutex或者StateCondVar
STATE_LOCKED_WAITING = 32,
};
WriteBatch&WriteBatchInternal
WriteBatch主要是提供kv以及cf信息的封装,WriteBatchInternal提供针对WriteBatch的相关操作接口
WriteBatchInternal::InsertInto
根据传入的数据构造MemtableInserter,然后调用WriteBatch::Iterate,传入inserter实现写操作
如果是concurrent memtbale write还需要调用inserter的PostProcess(主要是与状态信息的处理有关)
WriteBatch::Iterate
【作用】遍历batch中的所有数据,并根据数据类型进行对应的操作
遍历所有的input
首先通过ReadRecordFromWriteBatch读取待插入的所有数据(input、tag、column_family、key、value、blob、xid),其中tag区分不同的写入数据类型
根据tag确定不同写入数据的不同操作,对于普通的写tag为kTypeValue
,此处调用MemtableInserter::PutCF
MemtableInserter
PutCF&&PutCFImpl
MemtableInserter的PutCF将传入的kv数据写入memtable,PutCF函数是调用PutCFImpl函数实现
其他基本操作在Inserter中也有,主要有DeleteCF、DeleteRangeCF、MergeCF、PutBlobIndexCF、SingleDeleteCF
【PutCFImpl】
首先SeekToColumnFamily,根据传入的column_family_id,由ColumnFamilyMemtables::Seek查找对应的cf数据,如果没找到会根据WriteOptions中的ignore_missing_column_family判断是否返回错误(此时ColumnFamilySet中的Current已经定位到查找的这个cf)
获取memtable,调用cf_mems->GetMemTable(已经由Seek定位到了对应的cf)
对于非inplace_update,调用memtable::Add将数据写入即可,由Add函数将memtable写入之后会调用Memtable::UpdateFlushState由memtable自己决定是不是要更新memtable的状态为FLUSH_REQUESTED(memtable的状态的这里变化)
【MaybeAdvanceSeq】
与sequence number有关
【CheckMemtableFull】
获取当前插入的cfd,如果cfd的memtable的状态变为FLUSH_REQUESTED,则将该memtable的状态变为FLUSH_SCHEDULED并将该cfd加入flush_scheduler
相关函数过程分析
PreProcessWrite中涉及的一些过程
write_buffer_manager->ShouldFlush
首先write_buffer_manager这个功能必须是启用的
返回true的条件二者满足其一即可:
- memtable占用内存超过memtable总size的限制
- 内存使用超过了总的buffer size并且其中memtable占用了超过一半的buffer size
SwitchWAL
更换memtable并将旧的memtable加入flush队列触发flush
判断条件之一GetMaxTotalWalSize定义于db/db_impl_write.cc,返回值为:
mutable_db_options.max_total_wal_size
如果没有设置(==0),则为4 * max_total_in_memory_state_
否则为设定值
即该函数作用为WAL size超过了设定的阈值需要释放掉一部分log占用的空间,释放log按照时间顺序从最旧的开始然后遍历所有的cfd,对包含了小于最旧的log number的cfd进行flush
DB中维护了一个存放目前正在使用中的log的vector:alive_log_files_,其中每一个元素记录了一个log的number、size以及是否被flush的标志位,进行SwitchWAL的流程时首先获取alive_log_files_中的第一个元素的log作为最老的一个log——oldest_alive_log。
先忽略2pc的情况
然后遍历所有的cfd,对于包含了log_number小于等于oldest_alive_log的cfd都加入flush队列中等待flush,具体实现为对该cfd调用SwitchMemtable并将其immutable_memtable列表imm标记为请求flush状态,最后调用SchedulePendingFlush安排flush操作
最后调用MaybeScheduleFlushOrCompaction触发Flush或者Compaction
HandleWriteBufferFull
更换memtable并将旧的memtable加入flush队列触发flush
遍历所有的cfd,对所有的包含非空memtable的cfd,选择其中CreationSeq最小的cfd(理解为创建时间最久的memtable?),对该cfd调用SwitchMemtable,如果成功则将该cfd的imm列表标记为请求flush并安排一次Flush(SchedulePendingFlush
)同时触发尝试flush或者compaction任务(MaybeScgheduleFlushOrCompaction
)
ScheduleFlush
对所有在FlushScheduler中的cfd调用SwitchMemtable并Unref(如果Unref之后引用数为0则delete掉该cfd)
涉及到的相关函数分析
SchedulePendingFlush
将传入的cfd加入到flush_queue里(flush_queue是一个deque)
SwitchMemtable
SwitchMemtable的主要是为一个ColumnFamily更换Memtable同时新建一个WAL的过程
【处理log】
进入函数首先不考虑two_write_queue以及pipeline write的情况下,首先判断是否能够循环使用log,如果是则从log_recycle_files队列中pop一个出来作为新的log使用
在需要创建新的log file的情况下调用VersionSet::NewFileNumber分配一个新的log number并创建的新的WritableFile,而对于recycle的情况,则是新建ReuseWritableFile。然后通过新的WritableFile创建新的log::Writer
获取当前的seq,并以这个seq调用cfd的ConstructNewMemtable创建新的memtable,同时通过context创建新的SuperVersion
将新的log number添加到alive_log_files
遍历所有的cfd,对于包含memtable还没使用以及imm列表中没有flush的imm为0的cfd,更新他们的log number以及memtable的seq(为什么别的cfd不更新log?)
将新的memtable替换到cfd内同时将旧的加入到imm列表
最后调用InstallSuperVersionAndScheduleWork
,该函数构建新的SuperVersion并替换,同时触发对当前cfd的flush以及compaction操作
WriteToWAL
获取参数:
WriteGroup& write_group
log:Writer* log_writer
uint64_t* log_used
bool need_log_sync
bool need_log_dir_sync
SequenceNumber sequence
主要流程:
外部调用的WriteToWAL函数主要做的是对写WAL这个过程的封装,比如预处理以及后续清理工作之类的,真正完成写WAL的函数在另一个WriteToWAL函数重载里面,另一个函数接收一个merged_batch结构,这个merged_batch就包含了write_group所有的batch的数据。
在外部的WriteToWAL,第一项任务就是生成merged_batch,这里通过函数MergeBatch实现,MergeBatch的操作逻辑十分简单,如果write_group中只有一个writer,则merged_batch就是这个leader writer,否则遍历所有的writer,将其batch追加到tmp_batch中,最后merged_batch即为tmp_batch。所以当这个函数返回的时候如果merged_batch == write_group的leader,则说明只有一个batch,只需要设置一个batch的log_number,否则就需要遍历设置所有writer的log_number。
接下来就是调用真正的WriteToWAL,传入merged_batch以及log_writer,实际写入数据的操作是通过log_writer的AddRecord接口实现。
写入完成之后,根据设置的need_log_sync以及need_log_dir_sync参数判断是否对本次write进行sync操作
最后记录状态即完成(退出前tmp_batch手动清理)
其他put选项设置以及实现原理
pipeline write
【默认情况】
单一的write thread队列,队首writer成为leader,并负责写WAL以及memtable
【pipeline write】
只有一个writer的情况下,要先写WAL,再写memtbale
如果有多个writer,默认情况就需要先写完WAL,在写memtable
启用pipeline之后,前一个writer写完WAL就可以写memtbale,而后一个writer开始写他的WAL
开启方式:Options.enable_pipeline_write=true
提升:20%性能提升