Java并发工具类的三板斧 状态,队列,CAS
状态:
/** * 当前任务的运行状态。 * * 可能存在的状态转换 * NEW -> COMPLETING -> NORMAL(有正常结果)
* NEW -> COMPLETING -> EXCEPTIONAL(结果为异常) * NEW -> CANCELLED(无结果) * NEW -> INTERRUPTING -> INTERRUPTED(无结果) */
private volatile int state; private static final int NEW = 0; //初始状态
private static final int COMPLETING = 1; //结果计算完成或响应中断到赋值给返回值之间的状态。
private static final int NORMAL = 2; //任务正常完成,结果被set
private static final int EXCEPTIONAL = 3; //任务抛出异常
private static final int CANCELLED = 4; //任务已被取消
private static final int INTERRUPTING = 5; //线程中断状态被设置ture,但线程未响应中断
private static final int INTERRUPTED = 6; //线程已被中断 //将要执行的任务
private Callable<V> callable; //用于get()返回的结果,也可能是用于get()方法抛出的异常
private Object outcome; // non-volatile, protected by state reads/writes
private volatile Thread runner; //执行callable的线程,调用FutureTask.run()方法通过CAS设置
private volatile WaitNode waiters; //栈结构的等待队列,该节点是栈中的最顶层节点。
队列:
在FutureTask中,队列的实现是一个单向链表,它表示所有等待任务执行完毕的线程的集合,如果获取结果时,任务还没有执行完毕怎么办呢?那么获取结果的线程就会在一个等待队列中挂起,直到任务执行完毕被唤醒。
挂起的线程什么时候被唤醒?
1.任务执行完毕了,在finishCompletion方法中会唤醒所有在队列中等待的线程
2.等待的线程自身因为被中断等原因而被唤醒。
CAS:
使用CAS来完成入栈出栈操作。为啥要使用一个线程安全的栈呢,因为同一时刻可能有多个线程都在获取任务的执行结果,如果任务还在执行过程中,
则这些线程就要被包装成WaitNode扔到栈的栈顶,即完成入栈操作,这样就有可能出现多个线程同时入栈的情况,因此需要使用CAS操作保证入栈的线程安全,对于出栈的情况也是同理。
使用这个队列就只需要一个指向栈顶节点的指针就行了
/**
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
//如果状态小于COMPLETING 说明还没计算完成,则调用 awaitDone方法阻塞
s = awaitDone(false, 0L);
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
// 如果该线程被中断,则从等待队列中移除该线程,直接抛异常
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
// 如果state >COMPLETING 说明任务已经执行完成,或者取消,如果等待队列不为空,将等待队列中该节点置为null,
// 返回state 然后就可以在外层方法调用report方法获取结果了
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
//如果state=COMPLETING,则使用yield,因为此状态的时间特别短,通过yield比挂起响应更快。
//yield 表示让出CPU执行权,等待下一次竞争。
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
// 如果q不为null,说明代表当前线程的WaitNode已经被创建出来了,如果queued=false,表示当前线程还没有入队 接下来利用CAS入队
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);
//如果需要阻塞指定时间,则使用LockSupport.parkNanos阻塞指定时间
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
//如果到指定时间还没执行完,则从队列中移除该节点,并返回当前状态
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
public void run() {
// 利用CAS设置当前执行线程,保证run方法只被执行一次
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 真正执行任务,获取结果
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
// 设置结果
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
protected void set(V v) {
// 先利用CAS将状态改为COMPLETING 保证只有一个线程能够执行 outcome = v;
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 将结果赋值给 outcome
outcome = v;
// 再利用CAS将状态改为NORMAL
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
// 将队列清空,然后唤醒队列中所有线程
finishCompletion();
}
}