Java8 源码阅读 - AbstractQueuedSynchronizer
Java8 源码阅读 - AQS之Condition
Java AQS作为JUC各种锁的核心类,承担了制定和管理线程状态的职责,通过阅读AQS的代码可以学习大牛处理并发的思想,毕竟语言只是思想的一种体现,学会了思想自然而然也能触类旁通;
特性
AQS被设计成大多数同步器(Synchronizer)的基类,提供一个实现阻塞锁和相关同步器(信号量、事件等),核心依赖于一个FIFO的等待队列和一个原子性的状态state
;AQS提供了相关模版方法供子类来决定如何来获取和释放这个状态,比如ReentrantLock
中的内部类Sync和CountDownLatch
的内部类Sync;
AQS分为两种模式,共享模式(Shared Mode)和独占模式(Exclusive Mode),当以独占模式占用锁时,其他线程将尝试获取锁时会失败,而在共享模式下是允许多个线程占用锁;
AQS是一个通过内置的FIFO双向队列来完成线程的排队工作,等待队列是类CLH锁队列;
当一个线程需要获取锁时,会创建一个新的节点包含该线程的信息和lock状态,然后使自己添加到队列的尾部,然后不停的轮询前任节点是否释放锁直到获取锁成功;当每个线程释放锁的时候,将当前线程出队,这就是CLH锁队列;
和其他CLH锁队列一样使用的是通用的逻辑,每个节点中的status字段跟踪线程信息,和其他CLH队列不同的是,类CLH锁队列会去阻塞等待获取锁的队列而不是自旋,队列的每个节点作为一个特定通知样式的监视器,它持有一个等待线程;
源码细节
static final class Node {
/** 标记是共享模式的节点 */
static final Node SHARED = new Node();
/** 标记是排他模式的节点 */
static final Node EXCLUSIVE = null;
/** 表示线程因为中断或者等待超时,需要从等待队列中取消等待;*/
static final int CANCELLED = 1;
/** 此节点的后续节点被(或将很快被)阻塞(通过park),因此当前节点在释放或取消时必须取消其后续节点。
* 为了避免争用,获取方法必须首先表明它们需要一个信号,然后重试原子获取,如果失败则阻塞。*/
static final int SIGNAL = -1;
/** 此节点当前位于条件队列上。在传输之前,它不会被用作同步队列节点,此时状态将被设置为0 */
static final int CONDITION = -2;
/** 一个被释放的节点应该被传播到其他节点。这是在doReleaseShared中设置的(仅针对head节点),
* 以确保传播能够继续,即使其他操作已经介入 */
static final int PROPAGATE = -3;
// 可以是0 或者上面的状态
volatile int waitStatus;
/** 通过prev可以找到该节点的前置节点 */
volatile Node prev;
/** 原本CLH队列是不维护next节点的,这里维护next节点是为了能过唤醒下一个被阻塞的节点 */
volatile Node next;
/** Node结点内部的SHARED表示的线程是因为获取共享资源失败被阻塞添加到队列中的线程,
* Node中的EXCLUSIVE表示的线程是获取独占资源失败被阻塞添加到队列中的线程 */
volatile Thread thread;
/** 下一个等待的节点 值通常是SHARED或者EXCLUSIVE */
Node nextWaiter;
/** 如果节点在共享模式下等待,则返回true。*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/** 返回前一个节点,如果为空则抛出NullPointerException。 */
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {} //默认是SHARED节点
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
CLH锁队列的节点,无论是共享模式还是排他模式都是使用这个节点,内部维护一个waitStatus
来表示当前锁的状态;
/** 等待队列的头,会延迟初始化。除了初始化之外,
* 它只通过setHead方法进行修改。注意:如果head存在,会保证节点的waitStatus不会是CANCELLED。*/
private transient volatile Node head;
/** 等待队列的尾部,会延迟初始化。仅通过方法enq修改,所有等待队列都是添加到尾部。*/
private transient volatile Node tail;
/** volatile修饰的状态信息state */
private volatile int state;
CLH队列需要一个虚构的头节点head
,只有在第一次出现锁竞争的时候才会初始化创建,可以通过head节点找到第一个排队的线程,当解锁时也只需要移动head就可以将线程出队;在入队的时候只需要将tail
节点和新增节点替换同时将tail指向新增节点即可,整体的CLH结构如下图一样,我们可以通过代码来论证这点;
排他模式&ReentrantLock
非公平锁
AQS的排他模式下的使用案例是ReentrantLock
的NonfairSync
;
static final class NonfairSync extends Sync {
...
final void lock() {
//如果通过cas设置state
if (compareAndSetState(0, 1))
//加锁成功
setExclusiveOwnerThread(Thread.currentThread());
else
//加锁失败 出现了锁竞争 进入等待队列
acquire(1);
}
//排他模式尝试获取资源,成功则返回true,失败则返回false
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
非公平锁加锁的入口,通过CAS尝试上锁,如果出现加锁失败,则进入下面的流程;
public final void acquire(int arg) {
if (!tryAcquire(arg) && //如果该线程获取锁失败
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//tryAcquire进入到这里
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 如果当前同步状态为0 意味着没有线程占用锁 就是锁可能刚被释放了
if (compareAndSetState(0, acquires)) { // cas尝试获取锁
setExclusiveOwnerThread(current); // 获取锁成功
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 如果当前线程已经占用锁了
int nextc = c + acquires;
if (nextc < 0) // 溢出处理
throw new Error("Maximum lock count exceeded");
setState(nextc); // 每重入一次便往state+1
return true;
}
return false; //获取锁失败
}
执行acquire(1)
就意味着锁已经被占用了,首先会在nonfairTryAcquire
重新获取同步器状态state,因为有可能在这时锁被其他线程释放了,获取获取锁成功则直接返回;否则会判断是否是当前线程占用的锁,这部分是重入锁的判断,如果是当前线程会更新state状态值,否则获取锁失败进入到下面的流程;
private Node addWaiter(Node mode) {
// 这里的mode是Node.EXCLUSIVE也就是null
Node node = new Node(Thread.currentThread(), mode); // 注意这个Node节点保存的nextWaiter是null
Node pred = tail;
if (pred != null) { // 尝试快速插入;这一步是快速将节点插入队列尾部
node.prev = pred; // 将构造后的node结点的前驱结点设置为tail
if (compareAndSetTail(pred, node)) { // CAS设置当前的node结点为tail结点
pred.next = node;
//入队成功 返回
return node;
}
}
// 入队失败,走到这里 有两种情况
// 1. 当前的队列为null 未被初始化; 2. 上面cas快速入队失败;
enq(node);
return node;
}
private Node enq(final Node node) {
// 自旋直到入队成功
for (;;) {
Node t = tail;
if (t == null) { // 意味着当前队列为空
if (compareAndSetHead(new Node())) // 建立个空的节点当head
tail = head;
} else {
node.prev = t; // 将新节点的前驱节点设置为tail节点
if (compareAndSetTail(t, node)) { // 通过CAS将node设置为tail
// 如果cas失败,意味着tail节点已经被其他节点所占用,重新循环添加节点
t.next = node;
return t;
}
}
}
}
addWaiter
中会将当前线程包装成一个Node节点然后添加到CLH锁队列的末尾,在排他模式下Node节点的nextWaiter是null;
final boolean acquireQueued(final Node node, int arg) {
// node节点是排队到队尾的节点
boolean failed = true;
try {
boolean interrupted = false;
// 每个需要获取锁的线程都需要自旋等待锁资源
for (;;) {
// 获取node的前置节点
final Node p = node.predecessor();
// 如果前置节点是头节点 则尝试获取锁 又回到上面的nonfairTryAcquire方法
if (p == head && tryAcquire(arg)) {
// 该线程获取锁成功 将node节点(p的下一任节点)设置为head
// setHead里面将node的线程信息清除 因为head只做哨兵作用
setHead(node);
// 将p节点出队 释放p节点
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果p不是头节点 或者 该线程获取锁失败
if (shouldParkAfterFailedAcquire(p, node) && // 若p节点的状态为SIGNAL,则表示p节点允许被阻塞
parkAndCheckInterrupt()) // 如果该线程被阻塞且线程状态为中断
// 将中断标志置为true 这是lock方法处理中断的方式
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 判断该线程是否应该被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前置节点p的status
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 如果前置节点p的状态为SIGNAL,即表示需要将线程阻塞
return true;
if (ws > 0) {
// 如果前置节点的状态为CANCELLED,表示线程中断或者等待超时
// 需要将队列中节点取消等待
do {
// 回溯找到第一个不为CANCELLED的节点并赋值给pred
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 即不是SIGNAL也不是CANCELLED状态
// 将前置节点p的状态设置为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// 将当前线程阻塞并且返回该线程状态是否是中断
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
acquireQueued
方法在排他模式下会以自旋的方式且不可中断的获取锁;每一个新加入CLH队列的线程都会判断自己的前置节点是否是head节点,只有前置节点是head节点时才有资格去尝试获取锁资源,否则将会把自己阻塞;
// 另一种加锁方式
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
// 如果线程中断 将抛出异常 因为中断的线程获取锁没有意义
throw new InterruptedException();
return tryAcquire(arg) || // tryAcquire会尝试获取锁一次 失败将进入到下面
doAcquireNanos(arg, nanosTimeout);
}
static final long spinForTimeoutThreshold = 1000L; // 1000纳秒
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
// 增加了超时判断
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
// 如果剩下的时间 > 1000 纳米
nanosTimeout > spinForTimeoutThreshold)
// 将短暂挂起
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
tryLock
方法无论在公平锁和非公平锁下都将采用非公平策略,调用该方法的线程会立即尝试加锁,如果加锁失败,会在doAcquireNanos
里面自旋等待锁资源,基本的排队和挂起逻辑和acquireQueued
类似,但是增加了超时的处理机制,不同的是该方法里面有超时逻辑的判断,且自旋的过程中是不允许线程中断的,否则会抛出InterruptedException
异常;
private void cancelAcquire(Node node) {
if (node == null)
return; // 忽略null节点
node.thread = null; // 释放线程
Node pred = node.prev;
// 将所有前置节点的waitStatus为CANCELLED的节点移除队列
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// pred节点指向的是队列中第一个CANCELLED节点的前置节点
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
// 如果当前节点是尾节点且cas将pred节点设置为tail节点成功
if (node == tail && compareAndSetTail(node, pred)) {
// 已经将当前节点移除队列
// cas将pred节点的next设为null
compareAndSetNext(pred, predNext, null);
} else {
// node节点不是尾节点
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
// 如果pred节点不是head 且
// pred节点的状态为SIGNAL,意味着pred节点准备释放资源
Node next = node.next;
// 如果当前节点的next节点的状态不为CANCELLED状态
// 将p节点的下一个节点设置为next
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 若当前节点是head节点 或者
// pred节点的状态不为SIGNAL
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
private void unparkSuccessor(Node node) {
// 这里的node节点即为head节点
int ws = node.waitStatus;
if (ws < 0)
// 尝试恢复状态
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next; // 获取head的后继节点 即队列中第一个节点
if (s == null || s.waitStatus > 0) {
// 如果s节点不为null 但是s节点的状态为CANCELLED,需要将其释放
s = null;
// //从tail结点向前遍历,找到离head最近不为null的节点并且waitStatus不为CANCELLED
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 如果找到节点s,则唤醒该结点的线程
LockSupport.unpark(s.thread);
}
当doAcquireNanos
中获取锁资源超时了,就会进入到cancelAcquire
中取消正在进行的获取操作,首先会找到CLH队列中第一个状态为CANCELLED
的节点的前置节点p,这里分两个情况,
- 如果p节点不是head节点,意味着排队等待的线程大于2(当前线程+其他线程),若p的状态为
SIGNAL
,且当前线程节点n的下一个节点状态不为CANCELLED
,会将n的下一个节点衔接到p的下一个节点上去(将n移除队列),并释放掉当前线程节点; - 如果p节点是head节点,意味着当前排队线程是第一个排队获取锁的节点那么再释放掉当前线程节点前还会去尝试寻找下一个可以用来唤醒的线程,如果这个节点存在,调用unpark()方法唤醒线程,这里的下一个线程不一定是当前节点的next节点;
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg)) // 若获取锁失败
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE); // 加入等待队列
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 获取锁成功
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 将线程阻塞后遇到线程中断情况将抛出线程中断异常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
lockInterruptibly
和lock
唯一的区别就是对待线程中断的处理方式,lock
方法遇到线程中断选择的是生吞中断事件,继续执行阻塞加锁操作,等到获取到锁的时候才会返回线程是否中断,而lockInterruptibly
遇到中断事件时会马上抛出中断异常然后退出获取锁资源;
公平锁
static final class FairSync extends Sync {
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 当前锁没有被占用
// 公平锁和非公平锁最大的区别在这
if (!hasQueuedPredecessors() && // 没有其他线程排队等待
compareAndSetState(0, acquires)) { // cas加锁成功
setExclusiveOwnerThread(current); // 设置当前线程为锁的拥有者
return true;
}
}
// 如果当前线程是锁的拥有者
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
//允许重入锁
setState(nextc);
return true;
}
return false;
}
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
公平锁在加锁过程中和非公平锁的区别是,公平锁在尝试加锁时会判断当前锁是否被占用以及排队队列前是否有其他先到的排队线程,非公平锁则允许每个线程都有一次插队的机会;
public void unlock() { sync.release(1); }
public final boolean release(int arg) {
if (tryRelease(arg)) {
// 释放锁成功
Node h = head;
if (h != null && h.waitStatus != 0)
// 尝试唤起下一个继任者
unparkSuccessor(h);
return true;
}
return false;
}
// 返回true表示锁资源被释放 否则表示锁还是被占用
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
// 当前线程不是获取锁的线程 抛异常
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
释放锁的操作比较简单,主要是处理state状态,如果当前线程已经不再占用任何资源,会尝试唤起下一个继任者;
总结
总结下ReentrantLock
的加锁,ReentrantLock
分为公平锁和非公平锁,默认的构造器使用的是非公平锁,非公平锁不保证任何特定的访问顺序,可以允许线程插队获取锁,而公平锁是按照FIFO的顺序排队等待锁,所以在通常情况下非公平锁的效率是要比公平锁高,公平锁也可能出现线程饿死的情况,即某一个线程持续获取资源而导致其他线程一直被阻塞,相比于synchronized
,ReentrantLock
提供了一些高级的功能,比如
- 等待锁中断
指如果当前持有锁线程长期不释放锁资源,正在等待的线程可以选择放弃等待,可中断特性对于处理执行时间长的同步块很有帮助 - 公平锁实现
synchronized
默认是非公平锁,公平锁可以保证多线程下申请锁是按照时间先后的顺序来的 - 实现锁绑定多个条件
ReentrantLock
可以绑定多个Condition
对象实现条件的分离和关联,而synchronized
不行;
内部通过CLH队列维持各个线程的等待关系,同步器内的state状态表示的是目前占用锁的资源数;
共享模式&CountDownLatch
CountDownLatch
是一个同步辅助类,它允许一个或多个线程一直等待直到其他线程执行完毕才开始执行。其中构造器的count表示需要等待执行的线程数;
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) { setState(count); }
int getCount() { return getState(); }
...
}
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
在CountDownLatch
中AQS的state的作用是表示需要等待执行完的线程个数;
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
// state此时大于0 有剩余资源还在等待
doAcquireSharedInterruptibly(arg);
}
// 当资源为0时表示没有任何资源可以被获取
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
// 所有即将被阻塞的线程都会添加到队列里面
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 第一个调用await的会走到这里
int r = tryAcquireShared(arg);
// r=-1时表示state不等于0,反之r=1时state等于0
if (r >= 0) {
// state为0时首节点的线程会被唤醒 然后执行下面的方法
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 将自己阻塞等待其他线程唤醒
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 支持中断响应
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
因为初始化时定义了state代表可共享锁资源的数量,所以在state不为0时执行函数await
的线程都会被阻塞,直到state为0;
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
// 当所有占用的资源都被释放 唤醒等待队列的线程
doReleaseShared();
return true;
}
return false;
}
// 只有在state为0时才返回true
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
// 通过cas改变state
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
// 当state为0时进入到这个方法
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
// CLH队列中有剩余的节点 即存在有被阻塞的线程
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
// 保证线程安全的关键
continue;
// 唤醒第一个等待节点的线程
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break; // 一次只释放一个线程
}
}
private void unparkSuccessor(Node node) {
// 这时候进来的node节点waitStatus都是0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next; // 获得s的后继结点,这里即head的后继结点也就是第一个使用await的线程
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 恢复被阻塞线程
LockSupport.unpark(s.thread);
}
// 等到被阻塞线程被唤醒后 会继续执行方法剩余的部分 并退出函数
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
// r=-1时表示state不等于0,反之r=1时state等于0
if (r >= 0) {
// state为0时首节点的线程会被唤醒 然后执行下面的方法
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 如果node节点的waitStatus为0,则将其设置为SIGNAL, 并返回false
// 如果node节点的waitStatus为SIGNAL,则返回true
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
// node为head的next节点,propagate为1
Node h = head;
setHead(node); // 将node的thread引用释放
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// 获取node节点的下一个节点nextNode
Node s = node.next;
// 如果nextNode是共享模式或者nextNode是最后一个节点
if (s == null || s.isShared())
// 循环唤醒剩余节点
doReleaseShared();
}
}
countDown
方法通过修改state的值来释放资源,当全部资源释放掉时(state == 0)会唤醒队列中所有被阻塞的线程,注意这里的唤醒操作,所有的唤醒操作都是从队列的第一个节点开始唤醒,假如有多个线程同时进入到doReleaseShared
释放也只会有一个线程进行唤醒,这里是通过cas设置节点的waitStatus
的状态来保证的线程安全的;
总结
CountDownLatch
代码量较少所以总体而言还是比较简单的,到这里已经大概了解了共享锁和排他锁的现实方式,可以简单对比一下两者的区别;
独占锁顾名思义就是同时只能一个线程能够获取到锁,其余的线程只能排队等待锁资源,在AQS中的CLH队列只有第一个节点是有权限获取到锁的,其他的节点只能被阻塞等待前置节点的唤醒才能加锁,其中独占锁分为公平锁和非公平锁,公平锁就是获取锁的顺序严格按照FIFO的顺序,而非公平锁则不一定是按照FIFO,允许其他线程有抢占资源的能力,通常而言非公平锁的效率较高;
共享锁就是允许多个线程都能获取到锁资源,在AQS的CLH队列中所有节点都是已经获取到锁的线程,在CountDownLatch
中共享锁的体现是可以允许多个线程同时入队被阻塞等待,等资源全部释放后再从队列的第一个线程开始逐个唤醒;
Semaphore
信号量机制的Java实现版本,如果说大学时学过操作系统这门课程的同学应该都会有印象,Semaphore通常用于限制能够访问某些(物理或逻辑)资源的线程数,内部维护了一个许可证集合,有多少资源需要限制就维护多少许可证集合,每一个线程获取许可证就调用acquire方法且会被阻塞,直到获得许可,release方法用来恢复许可并可能释放一个潜在被阻塞的线程;Semaphore也是分为公平锁和非公平锁两个版本。
非公平锁
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
// 如果可用的资源数小于0 将其阻塞
doAcquireSharedInterruptibly(arg);
// 否则直接当作加锁成功
}
// tryAcquireShared 实际上走到这里
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 获取剩下可用的资源数
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
// 若可用的资源数小于0 直接返回
// 否则通过cas更新占用的资源数(state - 1)
return remaining;
}
}
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
// 所有即将被阻塞的线程都会添加到队列里面
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
// 如果可用资源数大于0(r) 则唤醒等待的线程
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 将自己阻塞等待其他线程唤醒
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 支持中断响应
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
public void release() { sync.releaseShared(1); }
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current)
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
非公平锁的加锁流程比较简单,在有剩余资源的情况下只需要一次简单的cas操作就可以获取到锁资源,当剩余资源等于0时才会将线程阻塞,
释放资源时也是通过cas更新state状态,在doReleaseShared
中如果有等待的线程会将等待线程唤醒,流程和上面的CountDownLatch
基本一致;
protected int tryAcquireShared(int acquires) {
for (;;) {
// 如果等待队列中有线程等待,则将该线程直接加入队列,不会尝试获取锁资源
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
公平锁与非公平锁的主要区别在于公平锁发现等待队列中有其他线程等待资源后会自动添加到队列末尾并将自己阻塞,少了抢占锁资源这一步;