





std::vector<TimerObj> timer_heap_;  // the timer implementation: TimerObj entries held in a vector

// Scheduler main loop (excerpt): keeps iterating while run_forever_ is set or
// runtime_.IsAllDone() is false, polling epoll and then servicing timers.
245 bool UThreadEpollScheduler::Run() {
246     //more code...
250     int next_timeout = timer_.GetNextTimeout();
252     for (; (run_forever_) || (!runtime_.IsAllDone());) {
253         int nfds = epoll_wait(epoll_fd_, events, max_task_, 4);// always waits with a fixed 4 ms timeout; next_timeout is not passed here
254         //more code...
285             DealwithTimeout(next_timeout); // handle timed-out tasks
295     } //more code
296     return true;
297 }

// Drains the timer: as long as timer_.GetNextTimeout() returns 0, pops the
// timed-out socket, flags it as timed out and resumes its uthread.
// next_timeout (out): left holding the first non-zero value returned by
// timer_.GetNextTimeout().
315 void UThreadEpollScheduler::DealwithTimeout(int &next_timeout) {
316     while (true) {
317         next_timeout = timer_.GetNextTimeout();
318         if (0 != next_timeout) {  // non-zero is treated as "nothing due right now": stop draining
319             break;
320         }
322         UThreadSocket_t * socket = timer_.PopTimeout();
323         socket->waited_events = UThreadEpollREvent_Timeout;  // record that the wake-up reason is a timeout
324         runtime_.Resume(socket->uthread_id);  // switch back into the uthread that was waiting
325     }
326 }

// Suspends the current uthread on `socket` for up to timeout_ms by registering
// a timer with the socket's scheduler and yielding.
533 void UThreadWait(UThreadSocket_t &socket, const int timeout_ms) {
534     socket.uthread_id = socket.scheduler->GetCurrUThread();  // remember which uthread to resume
535     socket.scheduler->AddTimer(&socket, timeout_ms);// register the timer
536     socket.scheduler->YieldTask(); // switch out of this uthread
537     socket.scheduler->RemoveTimer(socket.timer_id);// after being switched back in, remove the timer
538 }








 // Timer state: a "near" array of lists plus four further levels of lists,
 // protected by a spin lock.
 46 struct timer {
 47     struct link_list near[TIME_NEAR];  // nodes that are about to be processed
 48     struct link_list t[4][TIME_LEVEL];  // further-out nodes, layered by how far away their expiry is
 49     struct spinlock lock;
 50     uint32_t time;  // accumulated tick count, in units of 10 ms
 51     uint32_t starttime;
 52     uint64_t current;
 53     uint64_t current_point;
 54 };

 // Signals m->cond when the sleeping-worker counter (m->sleep) is at least
 // m->count - busy; otherwise does nothing.
 // NOTE(review): m->count is presumably the total number of worker threads — confirm.
 55 static void
 56 wakeup(struct monitor *m, int busy) {
 57     if (m->sleep >= m->count - busy) {
 58         // signal sleep worker, "spurious wakeup" is harmless
 59         pthread_cond_signal(&m->cond);
 60     }
 61 }

 // Socket thread entry (excerpt): polls sockets forever and nudges the workers
 // after each poll.
 63 static void *
 64 thread_socket(void *p) {
 65     struct monitor * m = p;
 66     skynet_initthread(THREAD_SOCKET);
 67     for (;;) {
 68         int r = skynet_socket_poll();
 69         //more code ...
 75         wakeup(m,0); // a message came in: wake a worker thread to process it (busy == 0, so this signals only when sleep >= count)
 76     }
 77     return NULL;
 78 }

// Timer thread entry (excerpt): updates the time, wakes a worker, then sleeps,
// in an endless loop.
128 static void *
129 thread_timer(void *p) {
130     struct monitor * m = p;
131     skynet_initthread(THREAD_TIMER);
132     for (;;) {
133         skynet_updatetime();
134         CHECK_ABORT
135         wakeup(m,m->count-1);  // wake one worker thread (busy == count-1, so this signals as soon as sleep >= 1)
136         usleep(2500); // unconditional 2.5 ms sleep (usleep takes microseconds)
137         //more code ...
141     }
142     //more code ...
149     return NULL;
150 }

// Worker thread entry (excerpt): dispatches messages until m->quit is set; when
// dispatch returns NULL the worker sleeps on m->cond until it is signalled.
152 static void *
153 thread_worker(void *p) {
154     //more code ...
161     while (!m->quit) {
162         q = skynet_context_message_dispatch(sm, q, weight);
163         if (q == NULL) {
164             if (pthread_mutex_lock(&m->mutex) == 0) { // only sleep if the lock was acquired
165                 ++ m->sleep;  // counted as sleeping for the duration of the wait; wakeup() reads this counter
166                 // "spurious wakeup" is harmless,
167                 // because skynet_context_message_dispatch() can be call at any time.
168                 if (!m->quit)
169                     pthread_cond_wait(&m->cond, &m->mutex);// no message: block until signalled
170                 -- m->sleep;
171                 if (pthread_mutex_unlock(&m->mutex)) {
173                     exit(1);  // failing to unlock is treated as fatal
174                 }
175             }
176         }
177     }
178     return NULL;
179 }

 // Allocates a timer_node followed by sz bytes of payload, copies arg into that
 // payload, and inserts the node with expire = time + T->time while holding
 // T's spin lock.
 95 static void
 96 timer_add(struct timer *T,void *arg,size_t sz,int time) {
 97     struct timer_node *node = (struct timer_node *)skynet_malloc(sizeof(*node)+sz);
 98     memcpy(node+1,arg,sz);  // payload lives immediately after the node header
100     SPIN_LOCK(T);
102         node->expire=time+T->time;  // expiry tick = requested delay + current tick count
103         add_node(T,node);
105     SPIN_UNLOCK(T);
106 }


brpc timer的实现


 // Simulated futex: a pthread mutex/condition-variable pair plus two counters,
 // used by futex_wait_private / futex_wake_private.
 29 class SimuFutex {
 30 public:
 31     SimuFutex() : counts(0)
 32                 , ref(0) {
 33         pthread_mutex_init(&lock, NULL);
 34         pthread_cond_init(&cond, NULL);
 35     }
 36     ~SimuFutex() {
 37         pthread_mutex_destroy(&lock);
 38         pthread_cond_destroy(&cond);
 39     }
 41 public:     
 42     pthread_mutex_t lock;
 43     pthread_cond_t cond;
 44     int32_t counts;  // number of threads currently waiting
 45     int32_t ref; // reference count
 46 };

 48 static pthread_mutex_t s_futex_map_mutex = PTHREAD_MUTEX_INITIALIZER;  // locked around every lookup/erase on s_futex_map
 49 static pthread_once_t init_futex_map_once = PTHREAD_ONCE_INIT;  // pthread_once control used with InitFutexMap
 50 static std::unordered_map<void*, SimuFutex>* s_futex_map = NULL;  // address -> SimuFutex for that address

 // Simulated futex wait on addr1.
 // Blocks the caller on the SimuFutex registered for addr1 as long as the int
 // stored at addr1 still equals `expected` when checked under the futex lock.
 //   addr1    - address used as the map key; read as butil::atomic<int>.
 //   expected - value *addr1 must hold for the wait to start.
 //   timeout  - relative timeout, or NULL to wait without a time limit.
 // Returns 0 when the condition wait returns successfully. Returns -1 with
 // errno = EAGAIN when *addr1 != expected, or -1 with errno set to the error
 // code returned by pthread_cond_wait / pthread_cond_timedwait.
 // Exits the process if the one-time map initialisation fails.
 60 int futex_wait_private(void* addr1, int expected, const timespec* timeout) { // wait
 61     if (pthread_once(&init_futex_map_once, InitFutexMap) != 0) {
 63         exit(1);
 64     }
 65     std::unique_lock<pthread_mutex_t> mu(s_futex_map_mutex);
 66     SimuFutex& simu_futex = (*s_futex_map)[addr1];  // creates the entry on first use
 67     ++simu_futex.ref;  // taken under the map lock so the entry is not erased while in use
 68     mu.unlock();
 70     int rc = 0;
 71     {
 72         std::unique_lock<pthread_mutex_t> mu1(simu_futex.lock);
 73         if (static_cast<butil::atomic<int>*>(addr1)->load() == expected) {
 74             ++simu_futex.counts;  // register as a waiter; futex_wake_private reads this
 75             if (timeout) {
 76                 timespec timeout_abs = butil::timespec_from_now(*timeout);  // pthread_cond_timedwait needs an absolute time
 77                 if ((rc = pthread_cond_timedwait(&simu_futex.cond, &simu_futex.lock, &timeout_abs)) != 0) {
 78                     errno = rc;
 79                     rc = -1;
 80                 }
 81             } else {
 82                 if ((rc = pthread_cond_wait(&simu_futex.cond, &simu_futex.lock)) != 0) {
 83                     errno = rc;
 84                     rc = -1;
 85                 }
 86             }
 87             --simu_futex.counts;
 88         } else {
 89             errno = EAGAIN;  // value already changed: do not block
 90             rc = -1;
 91         }
 92     }
 94     std::unique_lock<pthread_mutex_t> mu1(s_futex_map_mutex);
 95     if (--simu_futex.ref == 0) {
 96         s_futex_map->erase(addr1);  // last user removes the entry
 97     }
 98     mu1.unlock();
 99     return rc;
100 }

// Simulated futex wake on addr1.
// Signals the condition variable of the SimuFutex registered for addr1 up to
// min(nwake, number of registered waiters) times.
// Returns the number of successful pthread_cond_signal calls; 0 when no
// SimuFutex exists for addr1. On a signal failure errno is set to the error
// code and the loop stops. Exits the process if the one-time map
// initialisation fails.
102 int futex_wake_private(void* addr1, int nwake) { // wake
103     if (pthread_once(&init_futex_map_once, InitFutexMap) != 0) {
105         exit(1);
106     }
107     std::unique_lock<pthread_mutex_t> mu(s_futex_map_mutex);
108     auto it = s_futex_map->find(addr1);
109     if (it == s_futex_map->end()) {
110         mu.unlock();
111         return 0;  // no entry for this address, so there is nothing to wake
112     }
113     SimuFutex& simu_futex = it->second;
114     ++simu_futex.ref;  // taken under the map lock so the entry is not erased while in use
115     mu.unlock();
117     int nwakedup = 0;
118     int rc = 0;
119     {
120         std::unique_lock<pthread_mutex_t> mu1(simu_futex.lock);
121         nwake = (nwake < simu_futex.counts)? nwake: simu_futex.counts;  // never signal more times than there are waiters
122         for (int i = 0; i < nwake; ++i) {
123             if ((rc = pthread_cond_signal(&simu_futex.cond)) != 0) {
124                 errno = rc;
125                 break;
126             } else {
127                 ++nwakedup;
128             }
129         }
130     }
132     std::unique_lock<pthread_mutex_t> mu2(s_futex_map_mutex);
133     if (--simu_futex.ref == 0) {
134         s_futex_map->erase(addr1);  // last user removes the entry
135     }
136     mu2.unlock();
137     return nwakedup;
138 }

引用链接中的一段话:“For reference, on my 4.0 SELinux test server with support for syscall auditing enabled, the minimum latency between FUTEX_WAKE to returning from FUTEX_WAIT is 2.7 usec, and the average is more like 10 usec. That can be a big drag on RockDB’s single-writer design.”不过貌似 mutex 的实现也差不多:先看能不能获取锁,能的话直接返回。如下面的结构:

struct mutex {
 1: 锁可以利用。 
 atomic_t  count;
 spinlock_t  wait_lock;
 struct list_head wait_list;

linux 2.6 互斥锁的实现-源码分析


 // A single scheduled timer task: the callback, its argument, its run time and
 // a version counter used to tell whether it was unscheduled.
 41 struct BAIDU_CACHELINE_ALIGNMENT TimerThread::Task {
 42     Task* next;                 // For linking tasks in a Bucket.
 43     int64_t run_time;           // run the task at this realtime
 44     void (*fn)(void*);          // the fn(arg) to run
 45     void* arg;                  // argument passed to fn
 46     // Current TaskId, checked against version in TimerThread::run to test
 47     // if this task is unscheduled.
 48     TaskId task_id;
 49     // initial_version:     not run yet
 50     // initial_version + 1: running
 51     // initial_version + 2: removed (also the version of next Task reused
 52     //                      this struct)
 53     butil::atomic<uint32_t> version;
 55     Task() : version(2/*skip 0*/) {}  // versions start at 2 so that 0 is never used
 57     // Run this task and delete this struct.
 58     // Returns true if fn(arg) did run.
 59     bool run_and_delete();
 61     // Delete this struct if this task was unscheduled.
 62     // Returns true on deletion.
 63     bool try_delete();
 64 };

 66 // Timer tasks are sharded into different Buckets to reduce contentions.
 67 class BAIDU_CACHELINE_ALIGNMENT TimerThread::Bucket {
 68 public:
 69     Bucket()
 70         : _nearest_run_time(std::numeric_limits<int64_t>::max())
 71         , _task_head(NULL) {
 72     }
 74     ~Bucket() {}
 // Result of schedule(): the id of the new task and whether its run time was
 // earlier than this bucket's previous nearest run time.
 76     struct ScheduleResult {
 77         TimerThread::TaskId task_id;
 78         bool earlier;
 79     };
 81     // Schedule a task into this bucket.
 82     // Returns the TaskId and if it has the nearest run time.
 83     ScheduleResult schedule(void (*fn)(void*), void* arg,
 84                             const timespec& abstime);
 86     // Pull all scheduled tasks.
 87     // This function is called in timer thread.
 88     Task* consume_tasks();
 90 private:
 91     internal::FastPthreadMutex _mutex;   // held by schedule() while it updates the two fields below
 92     int64_t _nearest_run_time;           // smallest run_time seen by schedule(); starts at int64 max
 93     Task* _task_head;                    // head of the linked list of tasks; schedule() pushes at the head
 94 };
// Creates a Task for fn(arg) at abstime and pushes it onto this bucket's list.
// Returns { task id, earlier } where `earlier` is true when the new task's run
// time is smaller than the bucket's previous _nearest_run_time. Returns
// { INVALID_TASK_ID, false } when no Task could be obtained.
180 TimerThread::Bucket::ScheduleResult
181 TimerThread::Bucket::schedule(void (*fn)(void*), void* arg,
182                               const timespec& abstime) {
183     butil::ResourceId<Task> slot_id;
184     Task* task = butil::get_resource<Task>(&slot_id);
185     if (task == NULL) {
186         ScheduleResult result = { INVALID_TASK_ID, false };
187         return result;
188     }
189     task->next = NULL;
190     task->fn = fn;
191     task->arg = arg;
192     task->run_time = butil::timespec_to_microseconds(abstime);
193     uint32_t version = task->version.load(butil::memory_order_relaxed);
194     if (version == 0) {  // skip 0.
195         task->version.fetch_add(2, butil::memory_order_relaxed);
196         version = 2;
197     }
198     const TaskId id = make_task_id(slot_id, version);  // the id combines the slot and the version
199     task->task_id = id;
200     bool earlier = false;
201     {
202         BAIDU_SCOPED_LOCK(_mutex);
203         task->next = _task_head;  // push at the head of the bucket's list
204         _task_head = task;
205         if (task->run_time < _nearest_run_time) {
206             _nearest_run_time = task->run_time;
207             earlier = true;
208         }
209     }
210     ScheduleResult result = { id, earlier };
211     return result;
212 }


// Schedules fn(arg) to run at abstime.
// Returns the id produced by the chosen bucket, or INVALID_TASK_ID when _stop
// is set or the thread has not been started. If the new task is earlier than
// both the bucket's and the global nearest run time, bumps _nsignals and wakes
// the waiter on &_nsignals.
214 TimerThread::TaskId TimerThread::schedule(
215     void (*fn)(void*), void* arg, const timespec& abstime) {
216     if (_stop.load(butil::memory_order_relaxed) || !_started) {
217         // Not add tasks when TimerThread is about to stop.
218         return INVALID_TASK_ID;
219     }
220     // Hashing by pthread id is better for cache locality.
221     const Bucket::ScheduleResult result =
222         _buckets[butil::fmix64(pthread_numeric_id()) % _options.num_buckets]
223         .schedule(fn, arg, abstime);// hash into one of the buckets
224     if (result.earlier) { // the new timer is earlier than anything already in that bucket
225         bool earlier = false;
226         const int64_t run_time = butil::timespec_to_microseconds(abstime);
227         {
228             BAIDU_SCOPED_LOCK(_mutex);
229             if (run_time < _nearest_run_time) {// compare against the global nearest run time
230                 _nearest_run_time = run_time;
231                 ++_nsignals;
232                 earlier = true; // a wake-up is needed
233             }
234         }
235         if (earlier) {
236             futex_wake_private(&_nsignals, 1);
237         }
238     }
239     return result.task_id;
240 }



// Timer thread main loop (excerpt). Each pass: resets the global nearest run
// time, pulls tasks from every bucket into a local min-heap, runs the tasks
// that are due, then waits on _nsignals until the next task is due or a wake-up
// arrives.
310 void TimerThread::run() {
319     // min heap of tasks (ordered by run_time)
320     std::vector<Task*> tasks;
321     tasks.reserve(4096);

339     while (!_stop.load(butil::memory_order_relaxed)) {
343         {
344             BAIDU_SCOPED_LOCK(_mutex);
345             _nearest_run_time = std::numeric_limits<int64_t>::max();// before this statement, _nearest_run_time held the earliest run time of all tasks
346         }
348         // Walk every bucket and collect the timer tasks that have not been unscheduled
349         for (size_t i = 0; i < _options.num_buckets; ++i) {
350             Bucket& bucket = _buckets[i];
351             for (Task* p = bucket.consume_tasks(); p != NULL;
352                  p = p->next, ++nscheduled) {
353                 if (!p->try_delete()) { // remove the task if it's unscheduled
354                     tasks.push_back(p);
355                     std::push_heap(tasks.begin(), tasks.end(), task_greater);
356                 }
357             }
358         }// the collected tasks are kept as a heap

360         bool pull_again = false;
361         while (!tasks.empty()) {
362             Task* task1 = tasks[0];  // the about-to-run task
363             if (task1->try_delete()) { // already unscheduled
364                 std::pop_heap(tasks.begin(), tasks.end(), task_greater);
365                 tasks.pop_back();
366                 continue;
367             }
368             if (butil::gettimeofday_us() < task1->run_time) {  // not ready yet.
369                 break;
370             }
381             {
382                 BAIDU_SCOPED_LOCK(_mutex);
383                 if (task1->run_time > _nearest_run_time) {
384                     // a task is earlier than task1. We need to check buckets.
385                     pull_again = true; // an earlier task arrived while tasks were being run, so the buckets are walked again
386                     break;
387                 }
388             }
389             std::pop_heap(tasks.begin(), tasks.end(), task_greater);
390             tasks.pop_back();
391             if (task1->run_and_delete()) { // run the timer task
393             }
394         }
395         if (pull_again) {
397             continue;
398         }
400         // Work out how long to sleep
401         int64_t next_run_time = std::numeric_limits<int64_t>::max();
402         if (tasks.empty()) {
403             next_run_time = std::numeric_limits<int64_t>::max();
404         } else {
405             next_run_time = tasks[0]->run_time;
406         }
411         int expected_nsignals = 0;
412         {
413             BAIDU_SCOPED_LOCK(_mutex);
414             if (next_run_time > _nearest_run_time) {// the global nearest time changed, so start the pass over
415                 // a task is earlier that what we would wait for.
416                 // We need to check buckets.
417                 continue;
418             } else {
419                 _nearest_run_time = next_run_time;
420                 expected_nsignals = _nsignals;  // snapshot taken under the lock; the wait below only blocks if it is unchanged
421             }
422         }
423         timespec* ptimeout = NULL;  // NULL means wait without a time limit
424         timespec next_timeout = { 0, 0 };
425         const int64_t now = butil::gettimeofday_us();
426         if (next_run_time != std::numeric_limits<int64_t>::max()) {
427             next_timeout = butil::microseconds_to_timespec(next_run_time - now);
428             ptimeout = &next_timeout;
429         }
431         futex_wait_private(&_nsignals, expected_nsignals, ptimeout);
433     }
435 }



rocksdb源码分析 写优化之JoinBatchGroup
Linux Futex浅析

