最近在研究定时任务,典型的定时任务有ScheduledExecutorService,spring的quartz,下面简单说说ScheduledExecutorService,它用了一个队列,建立新任务时会把任务放入队列中,并且会根据下一次要执行的任务的时间对这个队列的顺序再进行调整,因此队列最前端的总是最近一次的需要执行的任务,在执行完一次任务后为了防止空转,它会调用一个
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
方法,并最终调用
LockSupport.parkNanos(Object blocker, long nanos);
方法。
我以前没有仔细的深入这一块,总是觉得只要是jdk提供的方法那就是理所当然的,而现在当我想到,如果是我自己来实现一个定时的话,应该怎么实现,可能会遇到什么问题时,我才发现,并不是那么的简单。核心的一点就是,这个方法的开销如何才能尽量的小,而最后被唤醒的时间又如何精确呢?最后还有个更重要的问题就是,怎么应对JVM的GC带来的时间差,会不会出现线程安全问题?
于是我继续研究他们的内部实现
public static void parkNanos(Object blocker, long nanos) {
if (nanos > 0) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, nanos);
setBlocker(t, null);
}
}
parkNanos调用的是UNSAFE.park(false,nanos);方法
进一步可以看到这个方法的定义是
public native void park(boolean isAbsolute, long time);
其中isAbsolute标志位的含义是,如果为true则time是毫秒数,如果为false则time为微秒数,简单来说就是高精度的意思。
以前看到native关键字我基本就浅尝辄止了,不过这次因为带着疑问,我很想知道最终实现是不是一个while循环,然后不断比对当前时间和指定的截止时间来确定什么时候返回。因为我始终感觉这样实现会有很严重的开销,不知道他的底层有没有更好的实现方式。
下载jdk的含c++代码的源码 openjdk-8-src-b132-03_mar_2014版本,利用sumlime的全局查找,找到其调用的void Parker::park(bool isAbsolute, jlong time)方法。代码不是很长
void Parker::park(bool isAbsolute, jlong time) {
// Ideally we'd do something useful while spinning, such
// as calling unpackTime().
// Optional fast-path check:
// Return immediately if a permit is available.
// We depend on Atomic::xchg() having full barrier semantics
// since we are doing a lock-free update to _counter.
if (Atomic::xchg(0, &_counter) > 0) return;
Thread* thread = Thread::current();
assert(thread->is_Java_thread(), "Must be JavaThread");
JavaThread *jt = (JavaThread *)thread;
// Optional optimization -- avoid state transitions if there's an interrupt pending.
// Check interrupt before trying to wait
if (Thread::is_interrupted(thread, false)) {
return;
}
// Next, demultiplex/decode time arguments
timespec absTime;
if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all
return;
}
if (time > 0) {
unpackTime(&absTime, isAbsolute, time);
}
// Enter safepoint region
// Beware of deadlocks such as 6317397.
// The per-thread Parker:: mutex is a classic leaf-lock.
// In particular a thread must never block on the Threads_lock while
// holding the Parker:: mutex. If safepoints are pending both the
// the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.
ThreadBlockInVM tbivm(jt);
// Don't wait if cannot get lock since interference arises from
// unblocking. Also. check interrupt before trying wait
if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
return;
}
int status ;
if (_counter > 0) { // no wait needed
_counter = 0;
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
// Paranoia to ensure our locked and lock-free paths interact
// correctly with each other and Java-level accesses.
OrderAccess::fence();
return;
}
#ifdef ASSERT
// Don't catch signals while blocked; let the running threads have the signals.
// (This allows a debugger to break into the running thread.)
sigset_t oldsigs;
sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals();
pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);
#endif
OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
jt->set_suspend_equivalent();
// cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()
assert(_cur_index == -1, "invariant");
if (time == 0) {
_cur_index = REL_INDEX; // arbitrary choice when not timed
status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;
} else {
_cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ; // **** 1 ****
if (status != 0 && WorkAroundNPTLTimedWaitHang) {
pthread_cond_destroy (&_cond[_cur_index]) ;
pthread_cond_init (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr());
}
}
_cur_index = -1;
assert_status(status == 0 || status == EINTR ||
status == ETIME || status == ETIMEDOUT,
status, "cond_timedwait");
#ifdef ASSERT
pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);
#endif
_counter = 0 ;
status = pthread_mutex_unlock(_mutex) ;
assert_status(status == 0, status, "invariant") ;
// Paranoia to ensure our locked and lock-free paths interact
// correctly with each other and Java-level accesses.
OrderAccess::fence();
// If externally suspended while waiting, re-suspend
if (jt->handle_special_suspend_equivalent_condition()) {
jt->java_suspend_self();
}
}
其中 unpacktime方法是根据absolute设置时间精度的,根据对整个park方法的观察,实际执行停顿的方法,应该是 status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;这个方法的实现如下
int os::Bsd::safe_cond_timedwait(pthread_cond_t *_cond, pthread_mutex_t *_mutex, const struct timespec *_abstime)
{
return pthread_cond_timedwait(_cond, _mutex, _abstime);
}
可以看到它直接调用了pthread_cond_timedwait方法,通过搜索得知这个方法是操作系统库文件的一个方法,不过这个pthread_cond_timedwait而且也基本只找到了头文件信息,没有找到源文件实现。
与 Parker::park(bool isAbsolute, jlong time)在同一个文件下很相似的方法还有int os::PlatformEvent::park(jlong millis)方法,由于我是通过文本查看的源码,从感觉上来看很像Thread.sleep方法底层调用的方法之一,Thread.sleep的native方法的实现中有这样一段
int os::sleep(Thread* thread, jlong millis, bool interruptible) {
...
ParkEvent * const slp = thread->_SleepEvent ;
...
for (;;) {
// It'd be nice to avoid the back-to-back javaTimeNanos() calls on
// the 1st iteration ...
jlong newtime = javaTimeNanos();
if (newtime - prevtime < 0) {
// time moving backwards, should only happen if no monotonic clock
// not a guarantee() because JVM should not abort on kernel/glibc bugs
assert(!Bsd::supports_monotonic_clock(), "time moving backwards");
} else {
millis -= (newtime - prevtime) / NANOSECS_PER_MILLISEC;
}
if(millis <= 0) break ;
prevtime = newtime;
slp->park(millis);
}
...
}
可以看到sleep方法实际上最终调用了一个ParkEvent类的 park(long millis)方法,全局搜索了一下没有找到ParkEvent的实现类,不过在PlatformEvent里面倒是找到了一个类似的park方法,int os::PlatformEvent::park(jlong millis),它的实现中有这样一段
while (_Event < 0) {
status = os::Linux::safe_cond_timedwait(_cond, _mutex, &abst);
if (status != 0 && WorkAroundNPTLTimedWaitHang) {
pthread_cond_destroy (_cond);
pthread_cond_init (_cond, os::Linux::condAttr()) ;
}
assert_status(status == 0 || status == EINTR ||
status == ETIME || status == ETIMEDOUT,
status, "cond_timedwait");
if (!FilterSpuriousWakeups) break ; // previous semantics
if (status == ETIME || status == ETIMEDOUT) break ;
// We consume and ignore EINTR and spurious wakeups.
}
可以看到这里跟我最初设想的有点类似 一个while循环不断检测一个标志位,然后调用safe_cond_timedwait方法,不过这里面基本上都是信号量或者标志位的设置与获取,并没有我之前所设想的与系统时间不断进行比对然后重设标志位的逻辑,我估计这层逻辑应该在os::Linux::safe_cond_timedwait或者pthread_cond_destroy 方法里面,不过由于找不到源码,所以没法进一步探究了。
追到这里基本上本次源理探究就到一段落了,因为我也无法得到pthread_cond_timedwait的源码。
本次探究最最根本的源码实现没有找到,不过基本理清了tryLock和parkNanos的底层实现思路,在以后自己写程序的时候,我也会借助他们的方式,即
- 对于时间计算方面的问题,尽量用贴近底层的系统提供的api,或者基于这些api提供的jdk的实现,而不要随便开一个while循环,不断获取时间然后比对,这样子往往效率很低,而且伴随线程安全问题。
- 优化自己的代码结构,不要把大段的逻辑放在一个方法里,层层分离,每个方法只做自己应该关注的事情。