简单介绍
AQS (抽象队列同步器), AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch, 这是一个基于模板方法来做的, 我们只需要实现模板方法中某些可以实现的方法, 就可以来实现一个自定义的lock.
本文大部分参考下()面这篇博客
参考博客: https://www.cnblogs.com/waterystone/p/4920797.html
框架
AQS中维护了一个 volatile修饰的int 变量 state (代表共享变量) 和一个FIFO线程等待队列(多线程竞争资源被阻塞时候会进入这个队列)
注意: 是state和 等待队列(经常把等待队列和同步队列)
state的三种方法:
- getState()
- setState()
- compareAndSetState()
AQS定义两种资源共享方式: Exclusive (独占, 只有一个线程能够执行) 和share (共享, 多个线程尅同时执行)
不同的自定义同步器获取state的方式也不一样. 自定义同步器在时间的时候只需要实现state的获取和释放的方法即可(基于模板方法), 在实现的过程中, 也可以利用AQS中实现的其他方法
自定义同步器实现的主要自定义实现的方法:
- isHeldExclusivity(): 该线程是否正咋独占资源, 只有用到condition的时候才去实现
- tryAcquire(int) :独占方式, 尝试获取资源, 成功返回true, 失败则返回false
- tryRelase(int): 独占方式 尝试释放资源, 成功返回true, 失败则返回false
- tryAcquireShare(int) 共享方式, 尝试获取资源, 负数表示失败, 0 代表成功,但是没有剩余可用资源, 正数表示成功, 还有剩余资源
- tryRelaseShared(int) 共享方式, 如果释放后允许唤醒后续等待结点返回true,否则返回false。(感觉上面的tryRelase方法应该也有这种唤醒)
以ReentrantLock为例, state初始化为0, 表示未锁定的状态 ,A 线程lock()时, 会调用tryAcquire()独占该锁并将state+1, 此后, 其他线程再tryAcquire()就会失败, 直到A线程ublock()到state = 0(即释放锁)为止, 其他线程才有机会获取该锁, 当然释放锁之前, a线程自己是可以重复获取此锁的(state会累加), 这就是可重入的概念.但要注意,获取多少次就要释放多少次, 这样子才能保证state是能回到0.
再以countdownlatch为例子, 任务分为n个子线程去执行, state也初始化为n (注意n要与线程的个数一致). 这n个子线程是并行(并发???)执行, 每个子线程执行完之后countDown()一次, state会Cas减一. 等到所有的自检测都执行完之后(state = 0), 会unpark()主调用线程, 然后主调用线程会从await()函数返回, 继续后余动作.
一般来说, 自定义同步器要么是独占方法, 要么是共享方式, 他们也只需要实现tryAcquire-tryRelase, tryAcquireShared-tryReleaseShared中的一种即可.但是AQS也支持自定义同步器实现独占和共享两种方式.
源码讲解
开始讲解AQS源码实现, 依次按照acquire-release, acquireShared-releaseShared次序来
1 节点状态waitStatus
先说一下node, node节点是对每一个等待获取资源的线程封装,它包含了需要同步的线程本身以及其等待状态, 比如是否被阻塞, 是否等待唤醒, 是否已经被取消等. 变量waitStatus则表示当前的节点状态
CANCELLED(1) :
表示当前节点已经取消调度, 当timeout 或被中断(响应中断的情况下), 会触发变更为此状态,进入该状态后的结点将不会再变化。
SIGNAL(-1):
表示后继节点在等待当前节点唤醒. 后继节点入队时, 。后继结点入队时,会将前继结点的状态更新为SIGNAL
Condition(-2): 表示节点等待在Condition上, 当其他线程调用了Condition的signal()方法后, CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
PROPAGATE(-3): 共享模式下, 前继节点不仅会唤醒其后继节点, 同时也可能会唤醒后继的后继节点
0
新节点入队时的默认状态
注意,负值表示结点处于有效等待状态,而正值表示结点已被取消。所以源码中很多地方用>0、<0来判断结点的状态是否正常。
2 acquire(int )
这个方法是在独占模式下线程获取共享资源的顶层入口. 如果获取到资源, 线程就直接返回, 否则进入等待队列, 直到获取资源为止, 并且整个过程忽略中断影响, 这也正是lock()的语义, 当然不仅仅局限于lock(). 获取到资源后,线程就可以去执行临界区代码
public final void acquire(int arg) {
2 if (!tryAcquire(arg) &&
3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4 selfInterrupt();
5 }
函数流程如下:
- tryAcquire()尝试直接去获取资源,如果成功则直接返回(这里体现了非公平锁,每个线程获取锁时会尝试直接抢占加塞一次,而CLH队列中可能还有别的线程在等待);
- addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
- acquireQueued()使线程阻塞在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
- 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
2.1 tryAcquire(int)
此方法其实就是我们需要实现的模板方法, 它尝试去获取独占资源,如果获取成功,则返回true, 否则返回false.
1 protected boolean tryAcquire(int arg) {
2 throw new UnsupportedOperationException();
3 }
这边怎么实现就看我们自己了, 这边为啥不用abstract方法, 是因为独占模式下只用实现tryAcquire-tryRelease,而共享模式下只用实现tryAcquireShared-tryReleaseShared。如果都定义成abstract,那么每个模式也要去实现另一模式下的接口。说到底,Doug Lea还是站在咱们开发者的角度,尽量减少不必要的工作量。
2.2 addWaiter(Node)
这个方法是将当前线程加入到等待队列的队尾, 并返回当前线程所在的节点
private Node addWaiter(Node mode) {
//以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享)
Node node = new Node(Thread.currentThread(), mode);
//尝试快速方式直接放到队尾。
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//上一步失败则通过enq入队。
enq(node);
return node;
}
private Node enq(final Node node) {
//CAS"自旋",直到成功加入队尾
for (;;) {
Node t = tail;
if (t == null) { // 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它。
if (compareAndSetHead(new Node()))
tail = head;
} else {//正常流程,放入队尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
2.3 acquireQueued(Node, int)
当通过tryAcquire() 尝试获取节点失败, 就会调用addWaiter()将节点放入队列尾部了. 那么之后应该干啥: 进入等待状态休息, 直到其他线程彻底释放资源后唤醒专辑, 自己拿到资源后, 就可以去干该干的事情了. acquireQueued()就是干这件事的.
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;//标记是否成功拿到资源
try {
boolean interrupted = false;//标记等待过程中是否被中断过
//又是一个“自旋”!
for (;;) {
final Node p = node.predecessor();//拿到前驱
//如果前驱是head,即该结点已成老二,那么便有资格去尝试获取资源(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)。
if (p == head && tryAcquire(arg)) {
setHead(node);//拿到资源后,将head指向该结点。所以head所指的标杆结点,就是当前获取到资源的那个结点或null。
p.next = null; // setHead中node.prev已置为null,此处再将head.next置为null,就是为了方便GC回收以前的head结点。也就意味着之前拿完资源的结点出队了!
failed = false; // 成功获取资源
return interrupted;//返回等待过程中是否被中断过
}
//如果自己可以休息了,就通过park()进入waiting状态,直到被unpark()。如果不可中断的情况下被中断了,那么会从park()中醒过来,发现拿不到资源,从而继续进入park()等待。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;//如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
}
} finally {
if (failed) // 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),那么取消结点在队列中的等待。
cancelAcquire(node);
}
}
节点在加入队列后, 会进入一个死循环
- 首先会获取前继节点
- 如果前继节点是头结点, 那么则去尝试获取资源, 如果拿到资源, 则会将自己设置为头结点
- 如果没有获取到资源就通过(shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) 进入waiting状态, 直到unpark.如果不可中断的情况下被中断了,那么会从park()中醒过来,发现拿不到资源,从而继续进入park()等待。
- 如果在等待的过程中没有成功获取资源(比如超时, 或者中断, 那么 就取消节点在队列中的等待)
2.3.1shouldParkAfterFailedAcquire(Node, Node)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//拿到前驱的状态
if (ws == Node.SIGNAL)
//如果已经告诉前驱拿完号后通知自己一下,那就可以安心休息了
return true;
if (ws > 0) {
/*
* 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。
* 注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后就会被保安大叔赶走了(GC回收)!
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢!
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
注意这个方法返回的是一个boolean值, 其实是判断当前节点是否可以休息(sleep), 休息的套件是前驱节点是节点状态是SIGNAL(忘记的可以看上面, 表示后继节点在等待唤醒), 注意如果前驱节点放弃了, waitStatus大于0, 那么就会一直往前面找,找到合适的节点
2.3.2 parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//调用park()使线程进入waiting状态
return Thread.interrupted();//如果被唤醒,查看自己是不是被中断的。
}
当前继节点满足后, 该线程就可以去休息了, 真正进入等待状态 park()会让当前线程进入waiting状态。在此状态下,有两种途径可以唤醒该线程:1)被unpark();2)被interrupt().
acquire()方法流程总结
- 调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回;
- 没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
- acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
-
如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
image.png
3 release(int)
这个方法是独占模式下线程释放共享资源的顶层入口. 他会释放指定量的资源, 如果彻底释放了(也就是state等于0), 他会唤醒等待队列里的其他线程来获取资源.
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;//找到头结点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//唤醒等待队列里的下一个线程
return true;
}
return false;
}
他的逻辑并不复杂, 它调用tryRelease()来释放资源。有一点需要注意的是,它是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了!所以自定义同步器在设计tryRelease()的时候要明确这一点!!
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
这个方法也是等待我们去实现
跟tryAcquire()一样,这个方法是需要独占模式的自定义同步器去实现的。正常来说,tryRelease()都会成功的,因为这是独占模式,该线程来释放资源,那么它肯定已经拿到独占资源了,直接减掉相应量的资源即可(state-=arg),也不需要考虑线程安全的问题。但要注意它的返回值,上面已经提到了,release()是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了!所以自义定同步器在实现时,如果已经彻底释放资源(state=0),要返回true,否则返回false。
3.1 unparkSuccessor(Node)
private void unparkSuccessor(Node node) {
//这里,node一般为当前线程所在的结点。
int ws = node.waitStatus;
if (ws < 0)//置零当前线程所在的结点状态,允许失败。
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;//找到下一个需要唤醒的结点s
if (s == null || s.waitStatus > 0) {//如果为空或已取消
s = null;
for (Node t = tail; t != null && t != node; t = t.prev) // 从后向前找。
if (t.waitStatus <= 0)//从这里可以看出,<=0的结点,都是还有效的结点。
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//唤醒
}
这个函数的作用就是唤醒等待队列中的下一个线程, 首先是取当前节点的后继节点,但是如果后继节点不存在或者 后继节点已经放弃了, 那么 就会从队列的未节点找, 一直找到最前面那个节点, 然后唤醒找出来的节点
release()是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。
4 acquireShared(int)
1 public final void acquireShared(int arg) {
2 if (tryAcquireShared(arg) < 0)
3 doAcquireShared(arg);
4 }
这个方法是共享模式下线程获取共享资源的顶层入口. 它会获取指定量的资源, 获取失败则进入等待队列, 直到获取到资源为止, 整个过程忽略中断
代码中的tryAcquiredShared(arg) 则需要我们自定义的同步器去实现. 不过AQS已经把返回值的语义定好了: 负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取。所以这里acquireShared()的流程就是:
- tryAcquireShared()尝试获取资源,成功则直接返回;
- 失败则通过doAcquireShared()进入等待队列,直到获取到资源为止才返回。
4.1 doAcquireShared(int)
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);//加入队列尾部
boolean failed = true;//是否成功标志
try {
boolean interrupted = false;//等待过程中是否被中断过的标志
for (;;) {
final Node p = node.predecessor();//前驱
if (p == head) {//如果到head的下一个,因为head是拿到资源的线程,此时node被唤醒,很可能是head用完资源来唤醒自己的
int r = tryAcquireShared(arg);//尝试获取资源
if (r >= 0) {//成功
setHeadAndPropagate(node, r);//将head指向自己,还有剩余资源可以再唤醒之后的线程
p.next = null; // help GC
if (interrupted)//如果等待过程中被打断过,此时将中断补上。
selfInterrupt();
failed = false;
return;
}
}
//判断状态,寻找安全点,进入waiting状态,等着被unpark()或interrupt()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
有木有觉得跟acquireQueued()很相似?对,其实流程并没有太大区别。只不过这里将补中断的selfInterrupt()放到doAcquireShared()里了,而独占模式是放到acquireQueued()之外,其实都一样,不知道Doug Lea是怎么想的。
跟独占模式比,还有一点需要注意的是,这里只有线程是head.next时(“老二”),才会去尝试获取资源,有剩余的话还会唤醒之后的队友。那么问题就来了,假如老大用完后释放了5个资源,而老二需要6个,老三需要1个,老四需要2个。老大先唤醒老二,老二一看资源不够,他是把资源让给老三呢,还是不让?答案是否定的!老二会继续park()等待其他线程释放资源,也更不会去唤醒老三和老四了。独占模式,同一时刻只有一个线程去执行,这样做未尝不可;但共享模式下,多个线程是可以同时执行的,现在因为老二的资源需求量大,而把后面量小的老三和老四也都卡住了。当然,这并不是问题,只是AQS保证严格按照入队顺序唤醒罢了(保证公平,但降低了并发)。
4.2 setHeadAndPropagate(Node, int)
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);//head指向自己
//如果还有剩余量,继续唤醒下一个邻居线程
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
此方法在setHead()的基础上多了一步,就是自己苏醒的同时,如果条件符合(比如还有剩余资源),还会去唤醒后继结点,毕竟是共享模式!
(我现在有些乱, 独占模式会在relase的情况下去唤醒后继的节点, 共享的时候会在获取到的时候就去发现还有剩余资源, 也会去唤醒后继几点), 至于怎么唤醒在方法
doReleaseShared()中
3.4 releaseShared()
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//尝试释放资源
doReleaseShared();//唤醒后继结点
return true;
}
return false;
}
此方法是共享模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源。
这边的步骤基本上就是: 释放掉资源后, 唤醒后继, 跟独占模式下的release()相似, 但是指的注意的是, 独占模式下tryRelease()在完全释放掉资源后(state = 0), 才会返回true去唤醒其他线程. 这主要是局域独占下面的可重入考虑的, 而共享模式下releaseShared()则没有这种要求, 共享模式下releaseShared(0则是没有这种要求的, 共享模式的实质就是控制一定量的线程并发执行, 那么拥有资源的线程在释放掉部分资源时候就可以唤醒后继节点.
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);//唤醒后继
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)// head发生变化
break;
}
}
本节我们详解了独占和共享两种模式下获取-释放资源(acquire-release、acquireShared-releaseShared)的源码,相信大家都有一定认识了。值得注意的是,acquire()和acquireShared()两种方法下,线程在等待队列中都是忽略中断的。AQS也支持响应中断的,acquireInterruptibly()/acquireSharedInterruptibly()即是,相应的源码跟acquire()和acquireShared()差不多,这里就不再详解了。
简单应用
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:
isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
Mutex是一个不可重入的互斥锁实现。锁资源(AQS里的state)只有两种状态:0表示未锁定,1表示锁定。下边是Mutex的核心源码:
class Mutex implements Lock, java.io.Serializable {
// 自定义同步器
private static class Sync extends AbstractQueuedSynchronizer {
// 判断是否锁定状态
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 尝试获取资源,立即返回。成功则返回true,否则false。
public boolean tryAcquire(int acquires) {
assert acquires == 1; // 这里限定只能为1个量
if (compareAndSetState(0, 1)) {//state为0才设置为1,不可重入!
setExclusiveOwnerThread(Thread.currentThread());//设置为当前线程独占资源
return true;
}
return false;
}
// 尝试释放资源,立即返回。成功则为true,否则false。
protected boolean tryRelease(int releases) {
assert releases == 1; // 限定为1个量
if (getState() == 0)//既然来释放,那肯定就是已占有状态了。只是为了保险,多层判断!
throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);//释放资源,放弃占有状态
return true;
}
}
// 真正同步类的实现都依赖继承于AQS的自定义同步器!
private final Sync sync = new Sync();
//lock<-->acquire。两者语义一样:获取资源,即便等待,直到成功才返回。
public void lock() {
sync.acquire(1);
}
//tryLock<-->tryAcquire。两者语义一样:尝试获取资源,要求立即返回。成功则为true,失败则为false。
public boolean tryLock() {
return sync.tryAcquire(1);
}
//unlock<-->release。两者语文一样:释放资源。
public void unlock() {
sync.release(1);
}
//锁是否占有状态
public boolean isLocked() {
return sync.isHeldExclusively();
}
}
同步类在实现时一般都将自定义同步器(sync)定义为内部类,供自己使用;而同步类自己(Mutex)则实现某个接口,对外服务。当然,接口的实现要直接依赖sync,它们在语义上也存在某种对应关系!!而sync只用实现资源state的获取-释放方式tryAcquire-tryRelelase,至于线程的排队、等待、唤醒等,上层的AQS都已经实现好了,我们不用关心。