本文从源码角度,以AbstractQueuedSynchronizer.ConditionObject的await、signal、signalAll为入口,对基于ReentrantLock的线程间通信的原理进行介绍,其中涉及到的与ReentrantLock相关的方法,在ReentrantLock一文中已经介绍过,这里不再说明。
源码注释中同步队列指ReentrantLock维护的队列,条件队列指ConditionObject维护的队列,对于内部数据结构的组织形式,这里不进行介绍。
由于是多线程并发,所以会有多种不同的情况,源码中针对每个条件,仅给出一两种情形,留作参考。
类名简写说明:
- CO:ConditionObject
- AQS:AbstractQueuedSynchronizer
1. AQS.CO.await()
(1)AQS.CO.await()
public final void await() throws InterruptedException {
// 先判断当前线程是否被中断,被中断则抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 将当前线程对应的节点加入条件队列
Node node = addConditionWaiter();
// 完全释放锁,因为可重入,所以saveState可能大于1
int savedState = fullyRelease(node);
int interruptMode = 0;
// 若不在同步队列中,被唤醒后会再次进入while中被park
while (!isOnSyncQueue(node)) {
// 挂起当前线程
LockSupport.park(this);
// checkInterruptWhileWaiting中会判断当前线程是否是被中断唤醒的
// 返回值非0表示是被中断唤醒的,会通过break跳出while
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 条件1:
// 为true的情形:
// 当前线程在上面while中park时被中断,在acquireQueued中park时再次被中断时条件1为true
// 条件2:
// 条件2为true,即interruptMode是0或REINTERRUPT,为
// REINTERRUPT的情况详见下面(5)处,为0的情况尚不明确
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
// 从上面举的情形中来看,到这里时interruptMode就是REINTERRUPT,
// 这里难道是Doug Lea老先生的一个编码缺陷?
interruptMode = REINTERRUPT;
// 该条件成立的情形:
// 假设有三个线程thread0、thread1、thread2,三个线程依次调用AQS.CO.await(),之后三者均
// 在上面while中被park,此时,外部线程中断thread0,thread0会执行到这里,此时条件为true
if (node.nextWaiter != null)
// 断开条件队列中所有不是Node.CONDITION状态的节点(代码不再贴出)
unlinkCancelledWaiters();
// 该条件成立时,说明当前线程被中断,interruptMode为THROW_IE或REINTERRUPT
if (interruptMode != 0)
// 方法中会根据interruptMode的值做相应处理(代码不再贴出)
reportInterruptAfterWait(interruptMode);
}
(2)AQS.CO.addConditionWaiter
private Node addConditionWaiter() {
Node t = lastWaiter;
// 条件2:
// 为true时的情形:
// 情形1:假设有两个线程thread0、thread1,初始时,thread0在未持有锁的情况下调用
// AQS.CO.await(),当thread0执行到AQS.fullyRelease中时会将其对应节点
// 的waitStatus字段设置为取消状态(见下面(4)处),之后持有锁的thread1调用
// AQS.CO.await(),会执行到这里,这里t.waitStatus为1
// 情形2:假设有两个线程thread0、thread1,初始时,thread0持有锁,之后调用AQS.CO.await()
// 释放锁并阻塞在LockSupport.park(this)处,之后外部线程中断thread0,thread0被唤
// 醒后会执行到AQS.transferAfterCancelledWait里的if处将t.waitStatus设置为0,之
// 后thread1获取到锁,执行到这里时,t.waitStatus为0,条件2为true
if (t != null && t.waitStatus != Node.CONDITION) {
// 断开条件队列中所有不是Node.CONDITION状态的节点(代码不再贴出)
unlinkCancelledWaiters();
// 更新t。经过unlinkCancelledWaiters,lastWaiter可能已经改变
t = lastWaiter;
}
// 创建新的状态为Node.CONDITION的节点,并连接到同步队列中
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
(3)AQS.fullyRelease
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 获取AQS.state
// 因为锁可重入,所以saveState可能大于1
int savedState = getState();
// 传入saveState而不是1,因此该方法名叫"fully"Release
// 该方法在ReentrantLock的1.2(2)处已经说明,这里不再介绍
if (release(savedState)) {
failed = false;
// 返回saveState,再次唤醒该线程时会用到
return savedState;
} else { // 可以说不会执行到该分支
throw new IllegalMonitorStateException();
}
} finally {
// 当未持有锁的线程调用await方法时,release(savedState)中会抛异常,此时failed为true
if (failed)
// 设置为取消状态
node.waitStatus = Node.CANCELLED;
}
}
(4)AQS.isOnSyncQueue
final boolean isOnSyncQueue(Node node) {
// 一般情况下,初次进入该方法,条件1为true,该方法返回false,
// 下面对初次进入该方法时的一些特殊情形进行介绍:
// a.情形:
// 假设有两个线程thread0、thread1,初始时,thread0持有锁,调用了AQS.CO.await(),从
// while (!isOnSyncQueue(node))处到这里(还未执行),thread1获取到锁,调用了AQS.CO.signal,
// 一直调到AQS.transferForSignal,在方法里的第一个if执行后,会将node.waitStatus设置为0,
// 使得这里的条件1为false,因为thread1还未执行AQS.transferForSignal的Node p=enq(node),
// 因此这里node.prev为null,条件2为true。
// b.接着上面的情形继续延伸:
// 若thread0仍未执行该if语句,thread1执行完AQS.transferForSignal
// 的Node p=enq(node),则条件2变为false
// c.继续延伸:
// 若thread0仍未执行该if语句,线程thread2与来竞争锁,因为thread0此时仍持有锁,
// 因此,thread2会被加入到同步队列中,经过一些列步骤后,会被park,期间会将
// thread1对应节点的waitStatus设置为-1,条件1仍为false
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false; // 返回false,表示node不在同步队列中
// 条件为false的情形:
// 上面a、b情形不变,将c处的情形改为:thread2执行到到AQS.addWaiter的pred.next = node;
// 处时(该语句还未执行),thread0执行到这里,此时该条件为false
// 条件为true的情形:
// thread0执行到这里时,thread2已经执行完AQS.addWaiter的pred.next = node;处,此时该条件成立
if (node.next != null) // If has successor, it must be on queue
return true;
// 到这里说明node.waitStatus不为Node.CONDITION且node.prev不为null
// findNodeFromTail中会从tail向前遍历同步队列,寻找node是否已在队列中,存在则返回true,否则返回false。
// 情形:
// 上面a情形不变,将b处的情形改为:thread1执行到AQS.enq的if (compareAndSetTail(t, node))
// (该语句还未执行),或执行了但由于其他线程的竞争导致执行失败了,findNodeFromTail
// 中会返回false;若执行了且成功了,则findNodeFromTail会返回true
// (多数情况下node都在同步队列的尾部附近)
return findNodeFromTail(node);
}
(5)AQS.CO.checkInterruptWhileWaiting
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
// 被中断
// 返回THROW_IE表示当前线程在其他线程调用signal前被中断
// 返回REINTERRUPT表示当前线程在其他线程调用signal后被中断
// 具体的分界点就是node.waitStatus的值,若其值为Node.CONDITION,
// 则是signal前被中断,否则在signal后被中断
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
(6)AQS.transferAfterCancelledWait
final boolean transferAfterCancelledWait(Node node) {
// 该条件为true的情形:
// 假设仅有thread0,thread0持有锁后调用AQS.CO.await()被park,
// 之后被外部线程中断,会执行到这里,此时条件为true
// 该条件为false的情形:
// 假设有两个线程thread0、thread1,thread0,thread0持有锁后调用AQS.CO.await()被park,thread1
// 获取到锁后调用AQS.CO.signal,之后会执行到AQS.transferForSignal的第一个if处,该if语句执行
// 完后,这里node.waitStatus被修改为0,所以thread0执行这里的if语句时会失败
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node); // 入队
return true;
}
// 到这里说明其他线程调用了AQS.CO.signal,将当前线程对应的节点加入同步队列,这里自选等待入队完成
while (!isOnSyncQueue(node))
Thread.yield(); // 主动让出当前线程的CPU时间片
return false;
}
2. AQS.CO.signal
(1)AQS.CO.signal
public final void signal() {
// 持有锁的线程不是当前线程(即外部某个线程调用了该方法)则抛异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 获取条件队列中的第一个节点
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
(2)AQS.CO.doSignal
private void doSignal(Node first) {
// 遍历条件队列,仅唤醒第一个符合条件的线程
do {
// 更新firstWaiter
// firstWaiter为null,说明已到队列尾部
if ( (firstWaiter = first.nextWaiter) == null)
// 将lastWaiter设置为null
lastWaiter = null;
// 将first与其后继节点断开
first.nextWaiter = null;
// 条件1:只要一个线程入队成功,transferForSignal返回true,就会终止该循环
// 条件2:用于判断是否到队列尾部
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
(3)AQS.transferForSignal
final boolean transferForSignal(Node node) {
// 条件为true的情形(注意前面的"!"):
// 情形1:假设有两个线程thread0、thread1,初始时,thread0在未持有锁的情况下调用
// AQS.CO.await(),当thread0执行到AQS.fullyRelease中时会将其对应节点的
// waitStatus字段设置为取消状态(见上面1.(4)处),之后持有锁的thread1调用
// AQS.CO.signal就会执行到这里,此时条件为true
// 情形2:假设有两个线程thread0、thread1,初始时,持有锁的thread0调用AQS.CO.await(),
// 执行到该方法的while里被park,之后外部线程中断thread0,thread0被唤醒后会执
// 行到AQS.transferAfterCancelledWait的第一个if处将node.waitStatus设置为0,
// 之后thread1获取到锁,调用AQS.CO.signal执行到这里,条件就为true
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 入队
// 注意p是node的前驱节点
Node p = enq(node);
int ws = p.waitStatus;
// 条件1:
// 为true时的情形:
// 假设有三个线程thread0、thread1、thread2,初始时,持有锁的thread0调用AQS.CO.await(),执行到该方法
// 的while里被park,之后thread1获取到锁,调用AQS.CO.signal执行到上面Node p = enq(node);处(该行还未执行),
// 之后thread2通过ReentrantLock.lockInterruptibly获取锁,因为thread1仍持有锁,因此thread2最终会在
// AQS.parkAndCheckInterrupt中被park,之后外部线程中断thread2,当thread2执行完AQS.cancelAcquire的
// node.waitStatus = Node.CANCELLED;后,thread1从上面Node p = enq(node);处执行到这里,此时ws为1
// 条件2:
// 为true时的情形(注意前面的"!"):
// 对于条件1为true时的情形,在thread2被park后,若thread1先执行完ws > 0(条件2还未执行)(此时ws为0,
// 条件1不成立),此时thread2,被中断后在AQS.cancelAcquire的node.waitStatus = Node.CANCELLED;处
// 将p.waitStatus设置为1,此时条件2就为true
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// node.thread会从AQS.CO.await里的park处被唤醒,因为已经在同步队列中了,因此会跳出while,进
// 入AQS.acquireQueued中,若此时p前面仍有节点等待获取锁或p前面没有节点了但调用AQS.CO.signal
// 的线程仍未释放锁,node.thread在尝试几次后最终仍会在AQS.parkAndCheckInterrupt中被park
LockSupport.unpark(node.thread);
return true;
}
3. AQS.CO.signalAll
AQS.CO.signalAll与AQS.CO.signal类似,区别是signalAll会将所有节点加入同步队列,下面仅对AQS.CO.doSignalAll方法进行介绍:
private void doSignalAll(Node first) {
// 将条件队列的头节点和尾节点都置为null
lastWaiter = firstWaiter = null;
// 遍历条件队列,依次唤醒所有线程
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}