java提供了内置锁,即synchronized,除此以外,还提供了显式锁,下面我们分别分析其实现的机制,并讨论如何在这两者之间进行选择。
AQS
AQS即AbstractQueuedSynchronizer,一般用于管理同步类中的状态,它管理了一个整数状态信息,可以使用getState setState以及compareAndSetState来获取或修改状态。比如Semaphore可以用这个状态表示剩余的许可数目,ReentrantLock可以用它来表示当前持有锁的线程已经加锁的次数等等。其获取和释放操作的伪代码如下:
boolean acquire() throws InterruptedException {
while(当前状态不允许获取) {
if (需要阻塞获取请求) {
当前线程不在队列中,则将其插入等待队列
将当前调用线程阻塞
} else {
返回失败
}
}
获取成功 修改状态
将当前线程移出等待队列
返回成功
}
void release() {
更新状态
if (新的状态允许别的线程获取资源) {
选择一个或多个线程唤醒
}
}
比如一个简单的二元闭锁(代码来自java并发编程实战)
public class OneShotLatch {
private final Sync sync = new Sync();
public void signal() { sync.releaseShared(0); }
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(0);
}
private class Sync extends AbstractQueuedSynchronizer {
protected int tryAcquireShared(int ignored) {
return (getState()) == 1 ? 1 : -1;
}
protected boolean tryReleaseShared(int ignored) {
setState(1);
return true;
}
}
}
AQS state为1时表示打开,0表示关闭。await调用acquireSharedInterruptibly,然后会去调用tryAcquireShared,如果state是1则tryAcquireShared返回成功并允许线程通过,否则线程将进入等待线程队列中去。
signal调用releaseShared,然后调用tryReleaseShared,然后让所有等待中的线程都再次尝试请求该同步器,从而通过闭锁。另外,AQS还提供了这些操作的限时版本,从而可以实现有时限的等待操作。
acquire
下面我们来看看acquire的代码:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
上面的代码是一个模板方法,先调用了tryAcquire,如果失败,则调用addWaiter将当前线程加入等待队列中,然后再使用acquireQueued来尝试获取资源。完成后如果有中断,则调用selfInterrupt传递中断状态。
在addWaiter中,使用Node类封装了当前线程,Node的状态有:
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
使用prev和next可以形成一个链表:
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
关于状态,直接摘抄jdk8源码的注释:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
SIGNAL 这个记号表示当前node之后的节点已经或即将被阻塞,需要再release或者cancel后唤醒一个或若干个后续节点
CANCELLED表示这个线程已经cancel掉了
CONDITION 说明这个node在一个condition queue上 和sync queue没什么鸟关系
PROPAGATE releaseShared需要被所有等待中的node得知,doReleaseShared中会使用该状态
了解了Node类后,我们来看看addWaiter
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
- 如果目前队列非空,则尝试快速入队,使用CAS把自己设置为tail,并且返回自己对应的node
- 如果快速入队失败了 就调用enq将当前node入队
- 首先,如果队列为空的话会在队列中添加第一个dummy的节点,此时head == tail ==dummy节点 如果失败说明已经有别的线程设置过tail了 就再循环之前的操作
- tail非空,则自旋的将自己入队
入队以后,就调用acquireQueued。
- 获取该节点前驱 如果前驱是head,说明前面已经没有等待中的节点了,就尝试tryAcquire,成的话讲当前节点设置为head,返回
- 没有tryAcquire成功,则查看是否应当block当前线程 唤醒后检查thread.interrupted()状态
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
下面看看shouldParkAfterFailedAcquire,这个方法检查一个acquire失败的线程的状态,看看是否需要park(block)它。先看看node的prev的状态
- 如果是SIGNAL说明已经做好稍后被唤醒的准备了,返回true表明可以被park。
- 如果大于0说明被cancel了,那么一直找prev直到找到状态小于等于0的节点
- 如果都不是 那么把pred的状态设置为SIGNAL 为之后的park做准备,进入下一次acquireQueued的循环中
- 每次唤醒后,都会检查并传递中断状态 用acquireQueued返回,如果有中断 则抛出中断异常
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
release
然后看看release的过程,会调用子类的tryRelease,这是一个模板方法模式。如果处理成功且head不为空且不是开始时添加的一个dummy head,则尝试调用unparkSuccessor唤起后继的线程们。
如果head的后继是null或已经被cancel了 则从tail开始向前找到一个等待中的线程。
如果有可以唤醒的线程,唤醒之,让其重新回到acquireQueued的无限循环中去
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
synchronized
被动锁,即synchronized方法或代码块的原理与基于AQS的主动锁有所区别,先来一段代码:
public class TestSynchronized {
public static void main(String[] args) {
synchronized (TestSynchronized.class) {
}
}
public synchronized void method() {
}
}
javap -c -verbose 以后看到:
public static void main(java.lang.String[]);
descriptor: ([Ljava/lang/String;)V
flags: ACC_PUBLIC, ACC_STATIC
Code:
stack=2, locals=3, args_size=1
0: ldc #2 // class test/TestSynchronized
2: dup
3: astore_1
4: monitorenter
5: aload_1
6: monitorexit
7: goto 15
10: astore_2
11: aload_1
12: monitorexit
13: aload_2
14: athrow
15: return
Exception table:
from to target type
5 7 10 any
10 13 10 any
LineNumberTable:
line 7: 0
line 8: 5
line 9: 15
StackMapTable: number_of_entries = 2
frame_type = 255 /* full_frame */
offset_delta = 10
locals = [ class "[Ljava/lang/String;", class java/lang/Object ]
stack = [ class java/lang/Throwable ]
frame_type = 250 /* chop */
offset_delta = 4
public synchronized void method();
descriptor: ()V
flags: ACC_PUBLIC, ACC_SYNCHRONIZED
Code:
stack=0, locals=1, args_size=1
0: return
LineNumberTable:
line 12: 0
可以看到,同步代码块前后有monitorenter 和 monitorexit 指令,而同步方法是在修饰符上添加了ACC_SYNCHRONIZED。
那么,synchronized到底是靠什么实现的呢?要了解这些,我们首先要明确两个概念:对象头和monitor。
对象头
HotSpot的每个对象都有一个头部,包括两部分信息:Mark Word(标记字段)和 Klass Pointer(类型指针)。
其中:
- Mark Word 用于存储对象自身的运行时数据,如哈希码(HashCode)、GC分代年龄、锁状态标志、线程持有的锁、偏向线程 ID、偏向时间戳等等。如果对象是一个数组,对象头中还会有一块用于记录数组长度的数据。在不同的情况下,mark word可以变长的表示不同的含义,例如在 32 位的HotSpot 虚拟机中对象未被锁定的状态下,Mark Word 的 32个Bits 空间中的 25Bits 用于存储对象哈希码(HashCode),4Bits 用于存储对象分代年龄,2Bits 用于存储锁标志位,1Bit固定为0,在其他状态(轻量级锁定、重量级锁定、GC标记、可偏向)下对象的存储内容如下表所示:
存储内容 | 标志位 | 状态 |
---|---|---|
对象哈希码、对象分代年龄 | 01 | 未锁定 |
指向锁记录的指针 | 00 | 轻量级锁定 |
指向重量级锁的指针 | 10 | 膨胀(重量级锁定) |
空,不需要记录信息 | 11 | GC标记 |
偏向线程ID、偏向时间戳、对象分代年龄 | 01 | 可偏向 |
注意偏向锁、轻量级锁、重量级锁等都是jdk 1.6以后引入的哦。
monitor record
每一个被锁住的对象都与一个monitor record关联。每一个线程都有一个可用monitor record列表,同时还有一个全局的可用monitor record列表。每个monitor record的结构如下所示:
Monitor Record |
---|
Owner |
EntryQ |
RcThis |
Nest |
HashCode |
Candidate |
- Owner:初始时为NULL表示当前没有任何线程拥有该monitor,当线程成功拥有该锁后保存线程唯一标识,当锁被释放时又设置为NULL;
- EntryQ:关联一个系统互斥锁(semaphore),阻塞所有试图锁住monitor失败的线程。
- RcThis:表示blocked或waiting在该monitor上的所有线程的个数。
- Nest:用来实现重入锁的计数。
- HashCode:保存从对象头拷贝过来的HashCode值(可能还包含GC age)。
- Candidate:0表示没有需要唤醒的线程,1表示要唤醒一个继任线程来竞争锁。
在 java 虚拟机中,线程一旦进入到被synchronized修饰的方法或代码块时,指定的锁对象通过某些操作将对象头中的LockWord指向monitor 的起始地址与之关联,同时monitor 中的Owner存放拥有该锁的线程的唯一标识,确保一次只能有一个线程执行该部分的代码,线程在获取锁之前不允许执行该部分的代码。
偏向锁
public class TestSynchronized {
private static Object lock = new Object();
public static void main(String[] args) {
method1();
method2();
}
synchronized static void method1() {}
synchronized static void method2() {}
}
偏向所是指若某一锁被线程获取后,便进入偏向模式,当线程再次请求这个锁时,就无需再进行相关的同步操作了,从而节约了操作时间,如果在此之间有其他的线程进行了锁请求,则锁退出偏向模式。比如上面,联系两个method都去获取了关于TestSynchronized.class的锁,就是一种偏向锁。
轻量级锁
通过膨胀一个处于01状态的对象的对象头,或者是将已处于膨胀状态但monitor record中Owner为NULL的monitor record通过CAS置换为当前线程,可以获取锁。
轻量级锁会不断的自旋来尝试CAS获取当前的锁
重量级锁
当自旋一定次数以后仍然没获取锁,那么就需要调用操作系统重量级的互斥锁了,此后,在锁被释放前所有试图获取锁的线程都将被挂起。
synchronized的用处
synchronized主要有两方面的用途:
- 防止竞争 保证某些代码同时只有一个线程执行,防止由于竞争导致逻辑出错
- 内存可见性 即获取锁时,线程会将本地缓存无效,从主内存中获取最新的数据;释放锁时,会将本地缓存刷新到主内存中,保证其他线程看到最新的数据
synchronized和显示锁的选择
从功能上看,显示锁明显比synchronized更为丰富,可以选择获取锁超时时间等,也可以自由的选择加锁的区域和锁是否是公平锁等特征。但是,用synchronized 管理锁定请求和释放时,JVM 在生成线程转储时能够包括锁定信息。这些对调试非常有价值,因为它们能标识死锁或者其他异常行为的来源。其次,作为一种内置的锁机制,可能会随着jdk的升级而得到优化,比如jdk 1.6以后的synchronized就比之前执行效率提高了很多。所以,在满足需求的情况下,建议优先使用synchronized来加锁。