前言
JAVA通过多线程的方式实现并发,为了方便线程池的管理,JAVA采用线程池的方式对线线程的整个生命周期进行管理。1.5后引入的Executor框架的最大优点是把任务的提交和执行解耦。
要执行任务的人只需把Task描述清楚,然后提交即可。这个Task是怎么被执行的,被谁执行的,什么时候执行的,提交的人就不用关心了。
具体点讲,提交一个Callable对象给ExecutorService(如最常用的线程池ThreadPoolExecutor),将得到一个Future对象,调用Future对象的get方法等待执行结果就好了。
一个简单的例子
// 一个有5个作业线程的线程池,
//老大的老大找到一个管5个人的小团队的老大
ExecutorService executorService = Executors.newFixedThreadPool(5);
//提交作业给老大,作业内容封装在Callable中,约定好了输出的类型是String。
String outputs = executorService.submit(
new Callable<String>() {
public String call() throws Exception {
return "I am a task";
}
//提交后就等着结果吧,到底是手下5个作业中谁领到任务了,老大是不关心的。
}).get();
System.out.println(outputs);
使用上非常简单,通过一个工场类Executors创建了一个工作类,工场类返回一个ExecutorService对象。
执行过程
任务提交
ExecutorService是一个接口,没有具体实现,最后的具体实现应该由ThreadPoolExecutor实现的。
Executor 定义了一个execute接口,ExecutorService继承了Executor,并定义了管理线程生命周期的接口,可以接受提交任务、执行任务、关闭服务。
抽象类AbstractExecutorService 实现了ExecutorService接口,也实现了接口定义的默认行为;ThreadPoolExecutor继承了AbstractExecutorService。
AbstractExecutorService任务提交的submit方法有三个实现。
-
接收一个Runnable的Task,没有执行结果;
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }
-
接收两个参数:一个任务,一个执行结果;
public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; }
-
接收一个Callable,本身就包含执任务内容和执行结果。
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
submit方法的返回结果是Future类型,调用该接口定义的get方法即可获得执行结果。 V get() 方法的返回值类型V是在提交任务时就约定好了的。
分析
-
看AbstractExecutorService中submit(Callable<T> task),构造好一个FutureTask对象后,调用execute()方法执行任务。
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
} -
Submit传入的参数都被封装成了FutureTask类型来execute的,对应前面三个不同的参数类型都会封装成FutureTask。
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
} Executor接口中定义的void execute(Runnable command)方法的作用就是执行提交的任务,该方法在抽象类AbstractExecutorService的子类ThreadPoolExecutor中实现。
一个任务的执行过程
先补充下ThreadPoolExecutor有两个最重要的集合属性,分别是存储接收任务的任务队列和用来干活的作业集合。
//任务队列
private final BlockingQueue<Runnable> workQueue;
//作业线程集合
private final HashSet<Worker> workers = new HashSet<Worker>();
execute源码分析
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. 如果当前正在执行的Worker数量比corePoolSize(核心线程大小)要小。
* 直接创建一个新的Worker执行任务,会调用addWorker方法
*
* 2. 如果当前正在执行的Worker数量大于等于corePoolSize(核心线程大小)。
* 将任务放到阻塞队列里,如果阻塞队列没满并且状态是RUNNING的话,直接丢到阻塞队列,否则执行第3步。
* 丢到阻塞队列之后,还需要再做一次验证(丢到阻塞队列之后可能另外一个线程关闭了线程池或者刚刚加入到队列的线程死了)。
* 如果这个时候线程池不在RUNNING状态,把刚刚丢入队列的任务remove掉,调用reject方法,
* 否则查看Worker数量,如果Worker数量为0,起一个新的Worker去阻塞队列里拿任务执行
*
* 3. 丢到阻塞失败的话,会调用addWorker方法尝试起一个新的Worker去阻塞队列拿任务并执行任务,
* 如果这个新的Worker创建失败,调用reject方法
*/
int c = ctl.get();
// 第一个步骤,满足线程池中的线程大小比核心线程大小要小
if (workerCountOf(c) < corePoolSize) {
// addWorker方法第二个参数true表示使用基本大小
if (addWorker(command, true))
return;
c = ctl.get();
}
// 第二个步骤,线程池的线程大小比核心线程大小要大,
// 并且线程池还在RUNNING状态,阻塞队列也没满的情况,加阻塞队列里
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 虽然满足了第二个步骤,但是这个时候可能突然线程池关闭了,所以再做一层判断
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 第三个步骤,直接使用线程池最大大小。addWorker方法第二个参数false表示使用最大大小
else if (!addWorker(command, false))
reject(command);
}
在前面方法中都会调用addWorker(Runnable firstTask, boolean core)方法创建一个工作线程,差别是创建的有些工作线程上面关联接收到的任务firstTask,有些没有。该方法为当前接收到的任务firstTask创建Worker,并将Worker添加到作业集合HashSet<Worker> workers中,并启动作业。
addWorker源码分析
// 返回值是boolean类型,true表示新任务被接收了,并且执行了。否则是false
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c); // 线程池当前状态
// 这个判断转换成 rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty)。
// 概括为3个条件:
// 1. 线程池不在RUNNING状态并且状态是STOP、TIDYING或TERMINATED中的任意一种状态
// 2. 线程池不在RUNNING状态,线程池接受了新的任务
// 3. 线程池不在RUNNING状态,阻塞队列为空。 满足这3个条件中的任意一个的话,拒绝执行任务
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c); // 线程池线程个数
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize)) // 如果线程池线程数量超过线程池最大容量或者线程数量超过了基本大小(core参数为true,core参数为false的话判断超过最大大小)
return false; // 超过直接返回false
if (compareAndIncrementWorkerCount(c)) // 没有超过各种大小的话,cas操作线程池线程数量+1,cas成功的话跳出循环
break retry;
c = ctl.get(); // 重新检查状态
if (runStateOf(c) != rs) // 如果状态改变了,重新循环操作
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 参数 firstTask != null, core = true
// 验证可以满足可新增线程的条件
boolean workerStarted = false; // 任务是否成功启动标识
boolean workerAdded = false; // 任务是否添加成功标识
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock; // 得到线程池的可重入锁
w = new Worker(firstTask); // 基于任务firstTask构造worker
final Thread t = w.thread;
if (t != null) {
mainLock.lock(); // 锁住,防止并发
try {
// 在锁住之后再重新检测一下状态
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) { // 如果线程池在RUNNING状态或者线程池在SHUTDOWN状态并且任务是个null
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true; // 标识一下任务已经添加成功
}
} finally {
mainLock.unlock(); // 解锁
}
if (workerAdded) {
t.start(); // 线程启动,调用worker.run
workerStarted = true;
}
}
} finally {
if (! workerStarted) // 任务启动失败
addWorkerFailed(w);
}
return workerStarted;
}
任务执行
private boolean addWorker(Runnable firstTask, boolean core) {
...
...
if (workerAdded) {
t.start(); // 线程启动,调用worker.run
workerStarted = true;
}
...
return workerStarted;
}
Worker中的线程start的时候,调用Worker本身run方法
run()内部调用runWorker(Worker w),就是在一直调用getTask()方法获取任务,然后调用 runTask(task)方法执行任务。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
...
...
//循环从线程池的任务队列获取任务
while (task != null || (task = getTask()) != null) {
...
...
try {
//执行任务
task.run();
} catch (RuntimeException x) {
...
...
} finally {
//执行正常完成
afterExecute(task, thrown);
}
}finally {
//调用processWorkerExit方法进行回收
processWorkerExit(w, completedAbruptly);
}
}
getTask()方法是ThreadPoolExecutor提供给其内部类Worker的的方法。作用就是一个,从任务队列中取任务,源源不断地输出任务。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
...
for (;;) {
...
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
//从任务队列的头部取任务
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
如果发生了以下四件事中的任意一件,那么Worker需要被回收:
Worker个数比线程池最大大小要大
线程池处于STOP状态
线程池处于SHUTDOWN状态并且阻塞队列为空
使用超时时间从阻塞队列里拿数据,并且超时之后没有拿到数据(allowCoreThreadTimeOut || workerCount > corePoolSize)
如果getTask返回的是null,那说明阻塞队列已经没有任务了,那么会调用processWorkerExit方法进行回收:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 集合移除掉需要回收的Worker
workers.remove(w);
} finally {
mainLock.unlock();
}
// 尝试结束线程池
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
//如果Worker不是异常而死亡结束流程
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//异常结束,新开启一个Worker
//开启条件:由于用户任务异常退出; Worker任务少于corePoolSize或者工作正在运行;阻塞队列不为空,但没有workers。
addWorker(null, false);
}
}