1 AQS原理
全称是 AbstractQueuedSynchronizer ,阻塞式锁和相关同步工具的框架
1.1 特点
用state属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
getState 获取state状态
setState 设置state 状态
compareAndState cas机制设置state状态
独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
提供了基于FIFO的等待队列,类似于Monitor 的EntryList
条件变量来实现等待、唤醒机制,支持多个条件变量,类似于Monitor 的WaitSet
子类主要实现这样的一些方法(默认抛出UnsupportedOperationException)
- tryAcquire
- tryRelease
- tryAcquireShared
- tryReleaseShared
- isHeldExclusively
获取锁的姿势
//如果获取锁失败
if (! tryAcquire(arg)) {
//入队,可以选择阻塞当前线程 park unpark 机制
}
释放锁的姿势
//如果释放锁成功
if (tryRelease(arg)) {
//让阻塞线程恢复运行
}
2. 自定义锁
自定义锁实现 Lock 接口,定义一个内部类继承 AbstractQueuedSynchronizer 类
自定义锁是一个不可重入锁
public class AqsTest {
public static void main(String[] args) {
MyLock lock = new MyLock();
new Thread(()->{
lock.lock();
try{
System.out.println("lock" + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}finally {
System.out.println("unlock" + Thread.currentThread().getName());
lock.unlock();
}
}).start();
new Thread(()->{
lock.lock();
try{
System.out.println("lock" + Thread.currentThread().getName());
}finally {
System.out.println("unlock" + Thread.currentThread().getName());
lock.unlock();
}
}).start();
}
}
/**
* 自定义不可重入锁
*/
class MyLock implements Lock {
class MySync extends AbstractQueuedSynchronizer{
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0,1)) {
//表示加上锁了,设置owner为当前线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
//注意两个方法顺序
setExclusiveOwnerThread(null);
//state 使用volatile修饰,禁止指令重排
setState(0); //表示解锁
return true;
}
/**
* 是否持有独占锁
* @return
*/
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
public Condition newCondition(){
return new ConditionObject();
}
}
private MySync sync = new MySync();
/**
* 不成功进入等待队列等待
*/
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
/**
* 尝试加锁,只会尝试一次
* @return
*/
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1,unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
/**
* 创建条件变量
* @return
*/
@Override
public Condition newCondition() {
return null;
}
}
测试结果:
lockThread-0
unlockThread-0
lockThread-1
unlockThread-1
3. ReentrantLock 原理
ReentrantLock UML类图
3.1 非公平锁实现原理
3.1.1 加解锁流程
加锁流程
没有竞争时
第一个竞争出现时
Thread-1 执行了
- CAS 尝试将state 由 0 改为 1 ,结果失败
- 进入tryAcquire逻辑,这时 state 已经是 1 ,结果仍然失败
- 接下来进入addWaiter逻辑,构造Node队列
- 途中黄色三角表示该Node的waitStatus 状态,其中 0 为默认正常状态
- Node的创建时懒惰的
- 其中第一个Node成为 Dummy(哑元) 或哨兵,用来占位,并不关联线程
- 当前线程进入acquireQueued 逻辑
- acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入park 阻塞
- 如果自己是紧邻着 head (排第二位),那么再次tryAcquire 尝试获取锁,当然这时state 仍为1,失败
- 进入 shouldParkAfterFailedAcquire(p, node) 逻辑,将前驱node ,即head 的waitStatus 改为 -1 ,这次返回false (-1 表示有责任唤醒它的后继节点)
- shouldParkAfterFailedAcquire(p, node) 执行完毕后回到acquireQueued, 再次tryAcquire 尝试获得锁,当然这时的 state 仍为1,失败
- 当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱node 的waitStatus 已经是 -1,这次返回true
- 进入 parkAndCheckInterrupt, Thread-1 park(灰色表示)
- 再次有多个线程经历上述过程竞争失败,变成这个样子
解锁流程
1.Thread-0 释放锁,进入tryRelease 流程,如果成功
- 设置 execlusiveOwnerThread 为null
- state = 0
2.当前队列不为null , 并且 head 的waitStatus = -1,进入unparkSuccessor 流程
- 找到队列中离head 最近的一个Node (没取消的),unpark 恢复其运行,本例中为Thread-1
3.回到Thread-1 的acquireQueue流程
如果加锁成功(没有竞争),会设置
- exclusiveOwnerThread 为Thread-1 ,state = 1;
- head 指向刚刚Thread-1 所在的Node,该Node清空Thread
- 原本的head因为从链表断开,而可被垃圾回收
4.如果这时候有其他线程来竞争(非公平的体现),例如这时Thread-4 来了
如果不巧被Thread-4占了先机
- Thread-4 被设置为exclusiveOwnerThread , state = 1
- Thread-1 再次进入acquireQueued 流程,获取锁失败,重新进入park 阻塞
源码
先从构造器看,默认为非公平锁实现
public ReentrantLock() {
sync = new NonfairSync();
}
NonfairSync 继承自 Sync , Sync 继承自AQS
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
// Methods relayed from outer class
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
/**
* Reconstitutes the instance from a stream (that is, deserializes it).
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); //获取前驱节点
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
//创建一个节点,waitStatus = null
Node node = new Node(Thread.currentThread(), mode);
//如果当前队列已经有节点了,直接将该节点加入到AQS队列尾部
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果当前队列为空,要先初始化,new Node()
enq(node);
return node;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
3.1.2 可重入原理
具体分析见源码
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
abstract void lock();
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//当前线程没有获得锁,尝试修改状态0->1
if (compareAndSetState(0, acquires)) {
//修改状态成功,表示获得锁。设置owner为当前线程
setExclusiveOwnerThread(current);
return true;
}
}
//如果已经获得了锁,线程还是当前线程,表示发生了锁重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//设置当前的state, 同一个线程锁重入state++
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//c==0时重新设置 owner 返回true,否则只更新state值,返回false
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
//是否独占锁
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
...
}
3.1.3 可打断原理
不可打断模式
在此模式下,即使它被打断,仍会驻留在AQS 队列中,等获得锁后方能继续运行(是继续运行,只是打断标记被设置为true)
private final boolean parkAndCheckInterrupt() {
// 如果打断标记已经是true,则park 会失效
LockSupport.park(this);
// interrupted 会清除打断标记
return Thread.interrupted();
}
//未获得到锁时,调用该方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
//还是需要获得锁后,才能返回打断状态
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 如果是因为interrupt 被唤醒,返回打断状态为true
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 如果打断状态为true
selfInterrupt();
}
static void selfInterrupt() {
// 重新产生一次中断
Thread.currentThread().interrupt();
}
可打断模式
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
// 如果没有获得锁,进入
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//在park过程中如果被interrupt 会进入此
// 这时抛出异常,而不会再次进入 for(;;)
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
4. 公平锁实现原理
非公平锁
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//如果还没有获得锁
if (c == 0) {
//尝试用CAS获得锁,这里体现非公平性:不去检查AQS队列
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果已经获得了锁,线程还是当前线程,表示发生了锁重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//获取失败,回到调用处
return false;
}
公平锁实现
// 与非公平锁的主要区别在于tryAcquire 方法的实现
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 先检查AQS队列中是否有前驱节点
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
5. 条件变量的实现原理
每个条件变量其实就对应着一个等待队列,实现类是ConditionObject
await 流程
开始 Thread-0持有锁,调用await,进入 ConditionObject 的 addConditionWaiter 流程创建新的Node状态为-2(Node.CONDITION),关联 Thread-0,加入等待队列的尾部
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//调用ConditionObject.await() 先进入addConditionWaiter()流程
Node node = addConditionWaiter();
//接下来进入AQS的fullyRelease 流程,释放同步器上的锁
//为什么这里要调用 fullyRelease(node)? 有可能一个线程重入好几次
int savedState = fullyRelease(node);
int interruptMode = 0;
//进入一个循环,判断
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//给当前线程创建一个新节点,连接到链表上
//waitStatus == Node.CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
当前线程添加到 ConditionObject队列中后,当前线程要释放锁
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
//释放锁成功,返回当前状态;若释放失败,设置当前节点的状态为Node.CANCELLED
//release(savedState) 还会唤醒等待队列中的下一个节点
//unpark AQS队列的下一个节点,竞争锁,假设没有其他线程竞争,Thread-1竞争锁成功
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
signal 流程
假设Thread-1 要来唤醒 Thread-0
进入ConditionObject 的doSignal()流程
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
*/
public final void signal() {
//如果当前线程没有持有锁,调用signal()抛异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//取出等待队列的头结点
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
执行 transferForSignal 流程,将该Node 加入 AQS队列尾部,将Thread-0 的waitStatus 改为 0;将Thread-3的 waitStatus 改为 -1
/**
* Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
* @param node the node
* @return true if successfully transferred (else the node was
* cancelled before signal)
*/
//将这个节点从 condition队列转移到 sync队列,如果成功则返回true
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
//首先尝试cas操作将waitStatus 改为0,若失败,直接返回false
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
//拼接将该节点到 aqs 队尾。调用 enq()返回该节点的前置节点
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true
}
Node节点定义
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;
/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*/
volatile Node prev;
/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
*/
volatile Node next;
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;
/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}