比特币源码分析:任务调度器的使用

任务调度器

Bitcoin 进程启动后,有一个专门的线程做任务调度, 这些任务根据指定的时刻,执行对应的函数:

bool AppInitMain()
{
   .......
   // Start the lightweight task scheduler thread
    CScheduler::Function serviceLoop = boost::bind(&CScheduler::serviceQueue, &scheduler);
    threadGroup.create_thread(boost::bind(&TraceThread<CScheduler::Function>, "scheduler", serviceLoop));
   .......
}

调度器类主要是实现了一个生产者消费者的任务队列,只是这个任务队列是用 std::multimap 实现的,map 的key表达某一时刻,map的值表达:那一时刻要执行的函数,内部使用条件变量和锁来保护multimap ,还有几个bool 条件:

class CScheduler
{
public:
    CScheduler();
    ~CScheduler();
    
    typedef std::function<void(void)> Function;
    
    void schedule(Function f, boost::chrono::system_clock::time_point t=boost::chrono::system_clock::now());
    void scheduleFromNow(Function f, int64_t deltaMilliSeconds);
    void scheduleEvery(Function f, int64_t deltaMilliSeconds);
    void serviceQueue();
    void stop(bool drain=false);
    size_t getQueueInfo(boost::chrono::system_clock::time_point &first,
                        boost::chrono::system_clock::time_point &last) const;
    bool AreThreadsServicingQueue() const;

private:
    std::multimap<boost::chrono::system_clock::time_point, Function> taskQueue;
    boost::condition_variable newTaskScheduled;
    mutable boost::mutex newTaskMutex;
    int nThreadsServicingQueue;
    bool stopRequested;
    bool stopWhenEmpty;
    bool shouldStop() const { return stopRequested || (stopWhenEmpty && taskQueue.empty()); }
};

CScheduler的client 通过调用schedule 往内部multimap添加一个条目;
scheduleFromNow 和scheduleEvery 内部都是调用schedule 方法实现;
这三个方法属于生产者要生产任务的方法, 任务的消费者调用serviceQueue等待取走任务, 然后执行。
目前整个程序有一个全局的CScheduler实例:

  static CScheduler scheduler;

这个实例对应只有一个消费者线程, 即唯一的后台调度器线程。
class SingleThreadedSchedulerClient 主要用途是,借助CScheduler类型,保障被添加到内部链表的任务,被串行执行:

class SingleThreadedSchedulerClient {
private:
    CScheduler *m_pscheduler;

    CCriticalSection m_cs_callbacks_pending;
    std::list<std::function<void (void)>> m_callbacks_pending;
    bool m_are_callbacks_running = false;
    void MaybeScheduleProcessQueue();
    void ProcessQueue();
    
public:
    explicit SingleThreadedSchedulerClient(CScheduler *pschedulerIn) : m_pscheduler(pschedulerIn) {}
    void AddToProcessQueue(std::function<void (void)> func);
    void EmptyQueue();
    size_t CallbacksPending();
};

使用例子

基本的使用例子:

#include <scheduler.h>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <boost/test/unit_test.hpp>
#include <iostream>

static void doN(){
    std::cout << "output now\n";
}
static void doE(){
    for(int i = 0; i < 10; i++){
        std::cout << "i = " << i << '\n';
    }
    std::cout << '\n';
}

BOOST_AUTO_TEST_SUITE(sche_tests)
BOOST_AUTO_TEST_CASE(sche)
{
    CScheduler s;
    s.scheduleFromNow(doN, 1000); 
    s.scheduleEvery(doE, 1000); 
    boost::thread t(boost::bind(&CScheduler::serviceQueue, &s));
    boost::this_thread::sleep_for(boost::chrono::seconds{5});
    t.interrupt();
    t.join();
}

BOOST_AUTO_TEST_CASE(singlethread)
{
    CScheduler s;
    SingleThreadedSchedulerClient  sc (&s);
    for(int i = 1; i <11; i++){
        auto  f = [=]{
            std::cout << "thread " << boost::this_thread::get_id() << " print arg: " << i << '\n';
        };
            sc.AddToProcessQueue(f);
    }
    boost::thread t(boost::bind(&CScheduler::serviceQueue, &s));
    boost::this_thread::sleep_for(boost::chrono::seconds{1});
    t.interrupt();
    t.join();
}
BOOST_AUTO_TEST_SUITE_END()

进程启动后, 全局对象连接管理器connman初始化后, connman 的Start 方法最后,通过scheduler 线程安排了一个定时任务: 每隔15分钟, 把connman 对象内部成员,banmap_t 类型的 setBanned, CAddrMan 类型的addrman 序列化到本地文件banlist.datpeers.dat

//init.cpp
if (!connman.Start(scheduler, connOptions)) {
        return false;
}
//net.cpp
bool CConnman::Start(CScheduler& scheduler, const Options& connOptions)
{
    ...............
    scheduler.scheduleEvery(std::bind(&CConnman::DumpData, this), DUMP_ADDRESSES_INTERVAL * 1000);
}

如果钱包功能编译使能, 会让scheduler 线程安排每隔500毫秒刷新钱包状态。

//init.cpp 
#ifdef ENABLE_WALLET
    StartWallets(scheduler);
#endif

//wallet/init.cpp 
void StartWallets(CScheduler& scheduler) {
    for (CWalletRef pwallet : vpwallets) {
        pwallet->postInitProcess(scheduler);
    }
}

//wallet/wallet.cpp 
void CWallet::postInitProcess(CScheduler& scheduler)
{
    ReacceptWalletTransactions();
    if (!CWallet::fFlushScheduled.exchange(true)) {
        scheduler.scheduleEvery(MaybeCompactWalletDB, 500);
    }
}

PeerLogicValidation 对象的构造函数内部, scheduler 线程安排每45秒执行CheckForStaleTipAndEvictPeer函数主要做两件事:

  1. 关掉多余的外出tcp 连接
  2. 根据当前时间,检查当前节点的blockchain 的tip 是否有可能过时了,建立额外的连接同步跟上
PeerLogicValidation::PeerLogicValidation(CConnman* connmanIn, CScheduler &scheduler) : connman(connmanIn), m_stale_tip_check_time(0) {
    // Initialize global variables that cannot be constructed at startup.
    recentRejects.reset(new CRollingBloomFilter(120000, 0.000001));

    const Consensus::Params& consensusParams = Params().GetConsensus();
    // Stale tip checking and peer eviction are on two different timers, but we
    // don't want them to get out of sync due to drift in the scheduler, so we
    // combine them in one function and schedule at the quicker (peer-eviction)
    // timer.
    static_assert(EXTRA_PEER_CHECK_INTERVAL < STALE_CHECK_INTERVAL, "peer eviction timer should be less than stale tip check timer");
    scheduler.scheduleEvery(std::bind(&PeerLogicValidation::CheckForStaleTipAndEvictPeers, this, consensusParams), EXTRA_PEER_CHECK_INTERVAL * 1000);
}

void PeerLogicValidation::CheckForStaleTipAndEvictPeers(const Consensus::Params &consensusParams)
{
    if (connman == nullptr) return;

    int64_t time_in_seconds = GetTime();

    EvictExtraOutboundPeers(time_in_seconds);

    if (time_in_seconds > m_stale_tip_check_time) {
        LOCK(cs_main);
        // Check whether our tip is stale, and if so, allow using an extra
        // outbound peer
        if (TipMayBeStale(consensusParams)) {
            LogPrintf("Potential stale tip detected, will try using extra outbound peer (last tip update: %d seconds ago)\n", time_in_seconds - g_last_tip_update);
            connman->SetTryNewOutboundPeer(true);
        } else if (connman->GetTryNewOutboundPeer()) {
            connman->SetTryNewOutboundPeer(false);
        }
        m_stale_tip_check_time = time_in_seconds + STALE_CHECK_INTERVAL;
    }
}

以上就是bitoin 里面CScheduler类的主要使用场景。


本文由 Copernicus团队 喻建 编写,转载无需授权!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,290评论 6 491
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,107评论 2 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,872评论 0 347
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,415评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,453评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,784评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,927评论 3 406
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,691评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,137评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,472评论 2 326
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,622评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,289评论 4 329
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,887评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,741评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,977评论 1 265
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,316评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,490评论 2 348

推荐阅读更多精彩内容