第9章 Java并发包中ScheduledThreadPoolExecutor原理探究

目录

类图结构

ScheduledThreadPoolExecutor时一个可以在指定一定延迟时间后或者定时进行任务调度执行的线程池。

ScheduledThreadPoolExecutor继承了ThreadPoolExecutor并实现了ScheduledExecutorService接口。

线程池队列是DelayedWorkQueue,与DelayedQueue一样属于延迟队列。

ScheduledFuturetask是具有返回值的任务,继承自FutureTask。FutureTask内部用一个变量state来表示任务的状态,一开始为NEW。

各状态意义如下:

private static final int NEW          = 0; // 初始状态
private static final int COMPLETING   = 1; // 执行中
private static final int NORMAL       = 2; // 正常运行结束
private static final int EXCEPTIONAL  = 3; // 运行中异常
private static final int CANCELLED    = 4; // 任务被取消
private static final int INTERRUPTING = 5; // 任务正在被中断
private static final int INTERRUPTED  = 6; // 任务已经被中断

ScheduledFutureTask内部用一个变量period来表示任务的类型:

  • period=0,说明当前任务是一次性的,执行完毕后就推出了。
  • period为负数,说明当前任务为固定延迟的定时可重复执行任务(执行完一次后会停止指定时间后再次运行,若每次执行任务耗时不同,则显然相邻两次任务执行间隔不同)。
  • period为正数,说明当前任务为固定频率的定尺可重复执行任务(也即固定周期)。

以下为ScheduledThreadPoolExecutor的构造函数:

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
            new DelayedWorkQueue());
}

// 指定了线程工厂
public ScheduledThreadPoolExecutor(int corePoolSize,
                                    ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
            new DelayedWorkQueue(), threadFactory);
}

// 指定了拒绝策略
public ScheduledThreadPoolExecutor(int corePoolSize,
                                    RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
            new DelayedWorkQueue(), handler);
}

// 指定了线程工厂和拒绝策略
public ScheduledThreadPoolExecutor(int corePoolSize,
                                    ThreadFactory threadFactory,
                                    RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
            new DelayedWorkQueue(), threadFactory, handler);
}        

从上面的代码中可以看到,ScheduledThreadPoolExecutor的线程池队列为DelayedWorkQueue。

源码分析

schedule(Runnable command, long delay, TimeUnit unit)

提交一个延迟执行的任务,任务从提交时间算起延迟单位为unit的delay后开始执行。提交的任务不是周期性任务,任务只会执行一次。

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    // 参数校验
    if (command == null || unit == null)
        throw new NullPointerException();
    // 将任务包装成ScheduledFutureTask
    // triggerTime方法用来计算触发时间(即任务开始执行的绝对时间)
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));
    // 添加任务到延迟队列
    delayedExecute(t);
    return t;
}

以下是ScheduledFutureTask的相关代码:

ScheduledFutureTask(Runnable r, V result, long ns) {
    // 调用父类构造函数
    super(r, result);
    this.time = ns; // 等待ns纳秒后开始执行
    this.period = 0; // period=0说明为一次性任务
    // 记录任务编号
    this.sequenceNumber = sequencer.getAndIncrement(); 
}

public FutureTask(Runnable runnable, V result) {
    // 将Runnable任务转化成Callable任务
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

delayedExecute的代码如下:

private void delayedExecute(RunnableScheduledFuture<?> task) {
    //  线程池关闭则执行拒绝策略
    if (isShutdown())
        reject(task);
    else {
        // 将任务添加到任务队列中
        // 任务队列为DelayedWorkQueue
        // 所加的task实现了comparable接口
        // 添加到任务队列中能保证队首元素为最早需要执行的
        super.getQueue().add(task);
        // 再次检查线程池是否关闭
        // 因为执行上面add代码过程中线程池完全有可能被关闭
        // 如果线程池被关闭则判断当前任务是否可以在当前状态下继续执行
        // 不能继续执行则移除当前任务
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            // 保证至少有一个线程存活可以从任务队列中获取任务处理任务
            ensurePrestart();
    }
}

// 判断任务是否是周期执行的
public boolean isPeriodic() {
    return period != 0;
}

// 根据periodic来决定isRunningOrShutdown的参数
// continueExistingPeriodicTasksAfterShutdown和
// executeExistingDelayedTasksAfterShutdown的值可通过相应的setter方法来设置
// 为true表示线程池关闭后当前任务会继续执行完毕
// 为false则取消当前任务
boolean canRunInCurrentRunState(boolean periodic) {
    return isRunningOrShutdown(periodic ?
                                continueExistingPeriodicTasksAfterShutdown :
                                executeExistingDelayedTasksAfterShutdown);
}

// 确保至少有一个线程存活来执行任务
void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

线程池中具体执行任务的是Worker,Worker通过调用run方法来执行。这里的任务是ScheduledFutureTask,下面来看其run方法。

// ScheduledFutureTask.run
public void run() {
    boolean periodic = isPeriodic();
    // 判断是否需要取消任务
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    // 一次性任务
    else if (!periodic)
        // 调用FutureTask的run方法
        ScheduledFutureTask.super.run();
    // 周期性任务
    // runAndReset为FutureTask中的方法
    // 用于执行当前任务但不改变future的状态
    else if (ScheduledFutureTask.super.runAndReset()) {
        // 设置下次执行的时间
        setNextRunTime();
        // 默认情况下,outerTask = this就是当前对象
        reExecutePeriodic(outerTask);
    }
}

// 设置周期性任务下次执行时间
private void setNextRunTime() {
    long p = period;
    // p > 0表示任务执行频率一定
    if (p > 0)
        // time为此次任务(已执行完毕)刚开始执行时的时间
        time += p;
    // p < 0表示任务固定延迟时间
    // 即此次任务完成后会等待-p时间再执行下次任务  
    else
        // 获取-p时间后的绝对时间
        time = triggerTime(-p);
}

// 周期性执行
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    if (canRunInCurrentRunState(true)) {
        // 再次将task添加至任务队列中等待执行
        // 当轮到task执行时,又会在run中调用此方法
        // 再次将自身添加到任务队列中,从而达到周期性执行效果
        super.getQueue().add(task);
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}

scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)

当任务执行完毕后,让其延迟固定时间后再次运行。

// command:所要执行的任务
// initialDelay: 提交任务后等待多少时间再开始执行任务
// delay: 一次任务执行完后等待多少时间才执行下次任务
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                    long initialDelay,
                                                    long delay,
                                                    TimeUnit unit) {
    // 参数校验                                                      
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    // 将Runnable任务转换成ScheduledFutureTask
    // 第四个参数period为-delay < 0表示当前任务是固定延迟的
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                        null,
                                        triggerTime(initialDelay, unit),
                                        unit.toNanos(-delay));
    // decorateTask默认直接返回第二个参数,即sft                                     
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    // 默认情况下t = sft
    // outerTask即为sft自身,用于实现周期性执行
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

// 可被子类重写拓展
// 默认实现只是简单的返回task
protected <V> RunnableScheduledFuture<V> decorateTask(
    Runnable runnable, RunnableScheduledFuture<V> task) {
    return task;
}

主要不同在于设置了period=-delay,其他代码与schedule相同,相应的代码会判断period的取值从而决定程序不同的行为。

scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)

按固定频率周期性地执行任务。

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                long initialDelay,
                                                long period,
                                                TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    // 关键在于此处第四个参数period > 0   
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                        null,
                                        triggerTime(initialDelay, unit),
                                        unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

除了设置period的值大于0外,总体与scheduleWithFixedDelay类似,不再赘述。

更多

相关笔记:《Java并发编程之美》阅读笔记

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,539评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,911评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,337评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,723评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,795评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,762评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,742评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,508评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,954评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,247评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,404评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,104评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,736评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,352评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,557评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,371评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,292评论 2 352

推荐阅读更多精彩内容