ThreadPoolExecutor源码走读

ThreadPoolExecutor简介

关于线程池的简介我们可以点击这边文章线程池简介,这篇文章我们主要通过源码来看看线程池如何实现复用和如何回收空闲的线程。

ThreadPoolExecutor的类图


从类图中我们可以看出线程池的总体架构,而类图中的这些类主要是干什么的呢?Executor接口中将任务提交和任务执行解耦,ExecutorService和其各种功能强大的实现类提供了非常简便方式来提交任务并获取任务执行结果,封装了任务执行的全部过程。Worker这个类继承了AbstractQueuedSynchronizer,它可以用来判断线程是否空闲,同时他是任务的具体执行者,对于Executor
、ExecutorService和Worker的关系,我们可以做个形象的比喻。ExecutorService是产品经理它通过submit方法提交任务,这个任务被项目组长Executor领取,它通过execute方法来执行任务,但是项目组长真的会自己写代码吗?当然不会,只有苦逼的Worker,也就是我这种底层的程序猿来做接盘侠,执行run方法来撸代码。其中还有四个Policy结尾的内部类,这些类表示线程池满后无法接受任务时拒绝策略。

撸源码

ExecutorService提交任务

//返回Future对象,通过这个对象可以获得任务的执行结果
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    //将task包装成FutureTask
    //关于FutureTask请点击[FutureTask源码走读](//www.greatytc.com/p/c9a5c282cadf)
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    //Executor领取任务
    execute(ftask);
    //返回执行结果
    return ftask;
}

Executor领取任务

//这里便是线程池接受任务的策略
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
   
    int c = ctl.get();
    //小于核心线程数,便创建线程
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //超过核心线程数后便放到任务队列(任务队列是阻塞队列)
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //任务队列满后再增加线程数到最大线程数,无法接受任务则使用拒绝策略
    else if (!addWorker(command, false))
        reject(command);
}
Worker执行任务
Worker类图
//属性
final Thread thread;
Runnable firstTask;
volatile long completedTasks;

从Worker的类图可以看出,Worker不仅执行任务同时它继承了AbstractQueuedSynchronizer,这说明他是一把锁(通过它tryAcquire方法可以看出它是一把简单的互斥锁),通过这把锁可以判断线程释放空闲。更多AQS请点击Java concurrent包源码走读(二)

添加Worker
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        //判断线程状态和任务是否为空
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        //判断线程数
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock;
        //创建一个Worker同时传入任务,其实是创建一个执行任务的线程
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            mainLock.lock();
            try {
                int c = ctl.get();
                int rs = runStateOf(c);
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {//线程状态判断
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    //将worker放入HashSet<Worker>中
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //worker创建成功变启动线程,启动后便会执行worker的run()方法
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //worker创建失败删除worker
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
Worker的run()方法执行
public void run() {
    runWorker(this);
}

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // 这一点没有看懂???
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {//获取任务
            w.lock();//上锁(使用的worker锁)
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                //扩展点(执行线程前)
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //扩展点(执行线程后)
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

//获取任务
// 如果发生了以下四件事中的任意一件,那么Worker需要被回收:
// 1. Worker个数比线程池最大大小要大
// 2. 线程池处于STOP状态
// 3. 线程池处于SHUTDOWN状态并且阻塞队列为空
// 4. 使用超时时间从阻塞队列里拿数据,并且超时之后没有拿到数据(allowCoreThreadTimeOut || workerCount > corePoolSize)
private Runnable getTask() {
    boolean timedOut = false;
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        //判断线程池状态和任务队列
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            //减少线程数目
            decrementWorkerCount();
            return null;
        }
        boolean timed;  
        for (;;) {
            //获取线程数目
            int wc = workerCountOf(c);
            timed = allowCoreThreadTimeOut || wc > corePoolSize;
            if (wc <= maximumPoolSize && ! (timedOut && timed))
                break;
            if (compareAndDecrementWorkerCount(c))
                return null;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
        try {
            //获取任务,线程中断时获取阻塞队列的元素会响应中断
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
//processWorkerExit() worker推出处理
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 如果Worker没有正常结束流程调用processWorkerExit方法,worker数量减一。
    //如果是正常结束的话,在getTask方法里worker数量已经减一了
    if (completedAbruptly) 
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // 尝试结束线程池
    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {// 如果线程池还处于RUNNING或者SHUTDOWN状态
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // 新开一个Worker代替原先的Worker
        // 新开一个Worker需要满足以下3个条件中的任意一个:
        // 1. 用户执行的任务发生了异常
        // 2. Worker数量比线程池基本大小要小
        // 3. 阻塞队列不空但是没有任何Worker在工作
        addWorker(null, false);
    }
}

//中断线程池
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // 满足3个条件中的任意一个,不终止线程池
        // 1. 线程池还在运行,不能终止
        // 2. 线程池处于TIDYING或TERMINATED状态,说明已经在关闭了,不允许继续处理
        // 3. 线程池处于SHUTDOWN状态并且阻塞队列不为空,这时候还需要处理阻塞队列的任务,不能终止线程池
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        // 走到这一步说明线程池已经不在运行,阻塞队列已经没有任务,但是还要回收正在工作的Worker
        if (workerCountOf(c) != 0) {
             // 由于线程池不运行了,调用了线程池的关闭方法,在解释线程池的关闭原理的时候会说道这个方法
            interruptIdleWorkers(ONLY_ONE); // 中断闲置Worker,直到回收全部的Worker。这里没有那么暴力,只中断一个,中断之后退出方法,中断了Worker之后,Worker会回收,然后还是会调用tryTerminate方法,如果还有闲置线程,那么继续中断
            return;
        }
        // 走到这里说明worker已经全部回收了,并且线程池已经不在运行,阻塞队列已经没有任务。可以准备结束线程池了
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock(); 
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // cas操作,将线程池状态改成TIDYING
                try {
                    terminated(); // 调用terminated方法
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0)); // terminated方法调用完毕之后,状态变为TERMINATED
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock(); 
        }
        // else retry on failed CAS
    }
}

线程关闭

平滑关闭shutdown
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock(); 
    try {
        checkShutdownAccess(); // 检查关闭线程池的权限
        advanceRunState(SHUTDOWN); // 把线程池状态更新到SHUTDOWN
        interruptIdleWorkers(); // 中断闲置的Worker
        onShutdown(); // 钩子方法,默认不处理。ScheduledThreadPoolExecutor会做一些处理
    } finally {
        mainLock.unlock(); 
    }
    tryTerminate(); // 尝试结束线程池
}


// 调用他的一个重载方法,传入了参数false,表示要中断所有的正在运行的闲置Worker,如果为true表示只打断一个闲置Worker
private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock(); 
    try {
        for (Worker w : workers) { 
            Thread t = w.thread; // 拿到worker中的线程
            if (!t.isInterrupted() && w.tryLock()) { //尝试获取锁,若果获取成功说明空闲
                try {
                    t.interrupt();  // 中断Worker线程
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock(); // 释放Worker锁
                }
            }
            if (onlyOne) // 如果只打断1个Worker的话,直接break退出,否则,遍历所有的Worker
                break;
        }
    } finally {
        mainLock.unlock(); 
    }
}
暴力关闭shutdownNow
// shutdownNow方法会有返回值的,返回的是一个任务列表,而shutdown方法没有返回值
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock(); 
    try {
        checkShutdownAccess(); // 检查关闭线程池的权限
        advanceRunState(STOP); // 把线程池状态更新到STOP
        interruptWorkers(); // 中断Worker的运行
        tasks = drainQueue();
    } finally {
        mainLock.unlock(); 
    }
    tryTerminate(); // 尝试结束线程池
    return tasks;
}

private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock(); 
    try {
        for (Worker w : workers)
            w.interruptIfStarted(); // 中断Worker的执行
    } finally {
        mainLock.unlock();
    }
}

void interruptIfStarted() {
   Thread t;
   // Worker无论是否被持有锁,只要还没被中断,那就中断Worker
   if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
       try {
           t.interrupt(); // 强行中断Worker的执行
       } catch (SecurityException ignore) {
       }
   }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,546评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,224评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,911评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,737评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,753评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,598评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,338评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,249评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,696评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,888评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,013评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,731评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,348评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,929评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,048评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,203评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,960评论 2 355

推荐阅读更多精彩内容