本章阅读收获:可了解Quartz框架中的正式开始运行部分源码
继上节内容
上节内容我们讲到了QuartzSchedulerThread的前半部分,也就是while循环保证在schedule调用start方法情况下,才开始进行工作任务的调用。接下来我们继续往下进行分析。
QuartzSchedulerThread的run方法后续源码分析
第一部分源码是:
//获取可用线程数 qsRsrcs是QuartzSchedulerResources对象
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
List<OperableTrigger> triggers = null;
long now = System.currentTimeMillis();
//清除调度改变的信号
clearSignaledSchedulingChange();
try {
//到JobStore中获取下次被触发的触发器
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
lastAcquireFailed = false;
if (log.isDebugEnabled())
log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
} catch (JobPersistenceException jpe) {
if(!lastAcquireFailed) {
qs.notifySchedulerListenersError(
"An error occurred while scanning for the next triggers to fire.",
jpe);
}
lastAcquireFailed = true;
continue;
} catch (RuntimeException e) {
if(!lastAcquireFailed) {
getLog().error("quartzSchedulerThreadLoop: RuntimeException "
+e.getMessage(), e);
}
lastAcquireFailed = true;
continue;
}
...
这里首先回去获取可用线程数,这里返回的值是肯定大于0的,为什么呢?我们直接进入源码:
/**
* 获取可用线程个数
*/
public int blockForAvailableThreads() {
synchronized(nextRunnableLock) {
while((availWorkers.size() < 1 || handoffPending) && !isShutdown) {
try {
nextRunnableLock.wait(500);
} catch (InterruptedException ignore) {
}
}
return availWorkers.size();
}
}
我们可以看到,只要程序没有停止,如果没有可用线程,就会一直while循环卡顿,直到有可用线程。
接下来是clearSignaledSchedulingChange清除信号,源码是:
/**
* 清除调度改变的信号
*/
public void clearSignaledSchedulingChange() {
synchronized(sigLock) {
signaled = false;
signaledNextFireTime = 0;
}
}
这里也没有多大可以说的,就是把变量设置到初始化状态。
最后也是最关键的部分获取Trigger触发器。由于我们放在QuartzSchedulerResources的JobStore的是RAMJobStore,那我们直接跳入进去:
/**
* 通过调用调度器获取下一个触发触发器的句柄,并将其标记为'reserved'
*/
public List<OperableTrigger> acquireNextTriggers(long noLaterThan, int maxCount, long timeWindow) {
synchronized (lock) {
List<OperableTrigger> result = new ArrayList<OperableTrigger>();
Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();
Set<TriggerWrapper> excludedTriggers = new HashSet<TriggerWrapper>();
long batchEnd = noLaterThan;
// return empty list if store has no triggers.
if (timeTriggers.size() == 0)
return result;
while (true) {
TriggerWrapper tw;
try {
tw = timeTriggers.first();
if (tw == null)
break;
timeTriggers.remove(tw);
} catch (java.util.NoSuchElementException nsee) {
break;
}
if (tw.trigger.getNextFireTime() == null) {
continue;
}
if (applyMisfire(tw)) {
if (tw.trigger.getNextFireTime() != null) {
timeTriggers.add(tw);
}
continue;
}
if (tw.getTrigger().getNextFireTime().getTime() > batchEnd) {
timeTriggers.add(tw);
break;
}
// If trigger's job is set as @DisallowConcurrentExecution, and it has already been added to result, then
// put it back into the timeTriggers set and continue to search for next trigger.
JobKey jobKey = tw.trigger.getJobKey();
JobDetail job = jobsByKey.get(tw.trigger.getJobKey()).jobDetail;
if (job.isConcurrentExectionDisallowed()) {
if (acquiredJobKeysForNoConcurrentExec.contains(jobKey)) {
excludedTriggers.add(tw);
continue; // go to next trigger in store.
} else {
acquiredJobKeysForNoConcurrentExec.add(jobKey);
}
}
tw.state = TriggerWrapper.STATE_ACQUIRED;
tw.trigger.setFireInstanceId(getFiredTriggerRecordId());
OperableTrigger trig = (OperableTrigger) tw.trigger.clone();
if (result.isEmpty()) {
batchEnd = Math.max(tw.trigger.getNextFireTime().getTime(), System.currentTimeMillis()) + timeWindow;
}
result.add(trig);
if (result.size() == maxCount)
break;
}
// If we did excluded triggers to prevent ACQUIRE state due to DisallowConcurrentExecution, we need to add them back to store.
if (excludedTriggers.size() > 0)
timeTriggers.addAll(excludedTriggers);
return result;
}
}
下面就逐行进行分析,首先是三个变量,现在肯定是不知道干什么的,所以我们继续往下看,之后我们关注到RAMJobStore的属性timeTriggers如果里面没有内容就直接返回了。那么timeTriggers是什么呢?
protected TreeSet<TriggerWrapper> timeTriggers = new TreeSet<TriggerWrapper>(new TriggerWrapperComparator());
我们可以看到这里放入了一个TriggerWrapperComparator类,但是这个具体干嘛的?大家从名字可能看出来这是一个触发器的比较器,是timeTriggers放入进去排序的算法。
class TriggerWrapperComparator implements Comparator<TriggerWrapper>, java.io.Serializable {
private static final long serialVersionUID = 8809557142191514261L;
TriggerTimeComparator ttc = new TriggerTimeComparator();
public int compare(TriggerWrapper trig1, TriggerWrapper trig2) {
return ttc.compare(trig1.trigger, trig2.trigger);
}
@Override
public boolean equals(Object obj) {
return (obj instanceof TriggerWrapperComparator);
}
@Override
public int hashCode() {
return super.hashCode();
}
}
就是比较类,之中的TriggerTimeComparator就是根据Trigger下一次触发时间的比较类,这里也比较好理解,就是对于触发器根据最近触发时间来进行排序。那么我们继续进行下去,接下来是会取出距离现在最快触发的Trigger,这里就假设是tw变量(方便下面述说),然后从timeTriggers中移除出去,如果tw没有下次触发时间了,则直接跳过继续。下面是applyMisfire方法:
protected boolean applyMisfire(TriggerWrapper tw) {
long misfireTime = System.currentTimeMillis();
if (getMisfireThreshold() > 0) {
misfireTime -= getMisfireThreshold();
}
Date tnft = tw.trigger.getNextFireTime();
if (tnft == null || tnft.getTime() > misfireTime
|| tw.trigger.getMisfireInstruction() == Trigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY) {
return false;
}
Calendar cal = null;
if (tw.trigger.getCalendarName() != null) {
cal = retrieveCalendar(tw.trigger.getCalendarName());
}
signaler.notifyTriggerListenersMisfired((OperableTrigger)tw.trigger.clone());
tw.trigger.updateAfterMisfire(cal);
if (tw.trigger.getNextFireTime() == null) {
tw.state = TriggerWrapper.STATE_COMPLETE;
signaler.notifySchedulerListenersFinalized(tw.trigger);
synchronized (lock) {
timeTriggers.remove(tw);
}
} else if (tnft.equals(tw.trigger.getNextFireTime())) {
return false;
}
return true;
}
这段代码就是如果触发器失火的情况下的处理(失火是指触发器在改触发的时间点没有触发Job任务)。就是在失火状态下根据触发器的下次触发时间,分别通知触发器监听器不同信息。
之后便是在判断触发器的下次触发时间如果大于noLaterThan的时候,再把tw加到timeTriggers,这一段什么意思呢?
其实就是如果触发器的下次触发时间要大于系统现在的时间加上我们设置的参数,这说明了最近触发时间还有点远,让我们直接歇一下。那么其实大家想到没有,之前idleWaitTime的变量就是就是一个调优参数了,这个参数决定了Trigger的什么时候获取出来。
之后便通过trigger获取JobKey,然后在通过JobStore下的map根据JobKey获取唯一的JobDetail。那么之后的job.isConcurrentExectionDisallowed()是干啥的?
public boolean isConcurrentExectionDisallowed() {
return ClassUtils.isAnnotationPresent(jobClass, DisallowConcurrentExecution.class);
}
其实就是查看Job是否有DisallowConcurrentExecution这个注解,这个注解是干嘛的呢?就是防止比如我们Job运行时间是5秒,但是定时任务是每3秒调用一次,这就会导致Job并发去跑。但是如果有了这个注解,就会等之前的Job跑完之后才会去跑。怎么实现的?那么我们接下看,如果Job包含这个注解,那么就会调用acquiredJobKeysForNoConcurrentExec.contains(jobKey),会查询Set中是否有这个JobKey,如果有的话就会在去除调用的Set中添加进去,并且直接跳过当前流程。后面的操作就相对比较简单了,判断下Trigger的总量,然后最后把本次跳过的Trigger在添加进去。
结束语
本节内容我们完成了run方法的第一小步,我已感觉收获良多,相信在接下去的阅读过程中对我帮助更大。