ThreadPoolExecutor简介
关于线程池的简介我们可以点击这边文章线程池简介,这篇文章我们主要通过源码来看看线程池如何实现复用和如何回收空闲的线程。
ThreadPoolExecutor的类图
从类图中我们可以看出线程池的总体架构,而类图中的这些类主要是干什么的呢?Executor接口中将任务提交和任务执行解耦,ExecutorService和其各种功能强大的实现类提供了非常简便方式来提交任务并获取任务执行结果,封装了任务执行的全部过程。Worker这个类继承了AbstractQueuedSynchronizer,它可以用来判断线程是否空闲,同时他是任务的具体执行者,对于Executor
、ExecutorService和Worker的关系,我们可以做个形象的比喻。ExecutorService是产品经理它通过submit方法提交任务,这个任务被项目组长Executor领取,它通过execute方法来执行任务,但是项目组长真的会自己写代码吗?当然不会,只有苦逼的Worker,也就是我这种底层的程序猿来做接盘侠,执行run方法来撸代码。其中还有四个Policy结尾的内部类,这些类表示线程池满后无法接受任务时拒绝策略。
撸源码
ExecutorService提交任务
//返回Future对象,通过这个对象可以获得任务的执行结果
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
//将task包装成FutureTask
//关于FutureTask请点击[FutureTask源码走读](//www.greatytc.com/p/c9a5c282cadf)
RunnableFuture<Void> ftask = newTaskFor(task, null);
//Executor领取任务
execute(ftask);
//返回执行结果
return ftask;
}
Executor领取任务
//这里便是线程池接受任务的策略
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//小于核心线程数,便创建线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//超过核心线程数后便放到任务队列(任务队列是阻塞队列)
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//任务队列满后再增加线程数到最大线程数,无法接受任务则使用拒绝策略
else if (!addWorker(command, false))
reject(command);
}
Worker执行任务
Worker类图
//属性
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
从Worker的类图可以看出,Worker不仅执行任务同时它继承了AbstractQueuedSynchronizer,这说明他是一把锁(通过它tryAcquire方法可以看出它是一把简单的互斥锁),通过这把锁可以判断线程释放空闲。更多AQS请点击Java concurrent包源码走读(二)
添加Worker
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//判断线程状态和任务是否为空
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//判断线程数
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
//创建一个Worker同时传入任务,其实是创建一个执行任务的线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
mainLock.lock();
try {
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {//线程状态判断
if (t.isAlive())
throw new IllegalThreadStateException();
//将worker放入HashSet<Worker>中
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//worker创建成功变启动线程,启动后便会执行worker的run()方法
t.start();
workerStarted = true;
}
}
} finally {
//worker创建失败删除worker
if (!workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Worker的run()方法执行
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 这一点没有看懂???
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {//获取任务
w.lock();//上锁(使用的worker锁)
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//扩展点(执行线程前)
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//扩展点(执行线程后)
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
//获取任务
// 如果发生了以下四件事中的任意一件,那么Worker需要被回收:
// 1. Worker个数比线程池最大大小要大
// 2. 线程池处于STOP状态
// 3. 线程池处于SHUTDOWN状态并且阻塞队列为空
// 4. 使用超时时间从阻塞队列里拿数据,并且超时之后没有拿到数据(allowCoreThreadTimeOut || workerCount > corePoolSize)
private Runnable getTask() {
boolean timedOut = false;
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//判断线程池状态和任务队列
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//减少线程数目
decrementWorkerCount();
return null;
}
boolean timed;
for (;;) {
//获取线程数目
int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc > corePoolSize;
if (wc <= maximumPoolSize && ! (timedOut && timed))
break;
if (compareAndDecrementWorkerCount(c))
return null;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
try {
//获取任务,线程中断时获取阻塞队列的元素会响应中断
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
//processWorkerExit() worker推出处理
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果Worker没有正常结束流程调用processWorkerExit方法,worker数量减一。
//如果是正常结束的话,在getTask方法里worker数量已经减一了
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
// 尝试结束线程池
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {// 如果线程池还处于RUNNING或者SHUTDOWN状态
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 新开一个Worker代替原先的Worker
// 新开一个Worker需要满足以下3个条件中的任意一个:
// 1. 用户执行的任务发生了异常
// 2. Worker数量比线程池基本大小要小
// 3. 阻塞队列不空但是没有任何Worker在工作
addWorker(null, false);
}
}
//中断线程池
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 满足3个条件中的任意一个,不终止线程池
// 1. 线程池还在运行,不能终止
// 2. 线程池处于TIDYING或TERMINATED状态,说明已经在关闭了,不允许继续处理
// 3. 线程池处于SHUTDOWN状态并且阻塞队列不为空,这时候还需要处理阻塞队列的任务,不能终止线程池
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 走到这一步说明线程池已经不在运行,阻塞队列已经没有任务,但是还要回收正在工作的Worker
if (workerCountOf(c) != 0) {
// 由于线程池不运行了,调用了线程池的关闭方法,在解释线程池的关闭原理的时候会说道这个方法
interruptIdleWorkers(ONLY_ONE); // 中断闲置Worker,直到回收全部的Worker。这里没有那么暴力,只中断一个,中断之后退出方法,中断了Worker之后,Worker会回收,然后还是会调用tryTerminate方法,如果还有闲置线程,那么继续中断
return;
}
// 走到这里说明worker已经全部回收了,并且线程池已经不在运行,阻塞队列已经没有任务。可以准备结束线程池了
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // cas操作,将线程池状态改成TIDYING
try {
terminated(); // 调用terminated方法
} finally {
ctl.set(ctlOf(TERMINATED, 0)); // terminated方法调用完毕之后,状态变为TERMINATED
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
线程关闭
平滑关闭shutdown
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess(); // 检查关闭线程池的权限
advanceRunState(SHUTDOWN); // 把线程池状态更新到SHUTDOWN
interruptIdleWorkers(); // 中断闲置的Worker
onShutdown(); // 钩子方法,默认不处理。ScheduledThreadPoolExecutor会做一些处理
} finally {
mainLock.unlock();
}
tryTerminate(); // 尝试结束线程池
}
// 调用他的一个重载方法,传入了参数false,表示要中断所有的正在运行的闲置Worker,如果为true表示只打断一个闲置Worker
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread; // 拿到worker中的线程
if (!t.isInterrupted() && w.tryLock()) { //尝试获取锁,若果获取成功说明空闲
try {
t.interrupt(); // 中断Worker线程
} catch (SecurityException ignore) {
} finally {
w.unlock(); // 释放Worker锁
}
}
if (onlyOne) // 如果只打断1个Worker的话,直接break退出,否则,遍历所有的Worker
break;
}
} finally {
mainLock.unlock();
}
}
暴力关闭shutdownNow
// shutdownNow方法会有返回值的,返回的是一个任务列表,而shutdown方法没有返回值
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess(); // 检查关闭线程池的权限
advanceRunState(STOP); // 把线程池状态更新到STOP
interruptWorkers(); // 中断Worker的运行
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate(); // 尝试结束线程池
return tasks;
}
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted(); // 中断Worker的执行
} finally {
mainLock.unlock();
}
}
void interruptIfStarted() {
Thread t;
// Worker无论是否被持有锁,只要还没被中断,那就中断Worker
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt(); // 强行中断Worker的执行
} catch (SecurityException ignore) {
}
}
}