Quartz之一:任务调度的动态处理

Quartz是一个完全由java编写的功能丰富的开源作业调度库,可以集成到几乎任何Java应用程序中,小到独立应用程序,大到大型的电子商务系统。Quartz可以用来创建执行数十,数百乃至数万个作业的简单或复杂的计划;作业的任务被定义为标准的Java组件,它可以执行几乎任何你可能编程的任务。而且Quartz Scheduler包含许多企业级功能,例如支持JTA事务和集群。

任务调度是很多系统都会用到的功能,比如需要定期执行调度生成报表、或者博客定时更新之类的,都可以靠Quartz来完成。而需求中经常会出现,需要对调度任务进行动态添加、关闭、启动和删除,在这里本文就对Spring+Quartz实现任务调度动态处理进行说明。

开篇就彩蛋:其实我已将Spring+Quartz动态任务调度封装为框架工具,不想看文章解释编码可以直接转到我的GitHub上的项目Libra

TL;DR

  • 简单了解一下Quartz中的概念和运行原理
  • 根据原理解析如何实现动态任务调度
  • 在Spring中配置Quartz框架,并自定义Quartz作业工厂类
  • 利用自定义注解实现动态任务类,并实现自动注入
  • 实现Dubbo环境下注解的自动注入
  • 附加:Libra如何来使用

理解Quartz的原理

先来直接看Quartz官网提供的单程序中使用示例:

  SchedulerFactory schedFact = new org.quartz.impl.StdSchedulerFactory();
  Scheduler scheduler = schedFact.getScheduler();

  // define the job and tie it to our HelloJob class
  JobDetail job = newJob(HelloJob.class)
      .withIdentity("myJob", "group1")
      .build();

  // Trigger the job to run now, and then every 40 seconds
  Trigger trigger = newTrigger()
      .withIdentity("myTrigger", "group1")
      .startNow()
      .withSchedule(simpleSchedule()
          .withIntervalInSeconds(40)
          .repeatForever())
      .build();

  // Tell quartz to schedule the job using our trigger
  scheduler.scheduleJob(job, trigger);
  scheduler.start();

通过示例就可以展开了解一下Quartz中涉及到的几个类概念:

  • SchedulerFactory:调度器工厂。这是一个接口,用于调度器的创建和管理。示例中使用的是Quartz中的默认实现。
  • Scheduler:任务调度器。它表示一个Quartz的独立运行容器,里面注册了多个触发器(Trigger)和任务实例(JobDetail)。两者分别通过各自的组(group)和名称(name)作为容器中定位某一对象的唯一依据,因此组和名称必须唯一(触发器和任务实例的组和名称可以相同,因为对象类型不同)。
  • Job:是一个接口,只有一个方法void execute(JobExecutionContext context),开发者实现该接口定义运行任务,JobExecutionContext类提供了调度上下文的各种信息。Job运行时的信息保存在JobDataMap实例中。
  • JobDetail:Job实例。Quartz在每次执行Job时,都重新创建一个Job实例,所以它不直接接受一个Job的实例,相反它接收一个Job实现类,以便运行时通过newInstance()的反射机制实例化Job。因此需要通过一个类来描述Job的实现类及其它相关的静态信息,如Job名字、描述、关联监听器等信息,JobDetail承担了这一角色。
  • Trigger:触发器,描述触发Job执行的时间触发规则。
    • SimpleTrigger:当仅需触发一次或者以固定时间间隔周期执行,SimpleTrigger是最适合的选择。
    • CronTrigger:通过Cron表达式定义出各种复杂时间规则的调度方案:如每早晨9:00执行,周一、周三、周五下午5:00执行等。

我们可以简单的理解到整个Quartz运行调度的流程如下:

  1. 通过触发器工厂(SchedulerFactory的实现类)创建一个调度器(Scheduler);
  2. 创建一个任务实例(JobDetail),为它指定实现了Job接口的实现类(示例中的HelloWord.class),并指定唯一标识(Identity)组(示例中的“group1”)和名称(示例中的“myJob”);
  3. 创建一个触发器(Trigger),为它指定时间触发规则(示例中的simpleSchedule()生成的SimpleTrigger),并指定唯一标识(Identity)组(示例中的“group1”)和名称(示例中的“myTrigger”)
  4. 最后通过调度器(Scheduler)将任务实例(JobDetail)和触发器(Trigger)绑定在一起,并通过start()方法开启任务调度。

Tips:当然Quartz还涉及到线程池等其他内容,你可以通过“Quartz原理揭秘和源码解读”或“Quartz原理解析”对Quartz的原理有更深入的了解,本篇文章就不详细展开了。

如何实现动态任务调度

了解了原理就可以开始构思如何实现动态任务调度了,其实从上面可以看出Quartz已经很好的支持了任务的动态创建、修改和删除,而我们的重点是如何抽象它以及想达到什么样的程度。

我们的需求是通过数据库(关系型或非关系型)来记录当前执行的任务处理,已经配置好的任务在系统启动时应该自动运行起来。另外,可以在系统运行时添加新的任务、对任务进行启动或停止、重新设置任务运行时间,还可以删除任务。

这里在系统运行时可以添加就涉及到两个不同的情况:

  1. 添加任务指定的实现类(实现了Quartz的Job接口)已经存在于项目中
  2. 添加任务指定的实现类(实现了Quartz的Job接口)不存在于项目中,需要让系统动态加载jar包

而对于实现类的处理也产生了两种方案:

  • 方案一:业务相关的实现类直接实现Job接口
  • 方案二:编写统一的实现类实现Job接口,在统一实现类中使用反射指定到某个类方法执行

这里基于脱离任务状态区分更好解耦方便拓展等方面考虑最终选择了方案二。

Tips:在后面可以看到,方案二更加灵活。

补充一下Quartz中的任务状态:

无状态Job:默认情况下都为此类型,可以并发执行。

有状态Job(StatefulJob):同一个实例(JobDetail)不能同时运行。在Quartz旧版本中实现StatefulJob接口,新版已经废弃了此接口,使用注解(@DisallowConcurrentExecution)实现。

这里使用方案二会创建一个无状态和一个有状态的统一任务实现类,但真正的业务任务类可以是一个。

确定好方案,便整理一下我们的具体实现思路如下:

  1. 实现两个统一的任务实现类(有状态和无状态),通过JobDataMap传递配置数据,根据数据通过反射运行指定的真正实现类中的方法。
  2. 因为要传递配置数据,则单独创建一个配置Bean用于传递。
  3. 而在解决如何收集业务实现类以提供选择的问题上,决定使用自定义注解+Spring提供的注解扫描方式。
  4. 要实现统一的任务实现类到真正的任务实现类的数据传递,并实现Spring环境下的自动注入。
  5. 系统启动时,需要一个初始化方法访问数据并生成配置项,将配置好的任务全部添加调度。
  6. 提供任务增、删、改、查以及启动停止等方法工具,便于数据接口对任务进行处理。

不过,首先我们想要将Quartz和Spring配置在一起。

Spring结合Quartz的配置

添加依赖

Spring和Quartz运行在各自的容器中,需要一定的配置才能将二者融合在一起。Spring对Quartz的支持放在context-support包中,因此项目除了引入Quartz还需引入spring-context-support,要是Maven项目需要在pom.xml中添加如下内容:

<!-- quartz -->
<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
    <version>2.3.0</version>
</dependency>

<!-- Spring -->
...
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context-support</artifactId>
    <version>5.0.0.RELEASE</version>
</dependency>

Tips:版本看你自己,我这里是最新的5.0.0

添加配置

Spring的配置使用xml或者Configuration方式都可以。都是要设置以下几个内容:

  • 载入Quartz的属性配置文件quartz.properties
  • 设置BeanSchedulerFactoryBean将Spring和Quartz连接起来
  • 添加一个我们自定义的一个工厂类,这个类可以将Spring的上下文环境传导到Quartz中

我们先添加这个自定义工厂类,内容为:

/**
 * Custom JobFactory, so that Job can be added to the Spring Autowired (Autowired)
 *
 * @author ProteanBear
 */
public class AutowiringSpringBeanJobFactory extends AdaptableJobFactory
{
    /**
     * Spring Context
     */
    @Autowired
    private AutowireCapableBeanFactory capableBeanFactory;

    /**
     * Create a Job instance, add Spring injection
     *
     * @param bundle A simple class (structure) used for returning execution-time data from the JobStore to the QuartzSchedulerThread
     * @return Job instance
     * @throws Exception throw from createJobInstance
     */
    @Override
    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception
    {
        Object jobInstance=super.createJobInstance(bundle);
        //Manually execute Spring injection
        capableBeanFactory.autowireBean(jobInstance);
        return jobInstance;
    }
}

这个类继承自AdaptableJobFactory(一个由Spring提供的工厂类),在构建它时它存在于Spring上下文中,我们自动注入Spring提供的AutowireCapableBeanFactory,通过它就可以使用代码在我们创建Job实例时进行自动注入处理。

这样就可以开始添加配置文件(xml)或者Configuration类,二者选择一个就好。

spring-quartz.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">

    <!-- Load the configuration -->
    <bean id="quartzProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="locations">
            <list>
                <value>classpath*:quartz.properties</value>
            </list>
        </property>
        <property name="fileEncoding" value="UTF-8"></property>
    </bean>

    <!-- With a custom JobFactory, you can inject a Spring-related job into your job -->
    <bean id="jobFactory" class="com.github.proteanbear.libra.framework.AutowiringSpringBeanJobFactory">
    </bean>

    <!-- Task scheduling factory -->
    <bean id="schedulerFactoryBean" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
        <property name="jobFactory" ref="jobFactory"/>
        <property name="overwriteExistingJobs" value="true"/>
        <property name="quartzProperties" ref="quartzProperties"/>
        <!-- After the application is started, start the task by 5 seconds delay -->
        <property name="startupDelay" value="5"/>
        <!-- Configure the spring context through the applicationContextSchedulerContextKey property -->
        <property name="applicationContextSchedulerContextKey">
            <value>applicationContext</value>
        </property>
    </bean>
</beans>
Configuration
/**
 * Spring integrated timing task framework quartz configuration
 *
 * @author ProteanBear
 */
@Configuration
public class LibraQuartzConfiguration
{
    /**
     * Load the configuration
     *
     * @return the configuration properties
     * @throws IOException The properties set is error.
     */
    @Bean
    public Properties quartzProperties() throws IOException
    {
        PropertiesFactoryBean propertiesFactoryBean=new PropertiesFactoryBean();
        propertiesFactoryBean.setLocation(new ClassPathResource("quartz.properties"));
        propertiesFactoryBean.afterPropertiesSet();
        return propertiesFactoryBean.getObject();
    }

    /**
     * With a custom JobFactory, you can inject a Spring-related job into your job.
     *
     * @param applicationContext the application context
     * @return the job factory
     */
    @Bean
    public JobFactory jobFactory(ApplicationContext applicationContext)
    {
        return new AutowiringSpringBeanJobFactory();
    }

    /**
     * Task scheduling factory
     *
     * @param jobFactory the job factory
     * @return the scheduler factory
     * @throws IOException The properties set is error.
     */
    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(JobFactory jobFactory)
            throws IOException
    {
        SchedulerFactoryBean schedulerFactoryBean=new SchedulerFactoryBean();

        //configuration
        schedulerFactoryBean.setOverwriteExistingJobs(true);
        //After the application is started, start the task by 5 seconds delay
        schedulerFactoryBean.setStartupDelay(5);
        //Job factory
        schedulerFactoryBean.setJobFactory(jobFactory);
        //load properties
        schedulerFactoryBean.setQuartzProperties(quartzProperties());
        //Configure the spring context through the applicationContextSchedulerContextKey property
        schedulerFactoryBean.setApplicationContextSchedulerContextKey("applicationContext");

        return schedulerFactoryBean;
    }
}
quartz.properties
# Quartz配置
# 设置org.quartz.scheduler.skipUpdateCheck的属性为true来跳过更新检查
org.quartz.scheduler.skipUpdateCheck=true
# 线程池配置
org.quartz.threadPool.threadCount=8

Tips:属性配置我这里只配置了两项,欲知详情请参见【官方文档】。

使用xml的话,最后不要忘记载入配置文件,也有两种方式:

  • 在web.xml中,载入Spring配置(不是SpringMVC的地方)的地方,即<context-param>下的<param-value>中增加一项classpath*:spring-quartz.xml,当然具体位置看你放的位置。
  • 在Spring的配置xml中(如application-context.xml)添加<import resource="classpath*:spring-quartz.xml"/>

自定义注解与自动注入

Spring+Quartz就配置好了,下面才开始真正的编码开发。

创建两个统一的任务实现类

分别为有状态的QuartzJobDispatcherDisallow和无状态的QuartzJobDispatcher,但其实二者主要是注解的区别,因此添加一个抽象父类AbstractQuartzJobDispatcher来统一反射调用方法。

/**
 * Central task super class to implement general configuration information parsing
 * and execution methods
 *
 * @author ProteanBear
 */
public abstract class AbstractQuartzJobDispatcher
{
    /**
     * Get the log object
     *
     * @return the log object
     */
    abstract protected Logger getLogger();

    /**
     * Get Spring injection factory class
     *
     * @return Spring injection factory class
     */
    abstract protected AutowireCapableBeanFactory getCapableBeanFactory();

    /**
     * Invoke the specified method by reflection
     *
     * @param jobTaskBean The job config
     * @param jobDataMap the job data
     */
    protected void invokeJobMethod(JobTaskBean jobTaskBean,JobDataMap jobDataMap)
    {
        ...
    }

    /**
     * Calculate the running time
     *
     * @param startTime The start time
     * @param endTime   The end time
     * @return The running time description
     */
    protected String calculateRunTime(Date startTime,Date endTime)
    {
        ...
    }
}

Tips:两个抽象方法有子类实现,getLogger()可以让子类提供在日志中可以区分任务是否有状态;getCapableBeanFactory()将自动注入放入子类中,并获取后继续手动传递到真正反射指向的实现类中。

QuartzJobDispatcher继承抽象父类AbstractQuartzJobDispatcher实现接口JobQuartzJobDispatcherDisallow内容一样只是多了@DisallowConcurrentExecution注解),并实现execute方法。

/**
 * Generic stateless task,
 * which is responsible for the method of dispatching execution
 * through configuration information
 *
 * @author ProteanBear
 */
public class QuartzJobDispatcher extends AbstractQuartzJobDispatcher implements Job
{
    /**
     * Log
     */
    private static final Logger logger=LoggerFactory.getLogger(QuartzJobDispatcher.class);

    /**
     * Spring injection factory class
     */
    @Autowired
    private AutowireCapableBeanFactory capableBeanFactory;

    /**
     * Get the log object
     *
     * @return the log object
     */
    @Override
    protected Logger getLogger(){return logger;}

    /**
     * Get Spring injection factory class
     *
     * @return Spring injection factory class
     */
    @Override
    protected AutowireCapableBeanFactory getCapableBeanFactory(){return capableBeanFactory;}

    /**
     * Actuator
     *
     * @param context the job execution context
     * @throws JobExecutionException the exception
     */
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException
    {
        ...
    }
}

方法里我们需要从JobDataMap中获取传递的数据,这个数据描述了实际要执行(即反射调用)的类的方法。

创建Bean描述任务类

JobTaskBean
属性 类型 说明
key String 任务标识
title String 任务显示名称
group String 任务组名
description String 任务描述
taskClass Class 任务对应的实现类
methodList List<Method> 任务实现类中需要执行的方法
fieldSetMethodMap Map<String,Method> 任务实现类属性设置方法,用于传递数据
concurrent boolean 任务是有状态还是无状态

两个任务类(有状态和无状态)中获取数据并调用抽象父类中的反射执行方法:

public void execute(JobExecutionContext context) throws JobExecutionException
{
    //Get recorded task configuration information
    JobTaskBean jobTaskBean=(JobTaskBean)context.getMergedJobDataMap().get(LibraKey.CONFIG.toString());

    //Execute the method specified by the configuration information
    invokeJobMethod(jobTaskBean,context.getMergedJobDataMap());
}

利用反射调用真正的实现类中的方法

invokeJobMethod方法主要是两部分,第一通过反射将JobDataMap中同名称的数据(通过fieldSetMethodMap)传递到指定的实现类(taskClass)中;第二是通过反射调用methodList中指定的方法:

protected void invokeJobMethod(JobTaskBean jobTaskBean,JobDataMap jobDataMap)
{
    //Job class
    Class jobClass=null;
    //Job method
    Object job=null;

    try
    {
        //Load config
        String name=jobTaskBean.getTitle();
        getLogger().info("Ready run job task for:"+jobTaskBean.toString());

        //Get the specified class and object
        jobClass=jobTaskBean.getTaskClass();
        job=jobClass.newInstance();
        if(job==null)
        {
            throw new Exception("Task【"+name+"】Task initialization error,dead start !");
        }
        //Spring autowire
        getCapableBeanFactory().autowireBean(job);

        //Pass the data
        Method setMethod=null;
        Object data=null;
        for(String field : jobTaskBean.getFieldSetMethodMap().keySet())
        {
            //Get the data
            data=jobDataMap.get(field);
            if(data==null) continue;
            setMethod=jobTaskBean.getFieldSetMethodMap().get(field);

            //Set the data
            try
            {
                setMethod.invoke(job,data);
            }
            catch(Exception ex)
            {
                ex.printStackTrace();
                getLogger().error(ex.getMessage());
            }
        }

        //Traverse execution of all annotation methods
        Method method=null;
        String methodName=null;
        for(int i=0, size=jobTaskBean.getMethodList().size();i<size;i++)
        {
            method=jobTaskBean.getMethodList().get(i);
            methodName=method.getName();

            //Invoke method
            getLogger().info("Start invoke job \""+name+"\"'s method:"+methodName);
            Date startTime=new Date();
            method.invoke(job);
            getLogger().info("Invoke method "+methodName+" of job "+name+" success.Use time "+calculateRunTime(
                startTime,new Date()));
        }
    }
    catch(Exception ex)
    {
        getLogger().error(ex.getMessage(),ex);
    }
}

通过自定义注解+Spring注解扫描,获取任务实现类

上面可以看到反射其实很简单,关键是我们怎么扫描到系统中的我们编写实现类呢?

这里就用到了Spring提供的自定义注解扫描机制。首先添加一个自定义注解来标注我们的任务实现类,就叫做JobTask。并在为JobTask注解指定Spring的@Component,这样Spring启动时就会扫描到这个注解,而我们就可以通过Spring的上下文applicationContextgetBeansWithAnnotation方法获取到标注了@JobTask注解的所有类。

我们再创建两个自定义注解(JobTaskExecuteJobTaskData)分别用来标注要执行的任务方法和需要传递进来的数据属性。这样在系统启动时运行初始化方法获取到Spring上下文中标注了@JobTask注解的所有类,并遍历它的属性和方法查找自定义注解,最终生成JobTaskBean任务类描述并缓存在HashMap中。

初始化及遍历查找注解生成任务类描述(来自JobTaskUtils
/**
 * Initialization
 */
private void init()
{
    logger.info("Start init job task!");

    if(this.applicationContext==null)
    {
        logger.error("ApplicationContext is null!");
        return;
    }

    //Initialization
    jobTaskMap=(jobTaskMap==null)?(new HashMap<>(20)):jobTaskMap;

    //Gets all the classes in the container with @JobTask annotations
    Map<String,Object> jobTaskBeanMap=this.applicationContext.getBeansWithAnnotation(JobTask.class);
    //Traverse all classes to generate task description records
    loadJobTaskBeans(jobTaskBeanMap);

    logger.info("Init job task success!");
}

/**
 * Load jobTaskBeans from map by the annotation
 *
 * @param jobTaskBeanMap the class map
 */
public void loadJobTaskBeans(Map<String,Object> jobTaskBeanMap)
{
    //Traverse all classes to generate task description records
    JobTask jobTaskAnnotation=null;
    JobTaskData jobTaskData=null;
    JobTaskExecute executeAnnotation=null;
    String key="";
    Method[] curMethods=null;
    Field[] curFields=null;
    Collection collection=jobTaskBeanMap.values();
    logger.info("Get class map at JobTask annotation by Spring,size is "+collection.size()+"!");
    for(Object object : collection)
    {
        //Get the class
        Class curClass=(object instanceof Class)
            ?((Class)object)
            :object.getClass();
        //Get annotation
        jobTaskAnnotation=(JobTask)curClass.getAnnotation(JobTask.class);
        if(jobTaskAnnotation==null) continue;

        //Get method annotation
        curMethods=curClass.getDeclaredMethods();
        List<Method> methodList=new ArrayList<>();
        for(int i=0, length=curMethods.length;i<length;i++)
        {
            Method curMethod=curMethods[i];
            executeAnnotation=curMethod.getAnnotation(JobTaskExecute.class);
            if(executeAnnotation!=null)
            {
                methodList.add(curMethod);
            }
        }
        //No method of operation, directly skip
        if(methodList.isEmpty()) continue;

        //Get field annotation @JobTaskData
        curFields=curClass.getDeclaredFields();
        Map<String,Method> fieldSetMethodMap=new HashMap<>();
        for(int i=0, length=curFields.length;i<length;i++)
        {
            Field curField=curFields[i];
            jobTaskData=curField.getAnnotation(JobTaskData.class);
            if(jobTaskData==null) continue;

            //Saved name
            String name=(StringUtils.isBlank(jobTaskData.value())?curField.getName():jobTaskData.value());
            //Get Field set method name
            String setMethodName="set"+curField.getName().substring(0,1).toUpperCase()+curField.getName()
                .substring(1);

            //Get set method
            Method setMethod=null;
            try
            {
                setMethod=curClass.getMethod(setMethodName,curField.getType());
            }
            catch(NoSuchMethodException e)
            {
                e.printStackTrace();
                logger.error(e.getMessage());
                continue;
            }
            if(setMethod==null) continue;

            //Put into map
            fieldSetMethodMap.put(name,setMethod);
        }

        //Generate a key
        key=("".equals(jobTaskAnnotation.value().trim()))?getDefaultTaskKey(curClass):jobTaskAnnotation.value();

        //Create a task description
        JobTaskBean jobTaskBean=new JobTaskBean(key,jobTaskAnnotation);
        //Set the current class
        jobTaskBean.setTaskClass(curClass);
        //Set all running methods
        jobTaskBean.setMethodList(methodList);
        //Set all field set method
        jobTaskBean.setFieldSetMethodMap(fieldSetMethodMap);
        //Set is jar class url
        jobTaskBean.setJarClassUrl(jarClassUrl(curClass));

        //Record
        jobTaskMap.put(key,jobTaskBean);

        logger.info("Record job task("+key+") for content("+jobTaskBean.toString()+")!");
    }
}

Tips:这个工具类(JobTaskUtils)实现了Spring的ApplicationContextAware这样可以方便获得ApplicationContext中的所有bean;并且这个类使用@Component注册为Spring组件,使用@Scope标注为单例。

创建工具类操作任务

接下来我们创建一个工具类,将对任务的操作代码封装起来。可以通过它设置任务启动、暂停、删除以及立即运行。在此之前先要增加一个Bean,用来描述任务的配置属性:

TaskConfigBean
属性 类型 说明
taskId String 作为任务的唯一标识,默认使用Java的UUID生成
taskKey String 任务实现类标识,即@JobTask注解中的value
taskStatus int 任务状态,0为禁用、1为启用;为0时会删除指定的任务
taskCron String Cron表达式,此字段不为空时使用Cron触发器,为空时为使用间隔触发器
jobDataMap JobDataMap 记录要传递到任务实现类中的全部数据
taskInterval Integer taskCron为空时有效,指定间隔触发器的间隔时间
taskIntervalType IntervalType taskCron为空时有效,指定间隔类型
taskIntervalRepeat Integer taskCron为空时有效,指定间隔重复频率

Tips:IntervalType是个枚举类型,包括SECOND(秒),MINUTE(分),HOUR(小时)。

然后Bean里需要实现一个生成当前任务时间配置的方法,即根据当前的设置生成一个时间生成器(Quartz里的ScheduleBuilder),具体内容如下:

public ScheduleBuilder scheduleBuilder() throws SchedulerException
{
    //Cron
    if(StringUtils.isNotBlank(taskCron))
    {
        return CronScheduleBuilder.cronSchedule(taskCron);
    }
    //Simple
    ScheduleBuilder result=null;
    if(taskInterval==null)
    {
        throw new SchedulerException("Timing settings must not be null!");
    }
    switch(taskIntervalType)
    {
        case SECOND:
            result=SimpleScheduleBuilder.simpleSchedule()
                    .withIntervalInSeconds(taskInterval)
                    .withRepeatCount(taskIntervalRepeat);
            break;
        case MINUTE:
            result=SimpleScheduleBuilder.simpleSchedule()
                    .withIntervalInMinutes(taskInterval)
                    .withRepeatCount(taskIntervalRepeat);
            break;
        case HOUR:
            result=SimpleScheduleBuilder.simpleSchedule()
                    .withIntervalInHours(taskInterval)
                    .withRepeatCount(taskIntervalRepeat);
    }
    return result;
}

然后就可以编写工具类中的设置任务方法了:

ScheduleJobUtils.java
public final void set(TaskConfigBean jobConfig) throws SchedulerException
{
    if(jobConfig==null)
    {
        throw new SchedulerException("Object jobConfig is null.");
    }

    //Get the corresponding task class record
    JobTaskBean jobTaskBean=jobTaskUtils.getJobTask(jobConfig.getTaskKey());
    if(jobTaskBean==null) return;

    //Read the parameters
    //Name for the task key + configuration task id
    String name=key(jobConfig.getTaskId(),jobConfig.getTaskKey());
    String group=jobTaskBean.getGroup();
    Integer status=jobConfig.getTaskStatus();
    boolean concurrent=jobTaskBean.isConcurrent();

    //Build the task
    Scheduler scheduler=schedulerFactoryBean.getScheduler();
    TriggerKey triggerKey=TriggerKey.triggerKey(name,group);
    Trigger trigger=scheduler.getTrigger(triggerKey);

    //The task already exists, then delete the task first
    if(trigger!=null)
    {
        logger.info("Delete job task(name:"+name+",group:"+group+")");
        scheduler.deleteJob(JobKey.jobKey(name,group));
    }

    //Create a task
    Class jobClass=concurrent?QuartzJobDispatcherDisallow.class:QuartzJobDispatcher.class;
    JobDetail jobDetail=JobBuilder.newJob(jobClass)
            .withIdentity(name,group).build();
    //Set the transmission data
    jobDetail.getJobDataMap().put(LibraKey.CONFIG.toString(),jobTaskBean);
    jobDetail.getJobDataMap().putAll(jobConfig.getJobDataMap());

    //Create a timer
    trigger=TriggerBuilder.newTrigger().withIdentity(name,group)
            .withSchedule(jobConfig.scheduleBuilder()).build();

    //Add tasks to the schedule
    scheduler.scheduleJob(jobDetail,trigger);
    logger.info("Add job task(name:"+name+",group:"+group+")");

    //Task is disabled, pause the job
    if(status==0) pauseJob(jobConfig.getTaskId(),jobConfig.getTaskKey());
}

private final String key(String taskId,String taskKey)
{
    return taskKey+"_"+taskId;
}

private final JobKey jobKey(String taskId,String taskKey) throws SchedulerException
{
    //Get the task properties
    JobTaskBean jobTaskBean=jobTaskUtils.getJobTask(taskKey);
    if(jobTaskBean==null)
    {
        throw new SchedulerException("No task class for key:"+taskKey);
    }

    return JobKey.jobKey(key(taskId,taskKey),jobTaskBean.getGroup()+"");
}

private final TriggerKey triggerKey(String taskId,String taskKey) throws SchedulerException
{
    //Get the task properties
    JobTaskBean jobTaskBean=jobTaskUtils.getJobTask(taskKey);
    if(jobTaskBean==null)
    {
        throw new SchedulerException("No task class for key:"+taskKey);
    }

    return TriggerKey.triggerKey(key(taskId,taskKey),jobTaskBean.getGroup()+"");
}

Tips:这里只展示了设置方法,以及Key生成相关的私有方法,诸如启动、暂停、删除等操作不过是生成Key,然后调用Quartz提供的方法处理就好了。

这样动态任务的框架封装完成,你可以自己设计数据库(关系型或者Key-value都可以),在Spring启动时读取数据然后生成TaskConfigBean初始化任务;并且为数据操作提供增删改查时调用ScheduleJobUtils对实际运行的任务进行处理。

Tips:启动时初始化可以使用Spring的@PostConstruct注解。

Tips:注意ScheduleJobUtils调用的Quartz中的resumeJobdeleteJob都是必须任务代码执行完成后才会暂停和删除。

Dubbo环境下的自动注入

前文看到Spring提供了非常方便的方法为Bean手动实现自动注入,只需注入AutowireCapableBeanFactory然后执行:

capableBeanFactory.autowireBean(job);

但是笔者在工作中使用时遇到了项目使用了Dubbo框架,也就是我们的任务调度服务中实现的任务处理类需要使用@Reference注解调用远程服务,所以需要解决的是这类服务类自动注入的问题。查询后发现Dubbo并未提供类似的方法(我们那时是Dubbo2,不知道Dubbo3会不会有)。那如何处理呢?!众所周知Dubbo是使用的RPC协议,并且是使用Zookeeper作为注册中心的由阿里提供的开源框架……所以说重点在于……开源啊,直接找源码来看看Dubbo自己是怎么注入的呢(吐!那说其他的一堆有啥用!)。经过一番努力和整理后,在统一任务类的父类(AbstractQuartzJobDispatcher)中添加如下注入方法:

/**
 * Instantiate Dubbo service annotation @Reference.
 *
 * @param bean the bean
 */
private void referenceBean(Object bean)
{
    //Set by the set method
    Method[] methods=bean.getClass().getMethods();
    for(Method method : methods)
    {
        String name=method.getName();
        if(name.length()>3 && name.startsWith("set")
                && method.getParameterTypes().length==1
                && Modifier.isPublic(method.getModifiers())
                && !Modifier.isStatic(method.getModifiers()))
        {
            try
            {
                Reference reference=method.getAnnotation(Reference.class);
                if(reference!=null)
                {
                    Object value=refer(reference,method.getParameterTypes()[0]);
                    if(value!=null)
                    {
                        method.invoke(bean,new Object[]{});
                    }
                }
            }
            catch(Throwable e)
            {
                getLogger().error("Failed to init remote service reference at method "+name+" in class "+bean
                        .getClass().getName()+", cause: "+e.getMessage(),e);
            }
        }
    }

    //Through the property settings
    Field[] fields=bean.getClass().getDeclaredFields();
    for(Field field : fields)
    {
        try
        {
            if(!field.isAccessible())
            {
                field.setAccessible(true);
            }

            Reference reference=field.getAnnotation(Reference.class);
            if(reference!=null)
            {
                //Refer method interested can see for themselves, involving zk and netty
                Object value=refer(reference,field.getType());
                if(value!=null)
                {
                    field.set(bean,value);
                }
            }
        }
        catch(Throwable e)
        {
            getLogger().error(
                    "Failed to init remote service reference at filed "+field.getName()+" in class "+bean.getClass()
                            .getName()+", cause: "+e.getMessage(),e);
        }
    }
}

/**
 * Instantiate the corresponding Dubbo service object.
 *
 * @param reference The annotation of Reference
 * @param referenceClass The class of Reference
 * @return
 */
private Object refer(Reference reference,Class<?> referenceClass)
{
    //Get the interface name
    String interfaceName;
    if(!"".equals(reference.interfaceName()))
    {
        interfaceName=reference.interfaceName();
    }
    else if(!void.class.equals(reference.interfaceClass()))
    {
        interfaceName=reference.interfaceClass().getName();
    }
    else if(referenceClass.isInterface())
    {
        interfaceName=referenceClass.getName();
    }
    else
    {
        throw new IllegalStateException(
                "The @Reference undefined interfaceClass or interfaceName, and the property type "
                        +referenceClass.getName()+" is not a interface.");
    }

    //Get service object
    String key=reference.group()+"/"+interfaceName+":"+reference.version();
    ReferenceBean<?> referenceConfig=referenceConfigs.get(key);
    //Configuration does not exist, find service
    if(referenceConfig==null)
    {
        referenceConfig=new ReferenceBean<Object>(reference);
        if(void.class.equals(reference.interfaceClass())
                && "".equals(reference.interfaceName())
                && referenceClass.isInterface())
        {
            referenceConfig.setInterface(referenceClass);
        }

        ApplicationContext applicationContext=getApplicationContext();
        if(applicationContext!=null)
        {
            referenceConfig.setApplicationContext(applicationContext);

            //registry
            if(reference.registry()!=null && reference.registry().length>0)
            {
                List<RegistryConfig> registryConfigs=new ArrayList<RegistryConfig>();
                for(String registryId : reference.registry())
                {
                    if(registryId!=null && registryId.length()>0)
                    {
                        registryConfigs
                                .add(applicationContext.getBean(registryId,RegistryConfig.class));
                    }
                }
                referenceConfig.setRegistries(registryConfigs);
            }

            //consumer
            if(reference.consumer()!=null && reference.consumer().length()>0)
            {
                referenceConfig.setConsumer(applicationContext.getBean(reference.consumer(),ConsumerConfig.class));
            }

            //monitor
            if(reference.monitor()!=null && reference.monitor().length()>0)
            {
                referenceConfig.setMonitor(
                        (MonitorConfig)applicationContext.getBean(reference.monitor(),MonitorConfig.class));
            }

            //application
            if(reference.application()!=null && reference.application().length()>0)
            {
                referenceConfig.setApplication((ApplicationConfig)applicationContext
                        .getBean(reference.application(),ApplicationConfig.class));
            }

            //module
            if(reference.module()!=null && reference.module().length()>0)
            {
                referenceConfig.setModule(applicationContext.getBean(reference.module(),ModuleConfig.class));
            }

            //consumer
            if(reference.consumer()!=null && reference.consumer().length()>0)
            {
                referenceConfig.setConsumer(
                        (ConsumerConfig)applicationContext.getBean(reference.consumer(),ConsumerConfig.class));
            }

            try
            {
                referenceConfig.afterPropertiesSet();
            }
            catch(RuntimeException e)
            {
                throw e;
            }
            catch(Exception e)
            {
                throw new IllegalStateException(e.getMessage(),e);
            }
        }

        //Configuration
        referenceConfigs.putIfAbsent(key,referenceConfig);
        referenceConfig=referenceConfigs.get(key);
    }
    return referenceConfig.get();
}

在反射方法(invokeJobMethod)中调用:

//Spring autowire
getCapableBeanFactory().autowireBean(job);
//Dubbo service injection
referenceBean(job);

经过如上的封装后,等于是在Quartz的基础上搭建了一层与具体业务脱离的动态任务处理层,它可以使用在任何有此需求的项目(因为基于Quartz所以对大型的分布式项目支持就一般)中,前面也说过我已经将它开源出来,取名为Libra。

关于Libra

Libra:这里的取的意思是天秤座,没啥深层含义就是取个名字。

开源项目地址Github-ProteanBear-Libra

项目最新版本v1.1.1

关于项目分支develop(开发)、master(主干)、libra-dubbo(带Dubbo支持)

关于项目结构

  • libra-test-dubbo-client:测试项目,Dubbo服务消费者;仅libra-dubbo分支。
    • src/main
      • java/com/github/proteanbear/test
        • TaskService.java:任务初始化
        • TestLongTimeRunTask.java:任务实现类
      • resources
        • application-context.xml:Spring配置
        • dubbo-consumer.xml:Dubbo服务消费者配置
        • log4j.properties:日志配置
        • quartz.properties:Quartz配置
      • webapp/WEB-INF
        • web.xml
    • pom.xml:Maven项目配置
  • libra-test-dubbo-common:测试项目,Dubbo服务接口定义;仅libra-dubbo分支。
    • src/main/java/com/github/proteanbear/test
      • DubboHelloService:Dubbo测试服务接口定义
    • pom.xml:Maven项目配置
  • libra-test-dubbo-service:测试项目,Dubbo服务提供者;仅libra-dubbo分支。
    • src/main
      • java/com/github/proteanbear/test
        • DubboHelloServiceImpl:Dubbo测试服务接口实现
      • resources
        • application-context.xml:Spring配置
        • log4j.properties:日志配置
        • spring-dubbo.xml:Dubbo服务提供者配置
      • webapp/WEB-INF
        • web.xml
    • pom.xml:Maven项目配置
  • libra-test-jar-scan:测试项目,用于生成通用的服务类以及任务实现类的Jar包;其他测试项目使用ScanUtils扫描工具动态加载Jar包内容并设置任务。
    • src/main/java/com/github/proteanbear/test
      • HelloService:Spring的Service,“Hello Libra!”
      • TestLongTimeRunTask.java:任务实现类
    • pom.xml:Maven项目配置
  • libra-test-jar-xml:测试项目,使用jar包载入依赖,使用xml配置。
    • src/main
      • java/com/github/proteanbear/test
        • TaskService:任务初始化
      • resources
        • application-context.xml:Spring配置
        • log4j.properties:日志配置
        • quartz.properties:Quartz配置
      • webapp/WEB-INF
        • web.xml
    • pom.xml:Maven项目配置
  • libra-test-maven-configuration:测试项目,使用Maven载入项目依赖,使用Configuration配置。
    • src/main
      • java/com/github/proteanbear/test
        • TaskService:任务初始化
      • resources
        • application-context.xml:Spring配置
        • log4j.properties:日志配置
        • quartz.properties:Quartz配置
      • webapp/WEB-INF
        • web.xml
    • pom.xml:Maven项目配置
  • libra-test-maven-xml:测试项目,使用Maven载入项目依赖,使用xml配置。
    • src/main
      • java/com/github/proteanbear/test
        • TaskService:任务初始化
      • resources
        • application-context.xml:Spring配置
        • log4j.properties:日志配置
        • quartz.properties:Quartz配置
      • webapp/WEB-INF
        • web.xml
    • pom.xml:Maven项目配置
  • libra:框架项目
    • src/main
      • java/com/github/proteanbear/libra
        • configuration
          • LibraQuartzConfiguration:Spring+Quartz的Configuration配置
        • framework
          • AbstractQuartzJobDispatcher:统一任务类的抽象父类,实现反射方法
          • AutowiringSpringBeanJobFactory:自定义Spring中的任务生成工厂
          • JobTask:自定义注解,标注任务实现类
          • JobTaskBean:任务实现类信息描述
          • JobTaskData:自定义注解,标注任务实现类数据传输属性
          • JobTaskExecute:自定义注解,标注任务执行方法
          • LibraKey:枚举,指定项目中的通用常量
          • QuartzJobDispatcher:统一任务类,无状态
          • QuartzJobDispatcherDisallow:统一任务类型,有状态
          • TaskConfigBean:任务配置描述
        • utils
          • JobTaskUtils:任务实现类管理工具,Spring组件、单例
          • ScanUtils:文件夹及Jar包动态扫描工具,Spring组件、单例
          • ScheduleJobUtils:任务管理工具,Spring组件
          • StringUtils:字符串处理工具,全静态方法
      • resources
        • spring-quartz.xml:Spring+Quartz的xml配置
    • pom.xml:Maven项目配置

关于两种配置

  • Spring+Quartz的xml配置引入,Spring配置xml中添加:
<import resource="classpath*:spring-quartz.xml"/>
  • Spring+Quartz的Configuration配置引入,Spring配置xml中添加:
<context:annotation-config />

Tips:注意一定要在使用项目中添加quartz.proterties文件。

Tips:SpringBoot默认支持Configuration方式。

关于动态载入

在v1.1.0版本中加入了ScanUtils支持Jar包的动态载入,使用scanUtils.scan(jarFile)就可以动态载入Jar包中的内容。

测试项目中libra-test-jar-scan就是用于生成Jar包的项目,测试项目libra-test-maven-xml中配置了Maven依赖libra-test-jar-scan所以可以直接调用项目libra-test-jar-scan中的任务实现类;而测试项目libra-test-maven-configuration未在Maven配置中依赖此项目,但是在初始化方法中通过在使用ScanUtils动态加载了Jar包,同样可以调用项目libra-test-jar-scan中的任务实现类。

Tips:在测试要自己去生成libra-test-jar-scan项目的Jar包,并在扫描前注意一下文件位置是否正确。

Quartz的动态任务调度就到这里了,这里主要还是集中于单机环境下,再有一章计划是探讨一下分布式任务调度。

文章有些长,如果是从头看到尾,那在这里真心感谢您的支持!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,539评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,911评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,337评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,723评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,795评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,762评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,742评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,508评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,954评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,247评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,404评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,104评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,736评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,352评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,557评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,371评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,292评论 2 352

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,647评论 18 139
  • Spring Boot 参考指南 介绍 转载自:https://www.gitbook.com/book/qbgb...
    毛宇鹏阅读 46,801评论 6 342
  • 朋友说她们办公室有位女同事同龄人心机很重套路特别多,人前一套人后一套,明明坑了人还表现出一副在帮别人的样子。仗着做...
    森林_木阅读 369评论 1 3
  • 妈妈前几天去了东北,给姥姥上坟,留下我和我爸两人在家。妈妈不在家,活就变成了我和老爸的。可爸爸不舍得我干会...
    向日葵223344阅读 209评论 0 0