RocksDB——线程管理

RocksDB——线程管理

调用入口:

env::Schedule() -> ThreadPoolImpl::Schedule()

env初始化的时候会构造两个线程池(ThreadPoolImpl),并分别赋予LOW和HIGH的优先度

ThreadPool Interface

源文件:include/rocksdb/threadpool.h

Interface Defination Function
void JoinAllThreads() 等待所有线程完成并丢弃没有开始执行的线程
void SetBackgroundThreads(int num) 设置线程池的线程数
int GetBackgroundThreads() 获取线程池的线程数
unsigned int GetQueueLen() 获取线程池中队列中被schedule的任务数
void WaitForJobsAndJoinAllThreads() 等待所有的任务完成
void SubmitJob(const std::function<void()>&)
void SubmitJob(std::function<void()>&&)
ThreadPool* NewThreadPool(int num_threads) 共外部调用创建一个线程池

ThreadPoolImpl

源文件:util/threadpool_imp.h, util/threadpool_imp.cc

ThreadPoolImpl继承自定义接口的ThreadPool基本类,同时增加了一些新的接口,但是ThreadPoolImpl并不直接实现各个函数的功能,而是由其包含的Impl结构体再去具体定义每个功能的实现。

Additional Interface

Interaface Defination Function
void LowerIOPriority() 使线程运行在一个低内核IO优先级下
void LowerCPUPriority() 使线程运行在一个低内核CPU优先级下
void IncBackgroundThreadsIfNeeded(int num) 保证线程池中至少有num个线程
void Schedule(void (*function)(void* arg1), void* arg, void* tag, void (*unschedFunction)(void* arg)) 安排一个任务,unscheduleFunction作用是取消一个已经安排但是还未执行的任务
int UnSchedule(void* tag) 将指定任务从队列中移除
void SetHostEnv(Env* env)
Env* GetHostEnv()
Env::Priority GetThreadPriority() 返回线程的优先度
void SetThreadPriority(Env::Priority priority) 设置线程的优先度

Impl的实现

源文件:util/threadpool_imp.cc

ThreadPoolImpl的真正实现是封装到Impl结构中的,所以主要接口也和ThreadPoolImpl基本一致,数据成员增加了不少状态相关的变量,除了几个表示线程状态的bool类型,其余包含存放被schedule的任务的队列queue_,实现上使用的是std::deque;信号量bgsignal_,使用了std::condition_variable;所有的线程存在一个名为bgthreads_的vector中。线程由port封装

任务定义为BGItem(与LevelDB一致),定义为:

 struct BGItem {
    void* tag = nullptr;
    std::function<void()> function;
    std::function<void()> unschedFunction;
  };

每个BGItem代表一个schedule的任务,BGItem保存在queue_中

初始化

Impl初始化的priority为LOW

如何schedule一个任务

首先由外部调用ThreadPoolImpl的Schedule接口,传入任务执行函数,对应参数,unschedFunction以及tag,ThreadPoolImpl::Schedule对这些参数进行封装然后调用Impl的Submit函数提交任务。当unschedFunction为空的时候,Schedule会传入一个空的std::Function

Impl接收到被提交的任务之后使用传入的参数构造BGItem,并添加到queue_中。

每次进入Submit函数都会首先调用一次StartBGThreads,StartBGThread中会循环向bgthreads_中添加thread直到达到total_threads_limit_(所以应该是首次调用才有效,后面调用也许会直接退出)

  while ((int)bgthreads_.size() < total_threads_limit_) {
    port::Thread p_t(&BGThreadWrapper,
    new BGThreadMetadata(this, bgthreads_.size()));
    bgthreads_.push_back(std::move(p_t));
  }

添加完BGItem之后根据当前bgthreads_中的线程数,如果没有超过线程数限制(total_threads_limit_)则只唤醒一个线程,否则需要唤醒所有线程(这样多余的线程才能被detach掉)

任务是怎样开始执行的

在StartBGThreads函数构造BGItem的过程中,会传入BGThreadWrapper函数

BGThreadWrapper函数在完成一系列关于thread属性的设置之后调用BGThread函数,传入参数thread_id

BGThread函数应该是整个线程的主体,函数包含一个while-true循环不停地执行从queue中取出任务并执行的过程

BGThread主体:

如果没有exit_all_threads_信号,不是最后一个超过限制的线程,以及满足任务队列为空或者当前线程超过了线程数量限制的线程的时候,等待

while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) &&
           (queue_.empty() || IsExcessiveThread(thread_id))) {
    bgsignal_.wait(lock);
}

如果收到了exit_all_thread信号,在不需要等待全部任务完成的时候直接退出或者当任务队列清空之后再退出

if (exit_all_threads_) {  // mechanism to let BG threads exit safely
   if(!wait_for_jobs_to_complete_ || queue_.empty()) {
      break;
   }
}

如果当前线程是最新生成的一个超出数量限制的线程,则将该线程从bgthreads_中pop出来并detach,如果此时仍旧有超出限制的线程数,则唤醒所有的线程重复该过程(超出部分的线程就会被detach掉,以此来维护整个线程池中线程数量的稳定)

if (IsLastExcessiveThread(thread_id)) {
      // Current thread is the last generated one and is excessive.
      // We always terminate excessive thread in the reverse order of
      // generation time.
      auto& terminating_thread = bgthreads_.back();
      terminating_thread.detach();
      bgthreads_.pop_back();

      if (HasExcessiveThread()) {
        // There is still at least more excessive thread to terminate.
        WakeUpAllThreads();
      }
      break;
}

获取BGItem中的function,并且调用这个function

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 229,362评论 6 537
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 99,013评论 3 423
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 177,346评论 0 382
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 63,421评论 1 316
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 72,146评论 6 410
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 55,534评论 1 325
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 43,585评论 3 444
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 42,767评论 0 289
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 49,318评论 1 335
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 41,074评论 3 356
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 43,258评论 1 371
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 38,828评论 5 362
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 44,486评论 3 347
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 34,916评论 0 28
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 36,156评论 1 290
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 51,993评论 3 395
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 48,234评论 2 375