一 服务端主体流程
- 任务触发时,需要进行执行器路由处理,并组装任务相关配置信息,如阻塞策略,分片参数,超时时间等。
二 表
2.1 XxlJobRegistry
- 执行器注册信息
类型,应用,执行器地址,心跳时间 - 任务信息
public class XxlJobInfo {
private int id; // 主键ID (JobKey.name)
private int jobGroup; // 执行器主键ID (JobKey.group)
private String jobCron; // 任务执行CRON表达式 【base on quartz】
private String jobDesc;
private Date addTime;
private Date updateTime;
private String author; // 负责人
private String alarmEmail; // 报警邮件
private String executorRouteStrategy; // 执行器路由策略
private String executorHandler; // 执行器,任务Handler名称
private String executorParam; // 执行器,任务参数
private String executorBlockStrategy; // 阻塞处理策略
private int executorTimeout; // 任务执行超时时间,单位秒
private int executorFailRetryCount; // 失败重试次数
private String glueType; // GLUE类型 #com.xxl.job.core.glue.GlueTypeEnum
private String glueSource; // GLUE源代码
private String glueRemark; // GLUE备注
private Date glueUpdatetime; // GLUE更新时间
private String childJobId; // 子任务ID,多个逗号分隔
// copy from quartz
private String jobStatus; // 任务状态 【base on quartz】
}
- 任务执行记录
public class XxlJobLog {
private int id;
// job info
private int jobGroup;//执行器主键id
private int jobId;
// execute info
private String executorAddress;//执行器地址
private String executorHandler;//执行器任务执行函数
private String executorParam;//参数
private String executorShardingParam;//分片参数
private int executorFailRetryCount;//失败重试次数
// trigger info
private Date triggerTime;//触发时间
private int triggerCode;//触发结果
private String triggerMsg;
// handle info
private Date handleTime;//处理完成时间
private int handleCode;//处理结果
private String handleMsg;
}
- 应用执行器信息
public class XxlJobGroup {
private int id;
private String appName;
private String title;
private int order;
private int addressType; // 执行器地址类型:0=自动注册、1=手动录入
private String addressList; // 执行器地址列表,多地址逗号分隔(手动录入)
}
三 任务触发
- quartz调度触发执行RemoteHttpBean.executeInternal
protected void executeInternal(JobExecutionContext context)
throws JobExecutionException {
// load jobId
JobKey jobKey = context.getTrigger().getJobKey();
Integer jobId = Integer.valueOf(jobKey.getName());
// trigger
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);
}
- JobTriggerPoolHelper使用线程池,每个任务触发一个线程执行
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam) {
helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam);
}
public void addTrigger(final int jobId, final TriggerTypeEnum triggerType, final int failRetryCount, final String executorShardingParam, final String executorParam) {
triggerPool.execute(new Runnable() {
@Override
public void run() {
XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam);
}
});
}
- 初始化任务调度的信息
TriggerParam triggerParam = new TriggerParam();
//任务id
triggerParam.setJobId(jobInfo.getId());
//任务处理函数,参数
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
triggerParam.setExecutorParams(jobInfo.getExecutorParam());
//阻塞策略
triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
//任务执行超时时间配置
triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
//任务触发记录id
triggerParam.setLogId(jobLog.getId());
triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime());
//任务执行函数源码信息
triggerParam.setGlueType(jobInfo.getGlueType());
triggerParam.setGlueSource(jobInfo.getGlueSource());
triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
//分片信息
triggerParam.setBroadcastIndex(index);
triggerParam.setBroadcastTotal(total);
简单任务按照执行器路由策略选择执行器
executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
xxlrpc发送任务触发消息
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
ReturnT<String> runResult = null;
try {
ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(address);
runResult = executorBiz.run(triggerParam);
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
}
StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
runResultSB.append("<br>address:").append(address);
runResultSB.append("<br>code:").append(runResult.getCode());
runResultSB.append("<br>msg:").append(runResult.getMsg());
runResult.setMsg(runResultSB.toString());
return runResult;
}
- 通知开始监控任务触发记录
JobFailMonitorHelper.monitor(jobLog.getId());
四 定时任务
- JobRegistryMonitorHelper执行器心跳扫描,定时扫描执行器心跳时间,删除过期的执行器
- JobFailMonitorHelper任务状态监控告警及失败重试
blockqueu存储所有本调度器待监控的任务,定时进行检查任务。
按照告警策略,进行失败重试或者发送告警。
任务执行中,则继续监控