Quartz 源码解析(二) —— Scheduler的初始化

初始化流程

最好结合上篇文章的例子来看,对应于使用Quartz的第一个和第二个步骤,创建StdSchedulerFactory和StdScheduler。
SchedulerFacotory,顾名思义,就是生产Scheduler实例的工厂类。下面的代码逻辑这么多,就是为了创建一个Scheduler实例。

初始化流程

源码解析

创建SchedulerFacotory

SchedulerFacotory是一个接口,它有两个实现:

StdSchedulerFacotory通过配置文件来设置Scheduler的各项参数
DirectSchedulerFactory主要通过硬编码来设置Scheduler的各项参数。

我们一般使用StdSchedulerFacotory,后者需要对Scheduler有很好的理解。quartz.properties里面的配置都对应到这个StdSchedulerFactory中,所以对某个配置不明白已经该配置的默认值可以看StdSchedulerFactory中获取配置的代码。

  • 构造方法
/**
 * 上篇的例子我们是通过这个构造方法创建实例,读取配置文件延迟到了getScheduler()方法
 */
public StdSchedulerFactory() {
}

/**
 * Create a StdSchedulerFactory that has been initialized via
 * <code>{@link #initialize(Properties)}</code>.
 *
 * @see #initialize(Properties)
 */
public StdSchedulerFactory(Properties props) throws SchedulerException {
    initialize(props);
}

/**
 * Create a StdSchedulerFactory that has been initialized via
 * <code>{@link #initialize(String)}</code>.
 *
 * @see #initialize(String)
 */
public StdSchedulerFactory(String fileName) throws SchedulerException {
    initialize(fileName);
}
  • getScheduler()方法
/**
 * <p>
 * 加载quartz.properties
 * 初始化StdScheduler
 * </p>
 */
public Scheduler getScheduler() throws SchedulerException {
  // 加载quartz.properties
    if (cfg == null) {
        initialize();
    }
    // 单例的SchedulerRepository实例
    SchedulerRepository schedRep = SchedulerRepository.getInstance();
    // 如果已经有类似的Scheduler启动了,就不用再创建了
    Scheduler sched = schedRep.lookup(getSchedulerName());

    if (sched != null) {
        if (sched.isShutdown()) {
            schedRep.remove(getSchedulerName());
        } else {
            return sched;
        }
    }
    // 初始化StdScheduler
    sched = instantiate();

    return sched;
}

创建Scheduler

  • instantiate()
    创建资源的逻辑基本都在这个方法里面了,方法很长,有接近800行,"..."表示此处减去了一些我认为对理解创建流程不是很重要的代码。
/**
 * 创建ClassLoadHelper,以便根据配置文件提供的类名去创建实例
 * 获取quartz.properties的配置值
 * 创建ClassLoadHelper,以便根据配置文件提供的类名去创建实例
 * 创建JobFactory
 * 根据PropertiesParser创建ThreadPool
 * 根据PropertiesParser创建JobStore
 * 根据PropertiesParser创建DataSource
 * 根据PropertiesParser创建SchedulerPlugin
 * 根据PropertiesParser创建Listeners
 * 根据PropertiesParser创建ThreadExecutor
 * 根据PropertiesParser创建JobRunShellFactory
 * 创建QuartzSchedulerResources
 *   ThreadPool.initialize();
 *   new QuartzScheduler();
 *   StdScheduler的所有方法都委托给了QuartzScheduler;
 * @return
 * @throws SchedulerException
 */
private Scheduler instantiate() throws SchedulerException {
    ...
    JobStore js = null;
    ThreadPool tp = null;
    QuartzScheduler qs = null;
    DBConnectionManager dbMgr = null;
    String instanceIdGeneratorClass = null;
    Properties tProps = null;
    String userTXLocation = null;
    boolean wrapJobInTx = false;
    boolean autoId = false;
    long idleWaitTime = -1;
    long dbFailureRetry = 15000L; // 15 secs
    String classLoadHelperClass;
    String jobFactoryClass;
    ThreadExecutor threadExecutor;

    SchedulerRepository schedRep = SchedulerRepository.getInstance();

    // 获取quartz.properties的配置值
    String schedName = cfg.getStringProperty(PROP_SCHED_INSTANCE_NAME,
            "QuartzScheduler");
    ...

    // 创建ClassLoadHelper,以便根据配置文件提供的类名去创建实例
    ClassLoadHelper loadHelper = null;
    try {
        loadHelper = (ClassLoadHelper) loadClass(classLoadHelperClass)
                .newInstance();
    } catch (Exception e) {
        throw new SchedulerConfigException(
                "Unable to instantiate class load helper class: "
                        + e.getMessage(), e);
    }
    loadHelper.initialize();

    // If Proxying to remote JMX scheduler, short-circuit here...
    // TODO:代理是做什么的 和 RemoteMBeanScheduler的使用
    if (jmxProxy) {
        ...
    }

    // 创建JobFactory
    JobFactory jobFactory = null;
    if(jobFactoryClass != null) {
        try {
            jobFactory = (JobFactory) loadHelper.loadClass(jobFactoryClass)
                    .newInstance();
        } catch (Exception e) {
            throw new SchedulerConfigException(
                    "Unable to instantiate JobFactory class: "
                            + e.getMessage(), e);
        }

        tProps = cfg.getPropertyGroup(PROP_SCHED_JOB_FACTORY_PREFIX, true);
        try {
            setBeanProps(jobFactory, tProps);
        } catch (Exception e) {
            initException = new SchedulerException("JobFactory class '"
                    + jobFactoryClass + "' props could not be configured.", e);
            throw initException;
        }
    }
    ...

    // 根据PropertiesParser创建ThreadPool
    String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());

    if (tpClass == null) {
        initException = new SchedulerException("ThreadPool class not specified. ");
        throw initException;
    }

    try {
        tp = (ThreadPool) loadHelper.loadClass(tpClass).newInstance();
    } catch (Exception e) {
        initException = new SchedulerException("ThreadPool class '"
                + tpClass + "' could not be instantiated.", e);
        throw initException;
    }
    tProps = cfg.getPropertyGroup(PROP_THREAD_POOL_PREFIX, true);
    try {
        setBeanProps(tp, tProps);
    } catch (Exception e) {
        initException = new SchedulerException("ThreadPool class '"
                + tpClass + "' props could not be configured.", e);
        throw initException;
    }

    // 根据PropertiesParser创建JobStore
    String jsClass = cfg.getStringProperty(PROP_JOB_STORE_CLASS,
            RAMJobStore.class.getName());

    if (jsClass == null) {
        initException = new SchedulerException(
                "JobStore class not specified. ");
        throw initException;
    }

    try {
        js = (JobStore) loadHelper.loadClass(jsClass).newInstance();
    } catch (Exception e) {
        initException = new SchedulerException("JobStore class '" + jsClass
                + "' could not be instantiated.", e);
        throw initException;
    }

    SchedulerDetailsSetter.setDetails(js, schedName, schedInstId);

    tProps = cfg.getPropertyGroup(PROP_JOB_STORE_PREFIX, true, new String[] {PROP_JOB_STORE_LOCK_HANDLER_PREFIX});
    try {
        setBeanProps(js, tProps);
    } catch (Exception e) {
        initException = new SchedulerException("JobStore class '" + jsClass
                + "' props could not be configured.", e);
        throw initException;
    }

    // 持久化的JobStore处理逻辑
    if (js instanceof JobStoreSupport) {
        // Install custom lock handler (Semaphore)
        String lockHandlerClass = cfg.getStringProperty(PROP_JOB_STORE_LOCK_HANDLER_CLASS);
        if (lockHandlerClass != null) {
            try {
                Semaphore lockHandler = (Semaphore)loadHelper.loadClass(lockHandlerClass).newInstance();

                tProps = cfg.getPropertyGroup(PROP_JOB_STORE_LOCK_HANDLER_PREFIX, true);

                // If this lock handler requires the table prefix, add it to its properties.
                if (lockHandler instanceof TablePrefixAware) {
                    tProps.setProperty(
                            PROP_TABLE_PREFIX, ((JobStoreSupport)js).getTablePrefix());
                    tProps.setProperty(
                            PROP_SCHED_NAME, schedName);
                }

                try {
                    setBeanProps(lockHandler, tProps);
                } catch (Exception e) {
                    initException = new SchedulerException("JobStore LockHandler class '" + lockHandlerClass
                            + "' props could not be configured.", e);
                    throw initException;
                }

                ((JobStoreSupport)js).setLockHandler(lockHandler);
                getLog().info("Using custom data access locking (synchronization): " + lockHandlerClass);
            } catch (Exception e) {
                initException = new SchedulerException("JobStore LockHandler class '" + lockHandlerClass
                        + "' could not be instantiated.", e);
                throw initException;
            }
        }
    }

    // Set up any DataSources
    // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    // 根据PropertiesParser创建DataSource
    String[] dsNames = cfg.getPropertyGroups(PROP_DATASOURCE_PREFIX);
    for (int i = 0; i < dsNames.length; i++) {
      ...
    }

    // 根据PropertiesParser创建SchedulerPlugin
    String[] pluginNames = cfg.getPropertyGroups(PROP_PLUGIN_PREFIX);
    SchedulerPlugin[] plugins = new SchedulerPlugin[pluginNames.length];
    for (int i = 0; i < pluginNames.length; i++) {
      ...
    }

    // 根据PropertiesParser创建Listeners
    Class<?>[] strArg = new Class[] { String.class };
    String[] jobListenerNames = cfg.getPropertyGroups(PROP_JOB_LISTENER_PREFIX);
    JobListener[] jobListeners = new JobListener[jobListenerNames.length];
    for (int i = 0; i < jobListenerNames.length; i++) {
      ...
    }

    String[] triggerListenerNames = cfg.getPropertyGroups(PROP_TRIGGER_LISTENER_PREFIX);
    TriggerListener[] triggerListeners = new TriggerListener[triggerListenerNames.length];
    for (int i = 0; i < triggerListenerNames.length; i++) {
      ...
    }

    boolean tpInited = false;
    boolean qsInited = false;

    // 根据PropertiesParser创建ThreadExecutor
    String threadExecutorClass = cfg.getStringProperty(PROP_THREAD_EXECUTOR_CLASS);
    if (threadExecutorClass != null) {
        tProps = cfg.getPropertyGroup(PROP_THREAD_EXECUTOR, true);
        try {
            threadExecutor = (ThreadExecutor) loadHelper.loadClass(threadExecutorClass).newInstance();
            log.info("Using custom implementation for ThreadExecutor: " + threadExecutorClass);

            setBeanProps(threadExecutor, tProps);
        } catch (Exception e) {
            initException = new SchedulerException(
                    "ThreadExecutor class '" + threadExecutorClass + "' could not be instantiated.", e);
            throw initException;
        }
    } else {
        log.info("Using default implementation for ThreadExecutor");
        threadExecutor = new DefaultThreadExecutor();
    }

    // Fire everything up
    // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    try {
        JobRunShellFactory jrsf = null; // Create correct run-shell factory...

        if (userTXLocation != null) {
            UserTransactionHelper.setUserTxLocation(userTXLocation);
        }

        if (wrapJobInTx) {
            jrsf = new JTAJobRunShellFactory();
        } else {
            jrsf = new JTAAnnotationAwareJobRunShellFactory();
        }

        if (autoId) {
            try {
              schedInstId = DEFAULT_INSTANCE_ID;
              if (js.isClustered()) {
                  schedInstId = instanceIdGenerator.generateInstanceId();
              }
            } catch (Exception e) {
                getLog().error("Couldn't generate instance Id!", e);
                throw new IllegalStateException("Cannot run without an instance id.");
            }
        }

        if (js.getClass().getName().startsWith("org.terracotta.quartz")) {
            try {
                String uuid = (String) js.getClass().getMethod("getUUID").invoke(js);
                if(schedInstId.equals(DEFAULT_INSTANCE_ID)) {
                    schedInstId = "TERRACOTTA_CLUSTERED,node=" + uuid;
                    if (jmxObjectName == null) {
                        jmxObjectName = QuartzSchedulerResources.generateJMXObjectName(schedName, schedInstId);
                    }
                } else if(jmxObjectName == null) {
                    jmxObjectName = QuartzSchedulerResources.generateJMXObjectName(schedName, schedInstId + ",node=" + uuid);
                }
            } catch(Exception e) {
                throw new RuntimeException("Problem obtaining node id from TerracottaJobStore.", e);
            }

            if(null == cfg.getStringProperty(PROP_SCHED_JMX_EXPORT)) {
                jmxExport = true;
            }
        }

        if (js instanceof JobStoreSupport) {
            JobStoreSupport jjs = (JobStoreSupport)js;
            jjs.setDbRetryInterval(dbFailureRetry);
            if(threadsInheritInitalizersClassLoader)
                jjs.setThreadsInheritInitializersClassLoadContext(threadsInheritInitalizersClassLoader);

            jjs.setThreadExecutor(threadExecutor);
        }

        // 创建QuartzSchedulerResources
        QuartzSchedulerResources rsrcs = new QuartzSchedulerResources();
        rsrcs.setName(schedName);
        rsrcs.setThreadName(threadName);
        rsrcs.setInstanceId(schedInstId);
        rsrcs.setJobRunShellFactory(jrsf);
        rsrcs.setMakeSchedulerThreadDaemon(makeSchedulerThreadDaemon);
        rsrcs.setThreadsInheritInitializersClassLoadContext(threadsInheritInitalizersClassLoader);
        rsrcs.setBatchTimeWindow(batchTimeWindow);
        rsrcs.setMaxBatchSize(maxBatchSize);
        rsrcs.setInterruptJobsOnShutdown(interruptJobsOnShutdown);
        rsrcs.setInterruptJobsOnShutdownWithWait(interruptJobsOnShutdownWithWait);
        rsrcs.setJMXExport(jmxExport);
        rsrcs.setJMXObjectName(jmxObjectName);

        if (managementRESTServiceEnabled) {
            ManagementRESTServiceConfiguration managementRESTServiceConfiguration = new ManagementRESTServiceConfiguration();
            managementRESTServiceConfiguration.setBind(managementRESTServiceHostAndPort);
            managementRESTServiceConfiguration.setEnabled(managementRESTServiceEnabled);
            rsrcs.setManagementRESTServiceConfiguration(managementRESTServiceConfiguration);
        }

        if (rmiExport) {
            rsrcs.setRMIRegistryHost(rmiHost);
            rsrcs.setRMIRegistryPort(rmiPort);
            rsrcs.setRMIServerPort(rmiServerPort);
            rsrcs.setRMICreateRegistryStrategy(rmiCreateRegistry);
            rsrcs.setRMIBindName(rmiBindName);
        }

        SchedulerDetailsSetter.setDetails(tp, schedName, schedInstId);

        rsrcs.setThreadExecutor(threadExecutor);
        threadExecutor.initialize();

        rsrcs.setThreadPool(tp);
        if(tp instanceof SimpleThreadPool) {
            if(threadsInheritInitalizersClassLoader)
                ((SimpleThreadPool)tp).setThreadsInheritContextClassLoaderOfInitializingThread(threadsInheritInitalizersClassLoader);
        }
        tp.initialize();
        tpInited = true;

        rsrcs.setJobStore(js);

        // add plugins
        for (int i = 0; i < plugins.length; i++) {
            rsrcs.addSchedulerPlugin(plugins[i]);
        }

        qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
        qsInited = true;

        // Create Scheduler ref...
        Scheduler scheduler = instantiate(rsrcs, qs);

        // set job factory if specified
        if(jobFactory != null) {
            qs.setJobFactory(jobFactory);
        }

        // Initialize plugins now that we have a Scheduler instance.
        for (int i = 0; i < plugins.length; i++) {
            plugins[i].initialize(pluginNames[i], scheduler, loadHelper);
        }

        // add listeners
        for (int i = 0; i < jobListeners.length; i++) {
            qs.getListenerManager().addJobListener(jobListeners[i], EverythingMatcher.allJobs());
        }
        for (int i = 0; i < triggerListeners.length; i++) {
            qs.getListenerManager().addTriggerListener(triggerListeners[i], EverythingMatcher.allTriggers());
        }

        // set scheduler context data...
        for(Object key: schedCtxtProps.keySet()) {
            String val = schedCtxtProps.getProperty((String) key);    
            scheduler.getContext().put((String)key, val);
        }

        // fire up job store, and runshell factory

        js.setInstanceId(schedInstId);
        js.setInstanceName(schedName);
        js.setThreadPoolSize(tp.getPoolSize());
        js.initialize(loadHelper, qs.getSchedulerSignaler());

        jrsf.initialize(scheduler);

        qs.initialize();

        getLog().info(
                "Quartz scheduler '" + scheduler.getSchedulerName()
                        + "' initialized from " + propSrc);

        getLog().info("Quartz scheduler version: " + qs.getVersion());

        // prevents the repository from being garbage collected
        qs.addNoGCObject(schedRep);
        // prevents the db manager from being garbage collected
        if (dbMgr != null) {
            qs.addNoGCObject(dbMgr);
        }

        schedRep.bind(scheduler);
        return scheduler;
    }
    catch(SchedulerException e) {
        shutdownFromInstantiateException(tp, qs, tpInited, qsInited);
        throw e;
    }
    catch(RuntimeException re) {
        shutdownFromInstantiateException(tp, qs, tpInited, qsInited);
        throw re;
    }
    catch(Error re) {
        shutdownFromInstantiateException(tp, qs, tpInited, qsInited);
        throw re;
    }
}

系列文章

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,277评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,689评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,624评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,356评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,402评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,292评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,135评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,992评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,429评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,636评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,785评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,492评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,092评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,723评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,858评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,891评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,713评论 2 354

推荐阅读更多精彩内容

  • 概述 了解Quartz体系结构 Quartz对任务调度的领域问题进行了高度的抽象,提出了调度器、任务和触发器这3个...
    张晨辉Allen阅读 2,227评论 2 11
  • 导语:作为java领域最受欢迎的任务调度库之一,quartz为开发者提供了丰富的任务调度功能,比如让某段程序在每天...
    star24阅读 23,874评论 8 60
  • Quartz是一个完全由java编写的功能丰富的开源作业调度库,可以集成到几乎任何Java应用程序中,小到独立应用...
    ProteanBear阅读 7,060评论 3 24
  • 本文是根据 Quartz定时器官方文档翻译的,只翻译了第1到第10课,如有翻译不精确的地方,请读者指正,互相学习,...
    ChinaXieShuai阅读 8,345评论 1 19
  • 一切可以妥当的一定会妥当,准备迎接奇迹! 感恩 1、感恩妈妈的关心,深圳的台风又让她担心了!妈妈用远都无条件的爱着...
    belivePossible阅读 146评论 0 2