本文基于如下几个问题:
- 为什么叫可重入锁?它的应用场景是什么?
- 它的优势是什么?它的劣势是什么?
- 它的实现原理是什么?
01 应用场景
现在假设有那么一个情景,你在医院挂号看病,医生房间门是关着的,有一个叫号码的机器在叫号:
公平锁:文明排队,不允许有人插队,叫号器叫到谁,谁就进去,只要排着总会轮到你。
非公平锁:根本不管有没有人排队,上来就插队,打开门一看有人在问诊,关门出来,然后一个漂亮的180°转身,又去开门,如此往复2、3次,在其他患者准备打人时,乖乖跑去排队。当然,在未排队前,如果发现一个患者刚看好病离开座位,那么就插队成功,直接可以问诊。
无论是公平方式还是非公平方式,进门坐下来之后,你可以问医生问题一次,两次,无数次( 重入),只要你还坐着,你都可以问,但是一旦起身离开座位,你的位置就会被抢,除非没人排队,不然你失去了提问的资格。
以下片段摘抄翻译自ReentrantLock简介:
一个可重入的互斥锁具有与使用synchronized访问的隐式监视器锁相同的基本行为和语义,但具有扩展功能。
ReentrantLock的构造函数接受可选的true/false参数。 公平锁为true,使用公平锁的程序可能会比使用非公平锁的整体吞吐量低(速度慢),但各个线程获得锁的时间差异较小,并且可以保证缺少饥饿。选择false则为非公平锁,且默认为非公平锁。 值得注意的是,锁的公平性并不能保证线程调度的公平性,使用公平锁的线程还是可以在无竞争的空锁状态下的可以连续多次获得锁。
它的应用场景有:
- 场景1:如果已加锁,则不再重复加锁:多用于进行非重要任务防止重复执行,(如:清除无用临时文件,检查某些资源的可用性,数据备份操作等)
//如果已经被lock,则立即返回false不会等待,达到忽略操作的效果
if (lock.tryLock()) {
try {
//操作
} finally {
lock.unlock();
}
}
- 场景2:如果发现该操作已经在执行,则尝试等待一段时间,等待超时则不执行(尝试等待执行),用来防止由于资源处理不当长时间占用导致死锁情况(大家都在等待资源,导致线程队列溢出)。
try {
//如果已经被lock,尝试等待5s,看是否可以获得锁,如果5s后仍然无法获得锁则返回false继续执行
if (lock.tryLock(5, TimeUnit.SECONDS)) {
try {
//操作
} finally {
lock.unlock();
}
}
} catch (InterruptedException e) {
e.printStackTrace(); //当前线程被中断时(interrupt),会抛InterruptedException
}
- 场景3:和synchronized相同的用法。
//公平锁;不加参数/参数为false,则是非公平锁
private ReentrantLock lock = new ReentrantLock(true);
try {
lock.lock(); //如果被其它资源锁定,会在此等待锁释放,达到暂停的效果
//操作
} finally {
lock.unlock();
}
- 场景4:可中断锁。synchronized与Lock在默认情况下是不会响应中断(interrupt)操作,会继续执行完。lockInterruptibly()提供了可中断锁来解决此问题。这种情况主要用于取消某些操作对资源的占用。如:(取消正在同步运行的操作,来防止不正常操作长时间占用造成的阻塞)
try {
lock.lockInterruptibly();
//操作
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
02 优势和劣势
优势:
1.相比synchronized有更多应用场景:如中断正在获取锁的线程,获取锁时无限的等待下去等。
2.同样的10次请求,如果把每次不同线程获取到锁定义为1次切换,公平性锁在测试中进行了10次切换,而非公平锁只有5次切换,这说明非公平锁的开销更小。
劣势:
1.容易造成“饥饿”,即:一个线程可能第二个排队,但是一直到程序结束才轮到执行,期间一直饥饿的等待着。
2.如果没有使用finally来释放Lock,那么相当于启动了一个定时炸弹
03 实现方式
非公平锁的实现方式是:
final void lock() {
if (CAS获取锁[1]){
将当前线程设置成独占线程 //成功插队
}else{
if ( ! CAS再次获取锁 && 尝试入队[3](增加节点[2], 1)){ //排队
中断当前线程;
}
}
}
CAS尝试获取锁[1]:
/**
* 非公平锁获取锁:1.判断锁是否被持有 2.判断是否重入
*/
final boolean nonfairTryAcquire(int acquires) { //acquires:1
final Thread current = Thread.currentThread();
int c = getState();
//1.此刻没有线程拥有锁
if (c == 0) { //可以锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//2.重入:判断拥有锁的线程是否为自己,是的话则增加state状态(进入次数)
else if (current == getExclusiveOwnerThread()) {//发现线程再次进入
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//返回获取锁失败
return false;
}
增加节点2:
/**
* 以下方法就是将新节点插入到原尾节点(tail)后,成为新的尾节点
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
//1.将新节点插入到原尾节点(tail)后,成为新的尾节点
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//2.enq:原队列为空,则队列初始化并将node插入队列
for (;;) {//无限循环,直到成功
Node t = tail;
if (t == null) { // 必须初始化
if (compareAndSetHead(new Node()))//插入头结点:unsafe.compareAndSwapObject(this, headOffset, null, update);
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {//插入尾节点
t.next = node;
return t;
}
}
}
//返回
return node;
}
尝试入队3:
/**
* 尝试入队:判断头结点的下一个节点是否获取到锁,如果没有,尝试休眠线程
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();//返回上一个node
/**
* 1.判断头结点的下一个节点是否获取到锁
*/
if (p == head && tryAcquire(arg)) {//上一个节点是头结点&&本节点成功获取了锁
setHead(node);//node成为头结点
p.next = null; // help GC
failed = false;
return interrupted;
}
/**
* 2.重构队列并阻塞线程(清除cancel状态的节点)【3.1】&&【3.2】
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
大致的流程是:
shouldParkAfterFailedAcquire3.1:
/**
*总体看来,shouldParkAfterFailedAcquire就是靠前继节点判断当前线程是否应该被阻塞,
*如果前继节点处于CANCELLED状态,则顺便删除这些节点重新构造队列
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 1.如果前继的节点状态为SIGNAL,表明当前节点需要unpark,则返回成功,此时
* acquireQueued方法的第12行(parkAndCheckInterrupt)将导致线程阻塞
*/
return true;
if (ws > 0) {
/*
* 2.如果前继节点状态为CANCELLED(ws>0),说明前置节点已经被放弃,则回溯到一个
* 非取消的前继节点,返回false,acquireQueued方法的无限循环将递归调用该方法,直
* 至规则1返回true,导致线程阻塞
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 3.如果前继节点状态为非SIGNAL、非CANCELLED,则设置前继的状态为SIGNAL
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt3.2:
/**
* park并且返回阻塞
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
公平锁的实现方式是:
//如果没有获取锁,则加入队列中排队
if (!tryAcquire(arg) 1 &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
tryAcquire1:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//头结点 && 获得锁
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
参考:魏鹏:《ReentrantLock(重入锁)以及公平性》2013
魏鹏:AbstractQueuedSynchronizer的介绍和原理分析 2013
深入JVM锁机制2-Lock
park和unpark
CAS的实现逻辑