前提
最近要实现一个每隔几分钟就监控rpc调用是否存活的系统,考虑到监控的rpc数量众多,因此将每个监控作为任务,方便起见使用了quartz。
quartz参数配置
下面列出了部分配置参数以及自己的理解,完整的参数配置建议参考:
http://www.quartz-scheduler.org/documentation/quartz-2.2.x/configuration/
//实例名称,分布式下每个实例名称必须相同
org.quartz.scheduler.instanceName= jsfMonitorQuartzScheduler
org.quartz.scheduler.rmi.export= false
org.quartz.scheduler.rmi.proxy= false
org.quartz.scheduler.wrapJobExecutionInUserTransaction= false
//一次性取出的任务数,默认值是1,适合负载均衡,但不适合大量的短时任务
org.quartz.scheduler.batchTriggerAcquisitionMaxCount=50
//一个quartz实例的主线程完成一次操作过后的停顿时间,默认30秒,
//对于大量短时任务可以适当的减小该值,弊端是增加数据库的负担
org.quartz.scheduler.idleWaitTime=30000
org.quartz.threadPool.class= org.quartz.simpl.SimpleThreadPool
//quartz执行任务的最大线程数,对于SimpleThreadPool,没有任务队列,
//新的任务到来时将阻塞到有空闲线程可用,由于这个限制,
//实际上quartz在从数据库拉取可执行任务时将根据线程数
//和batchTriggerAcquisitionMaxCount的较小值来获取可触发的任务
org.quartz.threadPool.threadCount= 50
org.quartz.threadPool.threadPriority=5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread= true
//当任务错过触发时间时,最忍受的最长延迟时间
org.quartz.jobStore.misfireThreshold= 60000
//使用jdbc
org.quartz.jobStore.class= org.quartz.impl.jdbcjobstore.JobStoreTX
//这里使用了自定义的driverDelegateClass,原因是公司数据库不支持blob,
//因此重写了底层关于处理blob的方法,使用varchar存储,使用json进行处理
org.quartz.jobStore.driverDelegateClass= com.jd.id.jsfmonitor.service.quartz.NoBlobJDBCDelegate
//该值表示是否datamap中所有数据都使用properties模式,即字符串,默认是false,
//这里使用了true也是为了不使用blob做的兼容,property模式下可以使用json进行统一处理
org.quartz.jobStore.useProperties= true
//关于数据源,这里可以和spring mvc使用同一个
#org.quartz.jobStore.dataSource= jsfmonitor
org.quartz.jobStore.tablePrefix= WARE_JSF_MONITOR_QRTZ_
//启用分布式
org.quartz.jobStore.isClustered = true
org.quartz.jobStore.clusterCheckinInterval = 20000
//默认值为0,表示一个任务可以提前多久被触发,适用于大量的任务需要触发
org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow
#org.quartz.dataSource.jsfmonitor.driver= com.mysql.jdbc.Driver
#org.quartz.dataSource.jsfmonitor.URL= jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8
#org.quartz.dataSource.jsfmonitor.user= root
#org.quartz.dataSource.jsfmonitor.password= 123456
#org.quartz.dataSource.jsfmonitor.maxConnections= 30
org.quartz.scheduler.instanceId= AUTO
关于不支持blob
公司的数据库不支持blob格式,因此手动实现了driverDelegateClass
,主要是通过继承StdJDBCDelegate
并重写其中和blob相关的方法,主要的修改点在如下部分;
Map<?, ?> map = null;
if (canUseProperties()) {
map = getMapFromProperties(rs);
} else {
//map = (Map<?, ?>) getObjectFromBlob(rs, COL_JOB_DATAMAP);
throw new SQLException("只支持properties模式");
}
这段代码是要从结果集当中取出任务的datamap,包括获取trigger,jobdetail等多个方法中都有该代码段,这些方法都被重写了。源代码中会判断org.quartz.jobStore.useProperties
是否为true来选择加载的方式,这里我把该值设置为true,目的是为了有统一的入口可以修改从结果集当中获取数据的方式。源代码中getMapFromProperties
方法仍然会按照blob的方式进行解析,并且该方法是private的,因此我自己实现了该方法,如下:
private Map<?, ?> getMapFromProperties(ResultSet rs)
throws ClassNotFoundException, IOException, SQLException {
Map<?, ?> map;
String data = rs.getString(COL_JOB_DATAMAP);
if (data == null) {
return null;
}
Properties properties = JSON.parseObject(data, Properties.class);
map = convertFromProperty(properties);
return map;
}
通过string的方式获取,这里对应的是存数据的时候也需要string的方式存储,建表的时候也需要将blob修改为varchar。数据的解析则通过json实现(json可能不是最节省空间的方式)。
关于quartz主线程
quartz主线程类是QuartzSchedulerThread
,该类在其run方法内会每隔一段时间就从数据库中取出可以被执行的任务,执行过后会有短暂的sleep,sleep过后重复上述过程。这里单次获取的任务数上限取org.quartz.scheduler.batchTriggerAcquisitionMaxCount
和batchTriggerAcquisitionMaxCount
中的较小值,代码如下:
//idleWaitTime对应org.quartz.scheduler.idleWaitTime,这里now+idleWaitTime表示quartz
//可以获取的任务重最晚的触发时间,即允许任务被触发的时间在当前时间之后一段时间就
//能从数据库取出
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow())
org.quartz.scheduler.idleWaitTime
的另一个作用是决定主线程在一次操作过后的暂停时间,当然这个暂停时间会减去一个相关的随机值。这里可以很清晰的看到该值会影响主线程访问数据库的频率,官方文档也强调除非有特殊原因,比如大量的任务需要调度,否则不能把该值设置过低,官方建议不能低于5000ms。
long now = System.currentTimeMillis();
long waitTime = now + getRandomizedIdleWaitTime();
long timeUntilContinue = waitTime - now;
synchronized(sigLock) {
try {
if(!halted.get()) {
// QTZ-336 A job might have been completed in the mean time and we might have
// missed the scheduled changed signal by not waiting for the notify() yet
// Check that before waiting for too long in case this very job needs to be
// scheduled very soon
if (!isScheduleChanged()) {
sigLock.wait(timeUntilContinue);
}
}
} catch (InterruptedException ignore) {
}
}
private long getRandomizedIdleWaitTime() {
return idleWaitTime - random.nextInt(idleWaitVariablness);
}
最后推荐两篇文章,一篇是quartz的分布式原理
http://www.icartype.com/?p=140
一篇是quartz的性能优化
http://mp.weixin.qq.com/s?__biz=MzIxMjE0MjM4NA==&mid=402443938&idx=1&sn=77f72cbf29c691668c47c4b016bdcc60#rd