并发源码分析篇:
Condition类似于Object方法中的await和signal,其作用都是将线程挂起和唤醒。但是Condition更加灵活,可以通过newCondition方法获取多个等待队列。
下面这段代码通过Condition实现了一个生产者和消费者模式,同时这段代码也类似于ArrayBlockingQueue阻塞队列的实现,当数组满了,生产者将会阻塞,只有当消费者消费了消息后唤醒生产者生产者才会继续工作,同理消费者也是,当数组中没有数据的话将会被阻塞,直到数组中有数据才会被唤醒继续工作,Condition的使用必须结合Lock锁的使用,需要先获取到锁,否则会抛异常。
package com.ltf.study.concurennt;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionDemo {
static Lock lock = new ReentrantLock();
static Condition fullConditon = lock.newCondition();
static Condition emptyCondition = lock.newCondition();
static String[] arr = new String[5];
static int takeindex;
static int putindex;
static int count;
public static void main(String[] args) {
for (int i = 0; i < 6; i++) {
provider(i+"");
}
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < 10; i++) {
consumer();
}
}
public static void provider(String str) {
new Thread(()->{
try {
lock.lock();
while (count >= 5) {
fullConditon.await();
}
System.out.println(Thread.currentThread().getName()+"==生产了:"+str);
arr[putindex] = str;
count++;
putindex = ++putindex <5 ? putindex:0;
emptyCondition.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
},"provider").start();
}
public static void consumer() {
new Thread(()->{
try {
lock.lock();
while (count == 0) {
emptyCondition.await();
}
System.out.println(Thread.currentThread().getName()+"消费:"+arr[takeindex]);
arr[takeindex] = null;
takeindex = ++takeindex <5 ? takeindex:0;
count--;
fullConditon.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
},"comsumer").start();
}
}
运行结果
provider==生产了:0
provider==生产了:2
provider==生产了:1
provider==生产了:3
provider==生产了:4
comsumer消费:0
comsumer消费:2
comsumer消费:1
provider==生产了:5
comsumer消费:3
comsumer消费:4
comsumer消费:5
锁的使用就不用多说了,可以看ReentrantLock源码分析
下面是等待队列的结构,为单向链表
当线程获取锁,调用await方法时,我们来看他做了什么
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 构建等待队列链表
Node node = addConditionWaiter();
// 释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 阻塞当前线程
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 竞争锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
在这里首先会构建链表,然后释放锁,唤醒同步队列中的下一个节点去获取锁,然后阻塞当前线程。
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
这里同样假设有A,B,C三个线程分别获取到锁调用await方法,就会构成如下队列并且阻塞,只有等到其他线程调用signal方法将节点加入到同步队列末尾等到前置节点将该节点唤醒才能继续在上次挂起的地方重新执行。
这时我们来看signal方法,该方法就是获取等待队列中的首节点传入doSignal方法中。
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
doSignal方法做了以下几件事
- 如果当前首节点没有下一个节点,将尾节点置空
- 将当前节点的下一个节点引用置空,也就是从等待队列中移除了。
- 将当前移除的等待节waitState状态设置为0并且加入到同步队列末尾,并且将同步队列中对应的前置节点的waitState状态设置为-1。
signal做的事情其实就是将等待队列中的节点移到同步队列中
此时的同步队列和等待队列中的状态如下
A节点被移到了同步队列中,当被同步队列中的前置节点唤醒后,A节点继续会在上次挂起的地方运行。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
设置中断状态为了后面时直接抛异常还是重新中断,并且继续执行判断isOnSyncQueue
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
此时的node waitState状态为0,并且前置节点不等于null,所以结果会返回true,所以跳出了while循环继续执行acquireQueued方法去抢占锁,继续抢占的原因是这是非公平锁,有可能被同步队列之外的其他线程获取到
至于sigalAll方法就是循环将每个节点移到同步队列中
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
到这里Condition的基本实现结束了。
总结
Condition我们可以这样理解,就是在同步队列之外多了其他的等待队列,wait方法就是将同步队列中的节点移到等待队列中,而signal方法则是将等待队列中的节点移动到同步队列中,从而达到线程之间的通信