AQS简介
AbstractQueuedSynchronizer简称AQS,它提供了一个FIFO双向队列,用来实现锁和其他的同步框架,AQS通过子类继承并实现它的方法来管理同步状态,如ReentrantLock、CountDownLatch底层都继承了AQS;
AQS独占锁的实现
AQS数据结构以及属性
AQS内部维护了一个FIFO双向队列,队列中的元素是一个Node类型的节点,Node节点元素如下:
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
- SHARED:共享锁标记;
- EXCLUSIVE:独占锁标记;
- waitStatus:表示节点的状态、节点状态有:
- CANCELED:值为1,表示节点被取消;
- SIGNAL:值为-1,表示表示当前节点的后继节点已经或者将要被阻塞,当前节点释放后,后继节点需要被唤醒;
- CONDITION:表示节点在等待condition,在condition队列中;
- PROPAGATE:值为-3,表示releaseShared需要被传播给后续节点(仅在共享模式下使用);
- 0:waitStatus的初始状态;
- prev:当前节点的前驱节点;
- next:当前节点的后继节点;
- nextWaiter:共享模式下存储在condition队列的后继节点;
- thread:当前线程;
AQS中维护了一个head节点、一个tail节点,表示队列的头节点和尾节点;AQS同时还维护了一个state变量,该变量主要表示加锁的状态;在ReentranLock中,由于ReentrantLock是一个可重入锁、所以每加一次锁后state的状态加1,释放一次锁后state状态减1,当state为0时表示锁已被释放掉;AQS还维护了headOffset、tailOffset等,用来记录头节点、尾节点、节点状态的内存偏移量,在cas操作时通过对应的offset偏移量用来确定head节点、tail节点等在内存中的位置,来替换相对应的变量;
AQS独占锁的代码实现
1、acquire
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
ReentrantLock的构造方法默认是使用的非公平锁、在NonfairSync非公平锁中、如果通过cas能获取锁,则调用setExclusiveOwnerThread设置exclusiveOwnerThread为当前线程,表示这个线程独占这个锁;否则就调用AQS的acquire方法;
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquire方法主要工作:
- 1.尝试获取独占锁;
- 2.获取成功则返回、获取失败则调用addwatier把当前线程作为Node节点添加到队列尾部;
- 3.调用acquireQueued方法自旋、尝试获取锁直到成功,并返回中断标志位;
- 4.如果获取锁失败,并且中断标志位为true,然后设置线程中断;
2、tryAcquire
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
tryAcquire方法在AQS内部是没有实现的,需要由子类继承后去实现它的方法。这边为什么不用abstract来修饰?因为在AQS中有2种锁方式:共享锁和独占锁,如果用抽象方法、那子类就必须实现对应的功能,如果子类是共享锁,那它还得出现独占锁的方法,但这对子类来说并不是需要的,所以这边并没有用abstract来修饰。
以下是ReentrantLock中非公平锁对tryAcquire的实现:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
3、addWaiter
private Node addWaiter(Node mode) {
//创建当前线程的节点
Node node = new Node(Thread.currentThread(), mode);
//获取AQS的尾节点
Node pred = tail;
//尾节点为空则进行enq
if (pred != null) {
//尾节点不为空,则把当前节点的前继节点指向队列的尾节点
node.prev = pred;
//通过cas重新把尾节点设置为当前节点,否则enq
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
addWaiter方法主要是根据当前线程创建一个Node节点,并添加到队列的尾部。
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
在AQS初始化的阶段会调用static代码块获取tailOffset,即tail节点在内存地址的偏移量。compareAndSetTail通过cas操作、根据tailOffset获取内存中的tail节点、如果tail节点与预期的expect节点相同(except也就是tail节点、在并发下tail节点可能会一直改变),则把内存中的tail节点修改为update节点;compareAndSetHead、compareAndSetWaitStatus、compareAndSetNext等方法与之类似;
4、enq
private Node enq(final Node node) {
//不断执行for循环,直至成功
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//如果tail节点为空、则创建一个Node节点作为头节点和尾节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
//tail节点不为空、则重新通过cas操作来设置当前节点为队尾、以及设置当前节点的前继节点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq方法主要是不断的循环,是一种自旋操作,去初始化头尾节点、以及设置当前节点的前继节点与尾节点;因为在addWaiter方法中cas操作设置尾节点可能会失败,在并发情况下尾节点可能会不断的更改,所以enq方法自旋去不断的设置当前节点的前继节点和队列的尾节点,以保证节点能添加到队列尾部。
注意这边设置tail节点和设置当前节点并不是原子操作,所以在next节点的指针可能会出现问题,后面为保证遍历到每一个节点,需要从尾部向前递归。
5、acquireQueued
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
//中断标志位
boolean interrupted = false;
for (;;) {
//获取前置节点
final Node p = node.predecessor();
//如果前置节点是head,则尝试获取锁
if (p == head && tryAcquire(arg)) {
//设置head节点为当前节点
setHead(node);
//清除之前到head节点
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果当前节点的前继节点不是head或者获取锁失败
//则判断是否需要park阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
acquireQueued方法是自旋的获取锁直至成功,最后返回中断标志位。
6、shouldParkAfterFailedAcquire
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
//如果前继节点状态为CANCELED、则不断向前推找到不为CANCELED的节点
//把当前节点的前继节点指向不为CANCELED的节点,删除中间的CANCELED节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//节点状态为0或者为CONDITION时,则把前继节点状态改为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
shouldParkAfterFailedAcquire方法主要是判断节点是否需要park阻塞,并删除节点前面的CANCELED节点,所以只有当前继节点为SIGNAL时,当前节点才需要阻塞。
7、parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
LockSupport.park方法:判断线程是否中断、如果线程不是中断的则阻塞当前线程,需要调用unPark方法进行唤醒;如果线程是中断的,则直接返回。
interrupted()方法的作用,该方法是获取线程的中断状态,并复位,也就是说,如果当前线程是中断状态,则第一次调用该方法获取的是true,第二次则是false。而isInterrupted()方法则只是返回线程的中断状态,不执行复位操作。
- 当线程为非中断时,执行park方法进行阻塞;
- 当线程为中断时,Thread.interrupted返回true,并重新复位中断标志位,再次循环后重新调用park方法进行阻塞;
这边为何需要阻塞呢,假设前面一个线程获取锁之后执行了很长时间,导致for循环一直在执行,如果有很多的线程都在执行,这样就导致cpu使用率飙升。所以这边需要用park进行阻塞,需要时调用unPark方法唤醒。
8、cancelAcquire
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
//去掉节点上的线程
node.thread = null;
Node pred = node.prev;
//删除队列中当前节点前面的CANCELED节点
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
//当前节点的状态置为CANCELED
node.waitStatus = Node.CANCELLED;
//如果当前节点为尾节点,则把当前节点的前继节点设为尾节点
if (node == tail && compareAndSetTail(node, pred)) {
//把尾节点的next节点置空
compareAndSetNext(pred, predNext, null);
} else {
//当前节点的前继节点不为头节点
//如果前继节点不为SIGNAL,则设置为SIGNAL
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
//把前继节点的next节点设为当前节点的下个节点
//相当于删除当前节点
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//如果是head的后继节点、或者设置状态失效,则唤醒它的后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
cancelAcquire 方法删除队列中的当前节点以及当前节点前面的CANCELED节点、如果当前节点是头节点,则唤醒当前节点的后继节点。
9、unparkSuccessor
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
//尾节点向前遍历,找到最靠近当前节点的waitStatus<=0的节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//如果节点不为空,则唤醒节点的线程
LockSupport.unpark(s.thread);
}
unparkSuccessor方法主要是唤醒最靠近当前节点,并且waitStatus<=0的节点;这边为何要从尾节点向前遍历到当前节点?因为在addWaiter方法、enq方法中
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
设置尾节点和设置前置节点的下一个节点不是原子操作,可能出现next节点没有指向,导致遍历时漏掉新进的尾节点;以及在cancelAcquire中node.next = node,遍历next节点出现环节点;所以next节点并不是安全的,所以相对于用next遍历,prev遍历更加安全。
AQS独占锁的释放
release
public void unlock() {
sync.release(1);
}
在ReentrantLock中unlock是调用AQS的release方法来释放锁。
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
public final boolean release(int arg) {
//释放锁
if (tryRelease(arg)) {
Node h = head;
//释放锁后唤醒后继节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
在AQS中tryRelease也和tryAcquire一样,需要子类去实现;
ReentrantLock中truRelease的实现:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
总结
AQS独占锁通过自旋、标志位、park、unPark以及队列来处理锁的状态,包括获取锁,释放锁;AQS通过大量的cas操作去实现更新操作,以保证状态、队列的原子性,当然cas只能保证一个属性的原子、并不能保证代码块的原子性;