(一)任务的具象:Runnable、Callable、FutureTask
这三者可以用任务的生产阶段来区分
FutureTask是一个任务的产成品(一个完整的有生命周期的任务)
Runnable 、Callable 都是一个任务的半成品,最终要包装成一个FutureTask
(二)Callable 包装成 FutureTask
public class FutureTask<V> implements RunnableFuture<V> {
private Callable<V> callable;//半成品
private Object outcome;//返回结果
// 包装一下半成品Callable
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
//最终run方法的实现
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//调用Callable.call()
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
//返回结果赋值
outcome=result;
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
}
(三)Runnable先适配成Callable,再包装成 FutureTask
public class FutureTask<V> implements RunnableFuture<V> {
public FutureTask(Runnable runnable, V result) {
// 将Runnable适配成Callable
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
}
// Runnable适配Callable的适配器
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
(四)任务异常会导致工作者线程退出吗?
1)FutureTask会消化掉所有异常
2)工作者线程不会由于任务出现异常而退出
public class FutureTask<V> implements RunnableFuture<V>{
// 执行任务
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
// 捕捉所有的异常
// 设置任务生命状态为 EXCEPTIONAL
// DONE!
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
}
(五)FutureTask 的生命周期
public class FutureTask<V> implements RunnableFuture<V> {
//当前生命状态
private volatile int state;
//所有生命周期的阶段
/*
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
}
一些生命周期方法
public boolean isDone() {
return state != NEW;
}
public boolean isCancelled() {
return state >= CANCELLED;
}
/*
* 调用 Future.cancel(false):NEW -> CANCELLED
* 调用 Future.cancel(true):NEW -> INTERRUPTING -> INTERRUPTED
*/
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
//状态不为NEW时返回
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
// 状态必须为NEW才能继续执行
// ...
}
(五)构造可取消的任务
方法 1)通过捕捉异常而结束任务
1 捕捉线程中断异常:InterruptedException
2 捕捉其他检查异常:SocketExcepton、AsynchronousCloseException等
方法 2)轮询线程的中断状态
当任务没有阻塞方法并且在一个循环体里,可以通过轮询判断线程的中断状态while(!Thread.currentThread().isInterrupted()){...}
方法 3)轮询自定义的boolean标志
当任务没有阻塞方法并且在一个循环体里,可以通过轮询判断自定义的boolean标记