一、简介
JUC中的ReentrantLock给我们提供了方便的加锁解锁操作,但是我们有时候会需要有条件的对线程进行挂起和唤醒,此时另一个工具就排上了用场。下面是Condition变量的常用用法。
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
Thread thread1 = new Thread(() -> {
try {
lock.lock();
System.out.println("我要等一个新信号");
condition.await();
}
catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("拿到一个信号!!");
lock.unlock();
}, "waitThread");
Thread thread2 = new Thread(() -> {
lock.lock();
System.out.println("我拿到锁了");
try {
Thread.sleep(3000);
}
catch (InterruptedException e) {
e.printStackTrace();
}
condition.signal();
System.out.println("我发了一个信号!!");
lock.unlock();
}, "signalThread");
thread1.start();
thread2.start();
}
运行完之后的结果:
我要等一个新信号
我拿到锁了
我发了一个信号!!
拿到一个信号!!
线程1获取到锁之后调用Condition的await()方法,该方法会释放锁,并将当前线程挂起。随后线程2会拿到锁,并执行signal()方法,该方法会唤起线程1,并释放锁,然后线程1拿到锁,执行后续的流程。
所以说Condition是一个多线程间协调通信的工具,使得某个,或者某些线程一起等待某个条件(Condition),只有当该条件具备( signal 或者 signalAll方法被调用)时 ,这些等待线程才会被唤醒,从而重新争夺锁。
那么,这些逻辑具体是怎么实现的呢?
首先,必须要明确,在线程调用await()或者signal()/signalAll()方法时,必须首先获取锁,否则会出现java.lang.IllegalMonitorStateException
异常。其次,Condition(其实是ConditionObject,Condition接口的实现)维护了一个所有等待Condition条件变量的线程的队列,每个线程构成一个Node结点,也就是AQS中的Node结点:
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
二、await()
以Condition condition = lock.newCondition();
为例,newCondition()
会调用Sync的newCondition()
方法:
public Condition newCondition() {
return sync.newCondition();
}
Sync的newCondition()
方法:
final ConditionObject newCondition() {
return new ConditionObject();
}
//ConditionObject是AQS中的内部类,实例属性只有上面的firstWaiter和lastWaiter
public ConditionObject() { }
对于await()
方法,可以抛出中断异常,
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//将当前线程封装成Node结点,添加到Condition的等待队列中
Node node = addConditionWaiter();
int savedState = fullyRelease(node); //释放所占的锁,因为在调用await()方法时,是占有锁的
int interruptMode = 0;
//释放了锁之后,判断当前线程的node结点是不是在syncQueue中,
//什么是syncQueue呢?就是获取锁不成功而被挂起的线程所在的那个队列
//如果不在syncQueue中,说明当前线程还不具备获取锁的资格,就将当前线程挂起,直到被添加到阻塞队列中,
while (!isOnSyncQueue(node)) {
LockSupport.park(this); //线程被挂起,等待被signal唤醒,此时可以直接跳到下面的signal方法解析
//挂起的过程中,如果被中断了,线程被唤醒,跳出while循环
//如果没被中断,则此时node已经在阻塞队列中了,也会跳出循环
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//此时线程已经被唤醒,node结点已经被添加到阻塞队列中准备获取锁,被谁添加到阻塞队列了呢,是signal
//acquireQueued尝试获取锁,被中断返回true,否则返回false
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
其他版本的await()
方法和这个类似,读者可以自行尝试分析。
三.signal()
//ConditionObject
public final void signal() {
//如果在没有获取锁的情况下调用signal,会抛出IllegalMonitorStateException异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter; //拿到队列中的第一个node,此队列是Condition队列
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null; //将第一个队列移出队列
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
//该方法是将结点移到阻塞队列中,使得当前node节点的线程可以有资格获取锁
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
//在加入阻塞队列之前,将node的waitStatus设置为0,如果失败,说明该节点已经被取消,
//返回false,此时上面的doSignal方法会继续遍历Condition队列,
//找到第一个还在等待Condition变量的结点
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
//将结点加入到阻塞队列,返回的p结点是node的前置节点
Node p = enq(node);
int ws = p.waitStatus;
//如果前一个节点已经被取消等待(ws>0),或者修改waitStatus失败,则直接唤醒。
//正常情况 ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)这个判断
//是不会为true的,所以,不会在这个时候唤醒该线程。
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true; //成功转移到阻塞队列,返回true
}
那么什么时候才会唤醒呢?当前线程调用完 signal()
之后,执行lock.unlock()
方法之后,释放锁的时候,会唤醒其他线程,这个就是unlock()
的逻辑了。
signalAll()
方法的逻辑与signal()
类似,读者可以自己分析一遍。
四.总结
现在我们再来理一遍:
一个线程Alock.lock()
获取锁成功之后,调用condition.await()
方法,该方法会首先将线程封装成node加入Condition队列,然后释放锁,将线程移出阻塞队列,然后挂起;另一个线程B因为之前线程释放锁,获取锁成功,调用signal()
方法,将Condition队列的第一个node转移到阻塞队列,这个时候还没完,执行完signal
之后会释放锁,此时会唤醒后面的线程,此时线程A有机会竞争到锁。