写在前面
源码版本:jdk1.8
AQS
阅读建议:建议先大致过一遍本篇内容,然后实际跟着源码debug,自己去理解,不懂的可回过头来看本篇文章,加深理解。
手撸同步
- 锁的基本实现条件只要有表示锁
状态
的字段,CAS操作
,加锁
,解锁
方法就足够了 - 但单单只有这些方法并不能很好的解决问题,比如当
加锁
时,拿不到锁怎么办。 - 自旋?让出CPU等待唤醒?
- 如果是要等待唤醒是不是需要一个地方去暂存等待的线程呢? 如下简单实现
// 锁状态
volatile int status = 0;
// 排队队列
Queue parkQueue;
boolean compareAndSet(int except, int newValue){
// CAS操作,比对预期值,成功则将status设置为新值
}
void lock(){
while(!compareAndSet(0, 1)){
//
park();
}
}
void park(){
// 将当前的线程加入到Queue
parkQueue.add(Thread.currentThread());
// 释放CPU,挂起
releaseCPU();
}
// 下面两步会不会有原子性问题,待看源码解决?
void unlock(){
// 唤醒
if(!compareAndSet(1, 0) ){
// 解锁失败,抛异常
}
//得到要唤醒的线程头部线程,唤醒等待线程
unpark(parkQueue.header(););
}
所谓AQS,就是对维护上述队列,也是AQS核心思想
下面依次解析 ReentrantLock
的上锁、解锁过程
主要属性
ReentrantLock
// 内部用组合的方式,维护了实现 AbstractQueuedSynchronizer 的Sync 静态类
// 其中公平类和非公平类继承自这个同步内部类
private final Sync sync;
// 执行加锁
public void lock() {
sync.lock();
}
static final class NonfairSync extends Sync{...}
static final class FairSync extends Sync{...}
AbstractQueuedSynchronizer
// 重要属性
// 等待队列队头
private transient volatile Node head;
// 队尾
private transient volatile Node tail;
// 加锁状态,可 > 1 则说明多次重入
private volatile int state;
// 内部等待队列
static final class Node {
// 当前node的等待状态
volatile int waitStatus;
// 上一个node
volatile Node prev;
// 下一个node
volatile Node next;
// 当前的线程,封装进node
volatile Thread thread;
}
附上一张AQS队列图,帮助更好理解:
比如执行一段代码,我们来看看流程。先看公平锁
final ReentrantLock lock = new ReentrantLock(true);
new Thread(() -> {
reentrantLock.lock();
// do something
reentrantLock.unlock();
}).start();
公平锁加锁流程
下面通过序号来逐步解释 注意 (1)、(2)xxx等
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
(1) 调用AbstractQueuedSynchronizer的获取锁方法,如果加锁成功,则更新为改变的值
接下来是AQS类中的(2)
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
(3) 回到调用者子类的tryAcquire,去尝试获取锁
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
(4)获取锁状态
int c = getState();
判断锁是否是空闲的 (没有加过锁和其他骚操作)
if (c == 0) {
进入到(5)先判断是否需要排队
if (!hasQueuedPredecessors() &&
不需要排队则进行CAS加锁
compareAndSetState(0, acquires)) {
设置当前线程为锁的拥有者(下次再次重入的时候直接判断是否是同一线程就可, 看下面第五行)
setExclusiveOwnerThread(current);
return true;
}
}
如果当前线程是重入线程
else if (current == getExclusiveOwnerThread()) {
把锁的状态 ++,然后设置回去
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
如果锁不空闲,而且不是重入那么就加锁失败,返回(2),入队
return false;
}
}
//
public abstract class AbstractQueuedSynchronizer extends xxx {
···
public final void acquire(int arg) {
if ((2)首先尝试获取锁,回到调用者子类的tryAcquire中
!tryAcquire(arg) &&
如果加锁失败则会将当前线程封装成node加入队列去排队
接着下面的(6)先去增加一个尾部等待节点
(7)然后再将该节点park掉
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//加入队列,线程就会park,等到解锁会unpark,醒来之后判断自己是否被打断了,如果被打断了则执行selfInterrupt方法
//执行Thread.currentThread().interrupt(),此处是因为后面 需要parkAndCheckInterrupt() 时检查 是否中断,响应中断用到的。主要是lockInterruptibly响应线程中断会使用到。
selfInterrupt();
}
···
(5)判断是否需要排队
public final boolean hasQueuedPredecessors() {
// 当前队头,尾
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return
如果h == t 可能是队列没有初始化,不需要竞争 或者 是如果当前只有一个线程在是使用锁,这个线程重入了,没有其他线程竞争,则返回表明不需要排队
因为队头等于队尾的话说明排队的node 没有 或 只有一个
h != t &&
头节点下一个是不是空,即 判断队列里面是否只有一个node在等待
1. 不为空,则是有多个线程在排队,则判断 头节点下一个是不是当前线程,是那就不需要排队了,返回然后尝试 CAS 操作。如果不是当前线程,就需要排队了。
2. 为空,则不用等待,去尝试获取锁
((s = h.next) == null || s.thread != Thread.currentThread());
}
···
}
接着上面,如果加锁失败,需要先创建一个当前线程的node,作为队列尾节点
AbstractQueuedSynchronizer
(6)添加尾节点
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
如果当前尾节点为空,即判断当前等待队列是否已经初始化
if (pred != null) {
初始化成功则设置将当前节点上一个置为尾节点,尾节点下一个为 当前的新节点
node.prev = pred;
CAS设置 尾指针为当前节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
如果当前队列没有初始化,则初始化,并将node置为尾节点
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
第一次循环:肯定是null,因为没有初始化队列,所以初始化一个空node
if (t == null) { // Must initialize
设置头节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
第二次循环,将node替换为尾节点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
入队完成后
(7)去park 封装当前线程的node
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
拿到前一个node
final Node p = node.predecessor();
先看看如果前一个是头,可能他在我这段过程中已经释放锁了,所以尝试一下获取锁,获取到了就将当前node升级为head
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
判断前一个节点是否需要park,为什么是前一个下面(8)解释
第一次循环:判断是否需要挂起,在shouldParkAfterFailedAcquire中将前一个节点置为 需要挂起的状态,继续循环
第二次循环:shouldParkAfterFailedAcquire中 状态为 上次的 需要挂起的状态 直接返回true, 执行挂起并判断是否中断操作。
至此当前线程就进入了等待,加锁过程结束!!!
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
(8)判断前一个节点是否需要park,因为如果我当前的node被park了,已经把CPU给让出来了,没办法自己 修改状态,所以只能下一个node给上一个node 加等待停止状态
node的waitStatus 如果是 0 则说明当前node是 自由初始状态。如果是 -1 即 SIGNAL 说明是 等待停止 状态
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
上一个node的状态,如果是 SIGNAL 就是说明需要将他挂起,返回true
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
如果大于0,只有一个状态 CANCELLED 说明node被cancelled了,就往前遍历到一个没有取消的node,调整原node结构,丢弃取消掉的那些node
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
如果不需要挂起,而且没有取消,则将前一个节点置为需要挂起的状态
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
不需要挂起,返回false
return false;
}
其实非公平锁和公平锁只有一点点差异
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
只有这儿一点差异,非公平锁则会直接先去获取锁,获取不到再去走公平锁获取锁的流程
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
还有就是tryAcquire有一点小差异
abstract static class Sync extends AbstractQueuedSynchronizer {
···
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
这里少了个 !hasQueuedPredecessors() && ,判断是否需要排队
这儿直接去比较,然后下面流程都是一致的了
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
···
- 总结一下,差异就在非公平锁 会先直接去 CAS 尝试获取锁
最后,写在加锁流程末尾,大家结合那张图,跟着逻辑一步步去形象化流程将会加快理解。
解锁流程 , 到这儿就容易多了,不区分公平、非公平
ReentrantLock
public void unlock() {
(1)释放锁
sync.release(1);
}
(3)尝试释放锁
protected final boolean tryRelease(int releases) {
c为 当前是否当前线程 有重入
int c = getState() - releases;
如果不是当前线程来自己解锁则抛异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
如果为0,则就是自由状态了,清空当前拥有锁的线程
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
设置状态
setState(c);
return free;
}
AbstractQueuedSynchronizer
public final boolean release(int arg) {
(2)尝试释放锁
if (tryRelease(arg)) {
Node h = head;
(4)如果释放成功,并且如果队列已经初始化了(即 有线程在等待),就把头节点的下一个 unpark(如果waitStatus是 <0 则其线程是被阻塞了需要唤醒,>0的话为取消了)
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
(5)解除排队
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
如果 状态比如需要park的,把他状态取消置为0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
从后往前遍历找到 离node最近的 t.waitStatus <= 0 的节点
这里为什么要从后往前遍历:因为在某些情况下,会存在next节点断掉,不能从前向后遍历。
比如在加锁增加节点(addWriter)时,有这样的代码:node.prev = pred; compareAndSetTail(pred, node) 这儿是Tail入队的原子操作,但是此时pred.next = node;还没执行,如果这个时候执行了unparkSuccessor方法,不知道pred的下一个是谁,这儿就断掉了。所以必须从后往前找。
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
上述找到节点后将它unpark
if (s != null)
LockSupport.unpark(s.thread);
}