java锁的机制

java提供了内置锁,即synchronized,除此以外,还提供了显式锁,下面我们分别分析其实现的机制,并讨论如何在这两者之间进行选择。

AQS


AQS即AbstractQueuedSynchronizer,一般用于管理同步类中的状态,它管理了一个整数状态信息,可以使用getState setState以及compareAndSetState来获取或修改状态。比如Semaphore可以用这个状态表示剩余的许可数目,ReentrantLock可以用它来表示当前持有锁的线程已经加锁的次数等等。其获取和释放操作的伪代码如下:

boolean acquire() throws InterruptedException {
  while(当前状态不允许获取) {
    if (需要阻塞获取请求) {
      当前线程不在队列中,则将其插入等待队列
      将当前调用线程阻塞
    } else {
      返回失败
    }
  }
  获取成功 修改状态
  将当前线程移出等待队列
  返回成功
}

void release() {
  更新状态
  if (新的状态允许别的线程获取资源) {
    选择一个或多个线程唤醒
  }
}

比如一个简单的二元闭锁(代码来自java并发编程实战)

public class OneShotLatch {
  private final Sync sync = new Sync();

  public void signal() { sync.releaseShared(0); }

  public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(0);
  }

  private class Sync extends AbstractQueuedSynchronizer {
    protected int tryAcquireShared(int ignored) {
      return (getState()) == 1 ? 1 : -1;
    }

    protected boolean tryReleaseShared(int ignored) {
      setState(1);
      return true;
    }
  }
}

AQS state为1时表示打开,0表示关闭。await调用acquireSharedInterruptibly,然后会去调用tryAcquireShared,如果state是1则tryAcquireShared返回成功并允许线程通过,否则线程将进入等待线程队列中去。

signal调用releaseShared,然后调用tryReleaseShared,然后让所有等待中的线程都再次尝试请求该同步器,从而通过闭锁。另外,AQS还提供了这些操作的限时版本,从而可以实现有时限的等待操作。

acquire

下面我们来看看acquire的代码:

  public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

上面的代码是一个模板方法,先调用了tryAcquire,如果失败,则调用addWaiter将当前线程加入等待队列中,然后再使用acquireQueued来尝试获取资源。完成后如果有中断,则调用selfInterrupt传递中断状态。

在addWaiter中,使用Node类封装了当前线程,Node的状态有:

        static final Node SHARED = new Node();
        static final Node EXCLUSIVE = null;
        static final int CANCELLED =  1;
        static final int SIGNAL    = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;
        volatile int waitStatus;
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;
        Node nextWaiter;

使用prev和next可以形成一个链表:

 *      +------+  prev +-----+       +-----+
 * head |      | <---- |     | <---- |     |  tail
 *      +------+       +-----+       +-----+

关于状态,直接摘抄jdk8源码的注释:

     *   SIGNAL:     The successor of this node is (or will soon be)
     *               blocked (via park), so the current node must
     *               unpark its successor when it releases or
     *               cancels. To avoid races, acquire methods must
     *               first indicate they need a signal,
     *               then retry the atomic acquire, and then,
     *               on failure, block.
     *   CANCELLED:  This node is cancelled due to timeout or interrupt.
     *               Nodes never leave this state. In particular,
     *               a thread with cancelled node never again blocks.
     *   CONDITION:  This node is currently on a condition queue.
     *               It will not be used as a sync queue node
     *               until transferred, at which time the status
     *               will be set to 0. (Use of this value here has
     *               nothing to do with the other uses of the
     *               field, but simplifies mechanics.)
     *   PROPAGATE:  A releaseShared should be propagated to other
     *               nodes. This is set (for head node only) in
     *               doReleaseShared to ensure propagation
     *               continues, even if other operations have
     *               since intervened.
     *   0:          None of the above
  • SIGNAL 这个记号表示当前node之后的节点已经或即将被阻塞,需要再release或者cancel后唤醒一个或若干个后续节点

  • CANCELLED表示这个线程已经cancel掉了

  • CONDITION 说明这个node在一个condition queue上 和sync queue没什么鸟关系

  • PROPAGATE releaseShared需要被所有等待中的node得知,doReleaseShared中会使用该状态

了解了Node类后,我们来看看addWaiter

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
  1. 如果目前队列非空,则尝试快速入队,使用CAS把自己设置为tail,并且返回自己对应的node
  2. 如果快速入队失败了 就调用enq将当前node入队
  3. 首先,如果队列为空的话会在队列中添加第一个dummy的节点,此时head == tail ==dummy节点 如果失败说明已经有别的线程设置过tail了 就再循环之前的操作
  4. tail非空,则自旋的将自己入队

入队以后,就调用acquireQueued。

  1. 获取该节点前驱 如果前驱是head,说明前面已经没有等待中的节点了,就尝试tryAcquire,成的话讲当前节点设置为head,返回
  2. 没有tryAcquire成功,则查看是否应当block当前线程 唤醒后检查thread.interrupted()状态
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

下面看看shouldParkAfterFailedAcquire,这个方法检查一个acquire失败的线程的状态,看看是否需要park(block)它。先看看node的prev的状态

  1. 如果是SIGNAL说明已经做好稍后被唤醒的准备了,返回true表明可以被park。
  2. 如果大于0说明被cancel了,那么一直找prev直到找到状态小于等于0的节点
  3. 如果都不是 那么把pred的状态设置为SIGNAL 为之后的park做准备,进入下一次acquireQueued的循环中
  4. 每次唤醒后,都会检查并传递中断状态 用acquireQueued返回,如果有中断 则抛出中断异常
   private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

release

然后看看release的过程,会调用子类的tryRelease,这是一个模板方法模式。如果处理成功且head不为空且不是开始时添加的一个dummy head,则尝试调用unparkSuccessor唤起后继的线程们。

  1. 如果head的后继是null或已经被cancel了 则从tail开始向前找到一个等待中的线程。

  2. 如果有可以唤醒的线程,唤醒之,让其重新回到acquireQueued的无限循环中去

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

synchronized

被动锁,即synchronized方法或代码块的原理与基于AQS的主动锁有所区别,先来一段代码:

public class TestSynchronized {
    public static void main(String[] args) {
        synchronized (TestSynchronized.class) {
        }
    }

    public synchronized void method() {
    }
}

javap -c -verbose 以后看到:

  public static void main(java.lang.String[]);
    descriptor: ([Ljava/lang/String;)V
    flags: ACC_PUBLIC, ACC_STATIC
    Code:
      stack=2, locals=3, args_size=1
         0: ldc           #2                  // class test/TestSynchronized
         2: dup
         3: astore_1
         4: monitorenter
         5: aload_1
         6: monitorexit
         7: goto          15
        10: astore_2
        11: aload_1
        12: monitorexit
        13: aload_2
        14: athrow
        15: return
      Exception table:
         from    to  target type
             5     7    10   any
            10    13    10   any
      LineNumberTable:
        line 7: 0
        line 8: 5
        line 9: 15
      StackMapTable: number_of_entries = 2
        frame_type = 255 /* full_frame */
          offset_delta = 10
          locals = [ class "[Ljava/lang/String;", class java/lang/Object ]
          stack = [ class java/lang/Throwable ]
        frame_type = 250 /* chop */
          offset_delta = 4

  public synchronized void method();
    descriptor: ()V
    flags: ACC_PUBLIC, ACC_SYNCHRONIZED
    Code:
      stack=0, locals=1, args_size=1
         0: return
      LineNumberTable:
        line 12: 0

可以看到,同步代码块前后有monitorenter 和 monitorexit 指令,而同步方法是在修饰符上添加了ACC_SYNCHRONIZED。

那么,synchronized到底是靠什么实现的呢?要了解这些,我们首先要明确两个概念:对象头和monitor。

对象头

HotSpot的每个对象都有一个头部,包括两部分信息:Mark Word(标记字段)和 Klass Pointer(类型指针)。

其中:

  • Mark Word 用于存储对象自身的运行时数据,如哈希码(HashCode)、GC分代年龄、锁状态标志、线程持有的锁、偏向线程 ID、偏向时间戳等等。如果对象是一个数组,对象头中还会有一块用于记录数组长度的数据。在不同的情况下,mark word可以变长的表示不同的含义,例如在 32 位的HotSpot 虚拟机中对象未被锁定的状态下,Mark Word 的 32个Bits 空间中的 25Bits 用于存储对象哈希码(HashCode),4Bits 用于存储对象分代年龄,2Bits 用于存储锁标志位,1Bit固定为0,在其他状态(轻量级锁定、重量级锁定、GC标记、可偏向)下对象的存储内容如下表所示:
存储内容 标志位 状态
对象哈希码、对象分代年龄 01 未锁定
指向锁记录的指针 00 轻量级锁定
指向重量级锁的指针 10 膨胀(重量级锁定)
空,不需要记录信息 11 GC标记
偏向线程ID、偏向时间戳、对象分代年龄 01 可偏向

注意偏向锁、轻量级锁、重量级锁等都是jdk 1.6以后引入的哦。

synchronized的细分

monitor record

每一个被锁住的对象都与一个monitor record关联。每一个线程都有一个可用monitor record列表,同时还有一个全局的可用monitor record列表。每个monitor record的结构如下所示:

Monitor Record
Owner
EntryQ
RcThis
Nest
HashCode
Candidate
  • Owner:初始时为NULL表示当前没有任何线程拥有该monitor,当线程成功拥有该锁后保存线程唯一标识,当锁被释放时又设置为NULL;
  • EntryQ:关联一个系统互斥锁(semaphore),阻塞所有试图锁住monitor失败的线程。
  • RcThis:表示blocked或waiting在该monitor上的所有线程的个数。
  • Nest:用来实现重入锁的计数。
  • HashCode:保存从对象头拷贝过来的HashCode值(可能还包含GC age)。
  • Candidate:0表示没有需要唤醒的线程,1表示要唤醒一个继任线程来竞争锁。

在 java 虚拟机中,线程一旦进入到被synchronized修饰的方法或代码块时,指定的锁对象通过某些操作将对象头中的LockWord指向monitor 的起始地址与之关联,同时monitor 中的Owner存放拥有该锁的线程的唯一标识,确保一次只能有一个线程执行该部分的代码,线程在获取锁之前不允许执行该部分的代码。

偏向锁

public class TestSynchronized {
    private static Object lock = new Object();
    public static void main(String[] args) {
        method1();
        method2();
    }
    synchronized static void method1() {}
    synchronized static void method2() {}
}

偏向所是指若某一锁被线程获取后,便进入偏向模式,当线程再次请求这个锁时,就无需再进行相关的同步操作了,从而节约了操作时间,如果在此之间有其他的线程进行了锁请求,则锁退出偏向模式。比如上面,联系两个method都去获取了关于TestSynchronized.class的锁,就是一种偏向锁。

轻量级锁

通过膨胀一个处于01状态的对象的对象头,或者是将已处于膨胀状态但monitor record中Owner为NULL的monitor record通过CAS置换为当前线程,可以获取锁。
轻量级锁会不断的自旋来尝试CAS获取当前的锁

重量级锁

当自旋一定次数以后仍然没获取锁,那么就需要调用操作系统重量级的互斥锁了,此后,在锁被释放前所有试图获取锁的线程都将被挂起。

synchronized的用处

synchronized主要有两方面的用途:

  1. 防止竞争 保证某些代码同时只有一个线程执行,防止由于竞争导致逻辑出错
  2. 内存可见性 即获取锁时,线程会将本地缓存无效,从主内存中获取最新的数据;释放锁时,会将本地缓存刷新到主内存中,保证其他线程看到最新的数据

synchronized和显示锁的选择

从功能上看,显示锁明显比synchronized更为丰富,可以选择获取锁超时时间等,也可以自由的选择加锁的区域和锁是否是公平锁等特征。但是,用synchronized 管理锁定请求和释放时,JVM 在生成线程转储时能够包括锁定信息。这些对调试非常有价值,因为它们能标识死锁或者其他异常行为的来源。其次,作为一种内置的锁机制,可能会随着jdk的升级而得到优化,比如jdk 1.6以后的synchronized就比之前执行效率提高了很多。所以,在满足需求的情况下,建议优先使用synchronized来加锁。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,561评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,218评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,162评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,470评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,550评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,806评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,951评论 3 407
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,712评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,166评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,510评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,643评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,306评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,930评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,745评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,983评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,351评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,509评论 2 348

推荐阅读更多精彩内容