基本实现原理
-
AQS
维持一个单一的状态信息 stateprivate volatile int state;
可以通过getState,setState,compareAndSetState函数进行操作。对于ReentrantLock的实现来说,state可以用来表示当前线程获取锁的可重入次数;对于读写锁ReentrantReadWriteLock来说,state的高16位表示读状态,也就是获取该读锁的次数,低16位表示获取到写锁的线程的可重入次数;对于Semaphore来说,state用来表示当前可用信号的个数;对于CountDownLatch来说,state用来表示计数器当前的值...
AQS
通过内置的FIFO双向队列来完成获取资源线程的排队工作。-
AQS
支持3种同步方式:- 独占式:如 ReentrantLock
- 共享式:如 Semaphore,CountDownLatch
- 组合式:如 ReentrantReadWriteLock
-
使用方法
对于使用者来说,我们无需关心获取资源失败,线程排队,线程阻塞/唤醒等一系列复杂实现,这些在AQS中已经实现好了。我们需要做的就是对状态值state进行操作。根据需求重写相应方法即可。
- 独占方式下,我们需要在具体子类里实现
tryAcquire
和tryRelease
方法,子类在实现时要根据具体场景使用CAS算法尝试修改state状态值,成功返回 true,失败返回 false。当然,必须首先定义 state 状态值代表什么含义。 - 同理,共享方式下,我们需要实现
tryAcquireShared
和tryReleaseShared
方法。 - 除了上述方法外,还需要重写
isHeldEExclusively()
方法,来判断锁是被当前线程独占还是共享。
- 独占方式下,我们需要在具体子类里实现
基于AQS实现自定义同步器
我们基于AQS实现一个不可重入的独占锁,正如前面所讲,自定义AQS需要重写一系列函数,还需要定义原子变量state的含义。在这里我们定义,state为0表示目前锁没有被线程持有,为1表示锁已经被某一个线程持有。由于是不可重入锁,所以不需要记录持有锁的线程获取锁的次数。
public class NonReentrantLock implements Lock, java.io.Serializable {
// 内部帮助类
private static class Sync extends AbstractQueuedSynchronizer {
// 判断锁是否已经被持有
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 获取锁
@Override
protected boolean tryAcquire(int arg) {
assert arg == 1;
// CAS 操作
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 释放锁
@Override
protected boolean tryRelease(int arg) {
assert arg == 1;
if (getState() == 0)
throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// 提供条件变量接口
Condition newCondition() {
return new ConditionObject();
}
}
private final Sync sync = new Sync();
@Override
public void lock() {
sync.tryAcquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.tryRelease(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
}
验证自定义锁
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Test {
static int value1 = 0;
static int value2 = 0;
static NonReentrantLock lock = new NonReentrantLock();
static CountDownLatch latch = new CountDownLatch(100);
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 100; i++) {
executorService.submit(new Runnable() {
@Override
public void run() {
increaseWithoutLock();
increaseWithLock();
latch.countDown();
}
});
}
executorService.shutdown();
latch.await();
System.out.println(value1);
System.out.println(value2);
}
public static void increaseWithoutLock() {
for (int i = 0; i < 1000; i++) {
value1 ++;
}
}
public static void increaseWithLock() {
try {
lock.lock();
for (int i = 0; i < 1000; i++) {
value2 ++;
}
} finally {
lock.unlock();
}
}
}
源码分析
AQS维护一个FIFO双向队列,该队列元素类型为Node,AQS维护两个引用,分别指向队列头部head和尾部tail。
Node结点
static final class Node {
// 标记该线程是获取共享资源时被阻塞挂起后放入AQS队列的
static final Node SHARED = new Node();
// 标记该线程是获取独占资源时被阻塞挂起后放入AQS队列的
static final Node EXCLUSIVE = null;
// waitStatus值,线程已取消
static final int CANCELLED = 1;
// waitStatus值,线程需要被唤醒
static final int SIGNAL = -1;
// waitStatus值,线程在条件队列中等待
static final int CONDITION = -2;
// waitStatus值,释放共享资源时需要通知其他结点
static final int PROPAGATE = -3;
// 记录当前状态
volatile int waitStatus;
// 前驱结点
volatile Node prev;
// 后继结点
volatile Node next;
// 存放进入队列里面的线程
volatile Thread thread;
Node nextWaiter;
/* ... */
}
独占方式下,获取与释放资源流程
-
当一个线程调用
acquire(int arg)
方法获取独占资源时,会首先使用tryAcquire
方法尝试获取资源,成功则直接返回,失败则将当前线程封装为类型为Node.EXCLUSIVE的结点插入到AQS阻塞队列的尾部,并调用LockSupport.park(this)方法挂起自己。public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
-
当一个线程调用
release(int arg)
方法时会尝试使用tryRelease
方法释放资源,然后调用LockSupport.unpark(thread)方法激活AQS队列里面被阻塞的一个线程thread。被激活的线程则使用tryAcquire
`尝试,成功则线程继续向下执行,否则还是会被放入队列中被挂起。public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
注意:
tryAcquire
和tryRelease
是需要具体子类来实现的,上文已经提过。以ReentrantLock为例,重写 tryAcquire 时,需要使用CAS算法查看当前state是否为0,为0则使用CAS设置为1,并设置当前锁持有者为当前线程,而后返回true,如果CAS失败返回false。
共享方式下,获取与释放资源流程
-
当一个线程调用
acquireShared(int arg)
方法获取共享资源时,会首先使用tryAcquireShared
方法尝试获取资源,成功则直接返回,失败则将当前线程封装为类型为Node.SHARED的结点插入到AQS阻塞队列的尾部,并调用LockSupport.park(this)方法挂起自己。public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
当一个线程调用
releaseShared(int arg)
方法时会尝试使用tryReleaseShared
方法释放资源,然后调用LockSupport.unpark(thread)方法激活AQS队列里面被阻塞的一个线程thread。被激活的线程则使用tryAcquireShared
尝试,成功则线程继续向下执行,否则还是会被放入队列中被挂起。注意:
tryAcquireShared
和tryReleaseShared
是需要具体子类来实现的另外,对于独占方式下的
void acquire(int arg)
和void acquireInterruptibly(int arg)
,与共享方式下的void acquireShared(int arg)
和void acquireSharedInterruptibly(int arg)
的区别在于:不带Interruptibly关键字的方法不对中断进行响应,而带Interruptibly关键字的方法对中断响应,即当线程被挂起时,其他线程中断了该线程,那么该线程会抛出InterruptedException异常而返回。
维护AQS队列,主要看入队操作
private Node enq(final Node node) {
for (;;) { // 死循环
Node t = tail;
if (t == null) { // 如果队列为空,创建结点,同时被head和tail引用
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) { // CAS设置尾结点
t.next = node;
return t;
}
}
}
}
条件变量的支持
正如 notify 和 wait 是配合 synchronized 内置锁实现线程间同步的基础设施一样,条件变量的 signal 和 await 方法也是用来配合锁(AQS实现的锁)实现线程间同步的基础设施。
synchronized 同时只能与一个共享变量的notify 或 wait 方法实现同步,而AQS的一个锁可以对应多个条件变量。
-
示例
ReentrantLock lock = new ReentrantLock(); Condition condition = lock.newCondition(); lock.lock(); try { System.out.println("begin wait"); condition.await(); System.out.println("end wait"); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } lock.lock(); try { System.out.println("begin signal"); condition.signal(); System.out.println("end signal"); } finally { lock.unlock(); }
lock.newCondition()
的作用其实是new了一个AQS内部声明的ConditionObject对象,ConditionObject是AQS的内部类。每个条件变量内部都维护了一个条件队列,用来存放调用条件变量的await()方法时被阻塞的线程。当线程调用条件变量的 await() 方法时(必须先调用锁的 lock() 方法获取锁),在内部会构造一个类型为 Node.CONDITION 的Node结点,然后将该结点插入条件队列的尾部,之后会释放锁,并被阻塞挂起。
当另外一个线程调用条件变量的 signal() 和 signalAll() 方法时,会把条件队列里面的一个或者全部Node结点移动到AQS的阻塞队列里面,等待时机获取锁。
AQS只提供ConditionObject 的实现,并没有提供 newCondition 函数,该函数用来 new 一个 ConditionObject 对象,需要由AQS的子类来提供 newCondition 函数。
-
总结:一个锁对应一个AQS阻塞队列,对应多个条件变量,每个条件变量有自己的一个条件队列。