Android中的并发编程第一篇

1 volatile 的工作原理


众所周知,在如今的计算机时代,CPU的运算处理速度与内存读写速度的差异非常巨大,为了解决这种差异充分利用CPU的使用效率,于是在CPU 和内存之间增加 CPU Cache(高速缓存),将运算需要的数据先复制到CPU Cache中,在进行运算时CPU不再和内存打交道,而是直接对CPU Cache 进行读写。除此之外还可以减少 CPU 与 I/O 设备争抢访存,由于 CPU 和 I/O 设备会竞争同一条内存总线,有可能出现 CPU 等待 I/O 设备访存的情况。而如果 CPU 能直接从缓存中获取数据,就可以减少竞争,提高 CPU 的使用率。

1.1 缓存一致性问题

CPU Cache 会引起缓存一致性问题, 在分析缓存一致性问题时,忽略 L1 / L2 / L3 的多级缓存结构,提出缓存一致性抽象模型:



在单核 CPU 中,只需要考虑 Cache 与内存的一致性。但是在多核 CPU 中,由于每个核心都有一份独占的 Cache,就会存在一个核心修改Cache 数据后,两个核心 Cache 数据不一致的问题。因此CPU 的缓存一致性问题应该从 2 个维度考虑:

  • Cache 与内存的一致性问题: 在修改 Cache 数据后,如何同步回内存?

  • 多核心 Cache 的一致性问题: 在一个核心修改 Cache 数据后,如何同步给其他核心 Cache?

1.1.1 Cache 与内存的一致性问题

1> 写直达策略(Write-Through)

写直达策略的读取过程:

  • 1、CPU 在访问内存地址时,会先检查该地址的数据是否已经加载到 Cache 中;

  • 2、如果数据在 Cache 中,则直接读取 Cache 上的数据到 CPU 中;

  • 3、如果数据不在 Cache 中:

    • 如果 Cache 已装满或者 Cache Line 被占用,先执行替换策略,腾出空闲位置;

    • 访问内存地址,并将内存地址所处的整个Cache LIne 写入到映射的 Cache Line 中;

    • 读取 Cache Line上的数据到 CPU 中。

写直达策略的写入过程:

  • 1、如果数据不在 Cache 中,则直接将数据写入内存;

  • 2、如果数据已经加载到 Cache 中,则不仅要将数据写入 Cache,还要将数据写入内存。

写直达的优点和缺点:

  • 优点: 每次读取操作就是纯粹的读取,不涉及对内存的写入操作,读取速度更快;

  • 缺点: 每次写入操作都需要同时写入 Cache 和写入内存,在写入操作上失去了 CPU 高速缓存的价值,需要花费更多时间。


2> 写回策略(Write Back)

既然写直达策略在每次写入操作都会写内存,那么有没有什么办法可以减少写回内存的次数呢?这就是写回策略:

  • 1、写回策略会在每个 Cache Line上增加一个 “脏(Dirty)” 标记位 ,当一个 Cache Line 被标记为脏时,说明它的数据与内存数据是不一致的;

  • 2、在写入操作时,我们只需要修改 Cache Line 并将其标记为脏,而不需要写入内存;

  • 3、那么,什么时候才将脏数据写回内存呢?—— 就发生在 Cache 块被替换出去的时候:

    • 在写入操作中,如果目标内存块不在 Cache 中,需要先将内存块数据读取到 Cache 中。如果替换策略换出的旧 Cache Line 是脏的,就会触发一次写回内存操作;

    • 在读取操作中,如果目标内存块不在 Cache 中,且替换策略换出的旧 Cache 块是脏的,就会触发一次写回内存操作;

可以看到,写回策略只有当一个 Cache 数据将被替换出去时判断数据的状态,清(未修改过,数据与内存一致) 的 Cache 块不需要写回内存, 的 Cache 块才需要写回内存。这个策略能够减少写回内存的次数,性能会比写直达更高。

当然,写回策略在读取的时候,有可能不是纯粹的读取了,因为还可能会触发一次脏 Cache Line 的写入。


通过写直达或写回策略,我们已经能够解决 在修改 Cache 数据后,如何同步回内存 的问题

1.1.2 多核心 Cache 的一致性问题

在单核 CPU 中,我们通过写直达策略或写回策略保持了Cache 与内存的一致性。但是在多核 CPU 中,由于每个核心都有一份独占的 Cache,就会存在一个核心修改数据后,两个核心 Cache 不一致的问题。

举个例子:

  • 1、Core 1 和 Core 2 读取了同一个内存块的数据,在两个 Core 都缓存了一份内存块的副本。此时Cache 和内存块是一致的;

  • 2、Core 1 执行内存写入操作:

    • 在写直达策略中,新数据会直接写回内存,此时,Cache 和内存块一致。但由于之前 Core 2 已经读过这块数据,所以 Core 2 缓存的数据还是旧的。此时,Core 1 和 Core 2 不一致;

    • 在写回策略中,新数据会延迟写回内存,此时 Cache 和内存块不一致。不管 Core 2 之前有没有读过这块数据,Core 2 的数据都是旧的。此时,Core 1 和 Core 2 不一致。

  • 3、由于 Core 2 无法感知到 Core 1 的写入操作,如果继续使用过时的数据,就会出现逻辑问题。



    可以看到:由于两个核心的工作是独立的,在一个核心上的修改行为不会被其它核心感知到,所以不管 CPU 使用写直达策略还是写回策略,都会出现缓存不一致问题。 所以,我们需要一种机制,将多个核心的工作联合起来,共同保证多个核心下的 Cache 一致性,这就是缓存一致性机制。

1.1.3 写传播 & 事务串行化

缓存一致性机制必须解决的如下两个问题:

  • 写传播(Write Propagation): 一个CPU核心对Cache中的值进行了修改,需要传播其他CPU核心,也就是需要用到写更新或者写无效策略。

    当某个 CPU 核心的Cache中执行写操作时,其他 CPU 核心中对应的Cache Line 的更新策略也有两种

    • 写更新(Write Update):一个CPU核心对Cache中的值进行修改时,该 CPU 核心都必须先发起一次总线请求,通知其他 CPU 核心将它们的CPU Cache 值更新为刚写入的值,所以写更新会很占用总线带宽。

    • 写无效(Write Invalidate) : 一个CPU核心对Cache中的值进行修改时,该 CPU 核心都必须先发起一次总线请求,通知其他 CPU 核心将它们的CPU Cache 设置为无效。MESI协议用的就是这个策略,也是绝大多数 CPU 都会采用缓存一致性协议。这是因为多次写操作只需要发起一次总线事件即可,第一次写已经将其他缓存的值置为无效,之后的写不必再更新状态,这样可以有效地节省 CPU 核间总线带宽。

  • 事务串行化(Transaction Serialization): 各个 CPU 核心所有写入操作的顺序,在所有 CPU 核心看起来是一致。

写传播解决了 感知 问题,如果一个核心修改了数据,就需要同步给其它核心。但只做到同步还不够,如果各个核心收到的同步信号顺序不一致,那最终的同步结果也会不一致。
举个例子:假如 CPU 有 4 个核心,Core 2 将共享数据修改为 1000,随后 Core 1 将共享数据修改为 2000。在写传播下,“修改为 1000” 和 “修改为 2000” 两个事务会同步到 Core 3 和 Core 4。但是如果没有事务串行化,不同核心收到的事务顺序可能是不同的,最终数据还是不一致。

1.1.4 总线嗅探 & 总线仲裁

写传播和事务串行化在 CPU 中是如何实现的呢?

  • 写传播 - 总线嗅探(bus snooping): 本质上就是进行Cache读写操作时发送到总线请求,然后让各个核心去嗅探这些请求,再根据本地的情况进行响应;

  • 事务串行化 - 总线仲裁: 总线的独占性要求同一时刻最多只有一个模块占用总线,天然地会将所有核心对内存的读写操作串行化。如果多个核心同时发起总线事务,此时总线仲裁单元会对竞争做出仲裁,未获胜的事务只能等待获胜的事务处理完成后才能执行。

基于总线嗅探和总线仲裁,现代 CPU 逐渐形成了各种缓存一致性协议,例如 MESI 协议。

MESI 协议使用的是写回策略和写无效策略,MESI 协议通过Cache Line 的四种状态非常有效地降低了 CPU 核间带宽

1.1.5 MESI 协议


CPU Cache 是由很多个 Cache Line 组成的,CPU Line 是 CPU 从内存读取数据的基本单位,Cache Line 大小通常是64字节。

CPU对Cache的请求:

  1. PrRd: CPU核心请求一个 Cache Line

  2. PrWr: CPU核心请求一个 Cache Line

总线对Cache的请求:

  1. BusRd: 其他CPU核心请求一个 Cache Line

  2. BusRdX: 其他CPU核心请求一个该CPU核心不拥有的缓存块

  3. BusUpgr: 其他CPU核心请求一个该CPU核心拥有的缓存块

  4. Flush: 请求回写整个Cache Line到内存

  5. FlushOpt: 整个Cache Line被发到总线以发送给另外一个CPU核心(缓存到缓存的复制)

初始状态 操作 响应
Shared(S) PrRd - 无总线事务生成 - 状态保持不变 - 读操作为缓存命中
Shared(S) PrWr - 发出总线事务BusUpgr信号 - 状态转换为(M)Modified - 其他缓存看到BusUpgr总线信号,标记其副本为(I)Invalid.
Modified(M) PrRd - 无总线事务生成 - 状态保持不变 - 读操作为缓存命中
Modified(M) PrWr - 无总线事务生成 - 状态保持不变 - 写操作为缓存命中
Invalid(I) PrRd - 发出总线事务BusRd信号 - 其他CPU核心看到BusRd,检查自己是否有有效的数据副本,通知发出请求的缓存 - 状态转换为(S)Shared, 如果其他缓存有有效的副本 - 状态转换为(E)Exclusive, 如果其他缓存都没有有效的副本 - 如果其他缓存有有效的副本, 其中一个缓存发出数据(FlushOpt);否则从主存获得数据
Invalid(I) PrWr - 发出总线事务BusRdX信号 - 状态转换为(M)Modified - 如果其他缓存有有效的副本, 其中一个缓存发出数据(FlushOpt);否则从主存获得数据 - 如果其他缓存有有效的副本, 见到BusRdX信号后无效其副本 - 向Cache Line中写入修改后的值
Exclusive(E) PrRd - 无总线事务生成 - 状态保持不变 - 读操作为缓存命中
Exclusive(E) PrWr - 无总线事务生成 - 状态转换为(M)Modified - 向Cache Line中写入修改后的值

缓存一致性协议定义了Cache Line的4个状态:独占(exclusive)、共享(share)、修改(modified)、失效(invalid)。

  • M(Modified,修改): 表明 Cache Line 被修改过,与内存中不一致,只有本地一个拷贝(专有),并且其它核心的同一个 Cache Line 会失效;

  • E(Exclusive,独占): 表明 Cache LIne 只有本地一个拷贝(专有);

  • S(Share,共享): 表明 Cache Line 不仅有本地一个拷贝并且其它核心也存在其拷贝;

  • I(Invalid,失效): 未从内存加载数据或者已失效;

独占共享 状态下,Cache Line的数据是干净的,任何读取操作可以直接使用该Cache Line的数据;

失效修改 状态下,Cache Line 的数据是脏的,其数据和内存中的可能不一致,在读取或写入 失效 Cache Line 时,需要先将其它核心 修改 的Cache Line写回内存,再从内存读取;

共享失效 状态,核心没有获得 Cache Line 的独占权(锁)。在修改数据时不能直接修改,而是要先向总线发起 RFO(Request For Ownership)请求 ,将其它核心的 Cache 置为 失效,等到获得回应 ACK 后才算获得 Cache Line 的独占权。

这个独占权这有点类似于开发语言层面的锁概念,在修改资源之前,需要先获取资源的锁;

修改独占 状态下,核心已经获得了 Cache Line 的独占权(锁)。在修改数据时不需要向总线发送RFO 请求,能够减轻总线的通信压力。

MESI 协议有一个非常 nice 的在线体验网站,你可以对照文章内容,在网站上操作指令区,并观察内存和缓存的数据和状态变化。网站地址:(VivioJS MESI)

1.1.6 写缓冲区 & 失效队列

MESI 协议保证了 CPU Cache 的一致性,但完全地遵循协议会影响性能。 因此,现代的 CPU 会增加写缓冲区和失效队列将 MESI 协议的请求异步化,以提高效率:

  • 写缓冲区(Store Buffer)

由于在写入操作之前,CPU Core 需要先发起 RFO 请求获得独占权,在其它 CPU Core 回应 ACK 之前,当前 CPU Core 只能空等待,这对 CPU 资源是一种浪费。因此现代 CPU 会采用 写缓冲区 机制,写入的数据放到写缓冲区后并发送 RFO 请求后,CPU 就可以去执行其它任务,等收到 ACK 后再将写入数据写到 Cache 上。

当前CPU核心如果要读Cache Line中的数据,需要先扫描Store Buffer之后再读取Cache Line(Store-Buffer Forwarding)。

  • 失效队列(Invalidation Queue)

Store Buffer 容量是有限的,当Store Buffer满了之后CPU核心还是要卡住等待ACK。所以其他核心在收到 RFO 请求时,需要及时回应 ACK。如果核心很忙不能及时回复,就会造成发送 RFO 请求的核心在等待 ACK。因此现代 CPU 会采用 失效队列 机制,先把其它核心发过来的 RFO 请求放到失效队列,然后直接返回 ACK,等当前核心处理完任务后再去处理失效队列中的失效请求。

因此核心可能并不知道在它Cache里的某个Cache Line是Invalid状态的,因为失效队列包含有收到但还没有处理的Invalidation消息。CPU在读取数据的时候,并不像Store buffer那样读取Invalidate queue。

1.2 内存屏障

CPU 已经实现了 MESI 协议,已经在硬件层面实现了写传播和事务串行化,为什么 Java 语言层面还需要定义 volatile 关键字呢?岂不是多此一举?

MESI 协议解决了缓存一致性,一致性有强弱之分:

  • 强一致性: 保证在任意时刻任意副本上的同一份数据都是相同的,或者允许不同,但是每次使用前都要刷新确保数据一致,所以最终还是一致。

  • 弱一致性: 不保证在任意时刻任意副本上的同一份数据都是相同的,也不要求使用前刷新,但是随着时间的迁移,不同副本上的同一份数据总是向趋同的方向变化,最终还是趋向一致。

例如,MESI 协议就是强一致性的,但引入写缓冲区或失效队列后就变成弱一致性,随着写缓冲区和失效队列被消费,各个核心 Cache 最终还是会趋向一致状态。

引入写缓冲区或失效队列后会导致内存一致性的问题(出现内存重排 Memory Reordering),举个例子:初始状态变量 a 和变量 b 都是 0,现在 Core1 和 Core2 分别执行这两段指令,最终 x 和 y 的结果是什么?

Core1

a = 1; // A1
x = b; // A2


Core2

b = 2; // B1
y = a; // B2

写缓存区造成内存重排:


可以看到:从内存的视角看,直到 Core1 执行 A3 来刷新写缓冲区,写操作 A1 才算真正执行了。虽然 Core 的执行顺序是 A1 → A2 → B1 → B2,但内存看到的顺序却是 A2 → B1 → B2 → A1,变量 a 写入没有同步给对变量 a 的读取,即发生了内存重排。内存重排是硬件上无法解决的问题,这个时候就必须加入内存屏障来解决这个问题

内存屏障 (Memory Barrier)分为写屏障(Store Barrier)、读屏障(Load Barrier)和全屏障(Full Barrier),写屏障会阻塞等待Store Buffer中的数据同步刷到Cache后再执行屏障后面的读写操作;读屏障会阻塞Invalid Queue中的消息理完成后再执行屏障后面的读写操作。

JVM 并不直接显露内存屏障。(Memory barriers are not directly exposed by the JVM. )反之,为了保证语言级的并发原语语义,它们被 JVM 插入到指令序列中。(Instead they are inserted into the instruction sequence by the JVM in order to uphold the semantics of language level concurrency primitives. )

还是以上面的例子说明下:

Core1

// volatile 类型
a = 1; // A1
// 插入写内存屏障
// 插入读内存屏障
x = b; // A2

Core2

// volatile 类型
b = 2; // B1
// 插入写内存屏障
// 插入读内存屏障
y = a; // B2

初始状态变量 a 和变量 b 都是 0 并且设置为volatile类型,现在 Core1 和 Core2 分别执行这两段指令

由于在 A1和B1之后插入一个写内存屏障,所以在执行A2和B2之前会将a=1和b=2刷到Cache

由于在 A2前插入一个读内存屏障,所以在执行A2之前会将b=1同步到Core1的Cache

由于在 B2前插入一个读内存屏障,所以在执行B2之前会将a=1同步到Core2的Cache

内存屏障解决了内存重排的问题,除此意外内存屏障可以阻止屏障两侧的指令重排序(即编译器重排)

需要注意的是,内存屏障并不是Java源代码中的一部分,它们是在编译到机器指令时由Java内存模型隐式插入的,写Java代码时是看不到的。

x86架构CPU提供了比较强的缓存一致性支持,但有的ARM构的CPU指供的缓存一致性支持就很弱,大部分Android手机采用的CPU是ARM架构,所以需要在正确的位置插入内存屏障来保证一致性。

对于app开发者来说理解到这里就可以了,下面从源码角度分析,有兴趣的同学可以自己看看Hotspot虚拟机arm架构下的模板解释器中的putfield和getfield方法。

2 synchronized 的工作原理

首先举个例子:

public class UnSafeTest {
    @SneakyThrows
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(10);
        CustomService customService = new CustomService();
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    customService.add();
                }
                countDownLatch.countDown();
            }).start();
        }
        // 等待其他线程执行完毕
        countDownLatch.await();
        System.out.println("num:" + customService.getNum());
    }

    static class CustomService {
        private int num = 0;

        public void add() {
            num++;
        }

        public int getNum() {
            return num;
        }
    }
}

上述代码启动了10个线程,每个线程使num累加100次,期望结果是1000,但打印通常小于1000,这是一个典型的线程安全问题。 我们在多线程环境下调用了CustomService.add(),因而导致线程不安全,那么为什么会有线程安全问题呢?我们来看关键代码:

public void add() {
    num++;
}

num++ 实际上并不是原子操作,这需要看其对应的字节码信息(可以使用javap -v xx.class进行查看):

  public void add();
    descriptor: ()V
    flags: ACC_PUBLIC
    Code:
      stack=3, locals=1, args_size=1
         0: aload_0
         1: dup
         2: getfield      #2                  // Field num:I
         5: iconst_1
         6: iadd
         7: putfield      #2                  // Field num:I
        10: return

可以发现num++实际上由3个指令组成getfield、iadd、putfield组成。
getfield:获取变量值
iadd:执行+1
putfield:设置变量值
既然num++实际上由3步组成,那么在多线程环境中就无法保证其执行过程的原子性,通过下图可以更加清晰的说明这一点:

上面这种情况是原子性问题导致线程不安全,synchronized是一个同步锁,在同一时刻被修饰的方法或代码块只有一个线程能执行,以保证线程安全。很多人都称之为重量级锁(也叫做悲观锁),但是随着JDK1.6对synchronized进行了锁升级的优化后,在线程竞争不激烈的情况下性能还是不错的,接下来看看synchronized怎么解决线程安全问题:

public synchronized void add() {
    num++;
}

或者

public void add() {
    synchronized (CustomService.class) {
        num++;
    }
}

接下来看一下字节码

public synchronized void add();
    descriptor: ()V
    flags: ACC_PUBLIC, ACC_SYNCHRONIZED
    Code:
      stack=3, locals=1, args_size=1
         0: aload_0
         1: dup
         2: getield      #2                  // Field num:I
         5: iconst_1
         6: iadd
         7: putfield      #2                  // Field num:I
        10: return

或者

  public void add();
    descriptor: ()V
    flags: ACC_PUBLIC
    Code:
      stack=3, locals=3, args_size=1
         0: ldc           #3                  // class com/cytmxk/trainproject/UnSafeTest$CustomService
         2: dup
         3: astore_1
         4: monitorenter
         5: aload_0
         6: dup
         7: getfield      #2                  // Field num:I
        10: iconst_1
        11: iadd
        12: putfield      #2                  // Field num:I
        15: aload_1
        16: monitorexit
        17: goto          25
        20: astore_2
        21: aload_1
        22: monitorexit
        23: aload_2
        24: athrow
        25: return

synchronized修饰方法时,会在访问标识符(flags)中加入ACC_SYNCHRONIZED标识
方法级同步是隐式执行的。当调用这些方法时,如果发现会ACC_SYNCHRONIZED标识,则会进入一个monitor,执行方法,然后退出monitor。这时如果其他线程来请求执行方法,会因为无法获得监视器锁而被阻断住。
无论方法调用正常还是发生异常,都会自动退出monitor,也就是释放锁。

synchronized修饰代码块时,会增加monitorenter和monitorexit指令
每个对象都与一个监视器monitor关联,执行monitorenter指令的线程尝试获取锁对象关联的监视器monitor的所有权,此时其他线程将阻塞等待,直到执行monitorexit退出monitor时,
其他线程才有机会来获取监视器monitor的所有权。
synchronized保证了有且仅有一个线程获取执行权(即保证了原子性)。

synchronized也是基于MESI协议和内存屏障实现的缓存一致性
在monitorenter指令之后有一个Load内存屏障将CPU Cache更新到最新值
在monitorexit指令之后会有一个Store内存屏障将Store Buffer更新到CPU Cache

3 通过ReentrantLock解决线程安全问题

3.1 LockSupport 的工作原理

AQS使用LockSupport来控制线程的阻塞和唤醒,我们更加熟悉的阻塞唤醒操作是wait/notify方式,它是以Object的角度来设计,而LockSupport提供的park/unpark则是以线程的角度来设计。LockSupport主要提供两类操作:

1 park 操作提供了4个方法
/**
 * 许可证可用则消耗许可证并且调用立即返回;否则当前线程会进入WAITING或者TIMED_WAITING状态,直到以下三种情况之一发生:
 * 1> 其他线程以当前线程为目标调用 unpark
 * 2> 其他线程中断当前线程
 * 3> 调用错误地(没有原因地)返回
 * 
 * 此方法不报告导致方法返回的原因,调用方应首先重新检查导致线程停止的条件,调用方还可以在返回时确定线程的中断状态。
 */
public static void park() {
    U.park(false, 0L);
}
/**
 * 同上
 */
public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    U.park(false, 0L);
    setBlocker(t, null);
}
/**
 * 许可证可用则消耗许可证并且调用立即返回;否则当前线程会进入WAITING或者TIMED_WAITING状态,直到以下三种情况之一发生:
 * 1> 其他线程以当前线程为目标调用 unpark
 * 2> 其他线程中断当前线程
*  3> 指定的等待时间已到
 * 4> 调用错误地(没有原因地)返回
 * 
 * 此方法不报告导致方法返回的原因,调用方应首先重新检查导致线程停止的条件,调用方还可以在返回时确定线程的中断状态或者调用花费的时间。
 */
public static void parkNanos(Object blocker, long nanos) {
    if (nanos > 0) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        U.park(false, nanos);
        setBlocker(t, null);
    }
}
/**
 * 许可证可用则消耗许可证并且调用立即返回;否则当前线程会进入WAITING或者TIMED_WAITING状态,直到以下三种情况之一发生:
 * 1> 其他线程以当前线程为目标调用 unpark
 * 2> 其他线程中断当前线程
*  3> 到达指定的时间线
 * 4> 调用错误地(没有原因地)返回
 * 
 * 此方法不报告导致方法返回的原因,调用方应首先重新检查导致线程停止的条件,调用方还可以在返回时确定线程的中断状态或者调用返回时的时间。
 */
public static void parkUntil(Object blocker, long deadline) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    U.park(true, deadline);
    setBlocker(t, null);
}
上面三种形式的park都有blocker参数,此参数在线程进入WAITING或者TIMED_WAITING状态时被记录,以帮助监视工具和诊断工具确定线程受阻塞的原因(通过getBlocker方法获取blocker)。
看下线程dump的结果来理解blocker的作用。

2 unpark 操作
/**
 * 使给定线程的许可证可用(如果尚未可用),如果线程在 park上被阻塞那么它将解除阻塞,否则保证下一次调用park不会被阻塞。如果指定的线 
 * 程尚未启动,则不能保证此操作有任何效果。
 */
public static void unpark(Thread thread) {
    if (thread != null)
        U.unpark(thread);
}

归根结底,LockSupport调用的是Unsafa的native代码:

// park方法
UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))
  ...
  // 调用parker的park方法
  thread->parker()->park(isAbsolute != 0, time);
  ...
UNSAFE_END

// unpark方法
UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread))
  ...
  if (p != NULL) {
    HS_DTRACE_PROBE1(hotspot, thread__unpark, p);
    // 调用parker的unpark方法
    p->unpark();
  }
UNSAFE_END

每个JAVA线程都有一个Parker成员变量

  // JSR166 per-thread parker
private:
  Parker*    _parker;
public:
  Parker*     parker() { return _parker; }

park函数使当前线程进入WAITING或者TIMED_WAITING状态,而其unpark函数则是将指定线程唤醒,看一下Parker的park和unpark方法:

class Parker : public os::PlatformParker {
private:
  volatile int _counter ;
  ...
public:
  Parker() : PlatformParker() {
    _counter       = 0 ; //初始化为0
    ...
  }
...
public:
  // For simplicity of interface with Java, all forms of park (indefinite,
  // relative, and absolute) are multiplexed into one call.
  void park(bool isAbsolute, jlong time);
  void unpark();
  ...
};

os::PlatformParker是对底层的抽象,根据不同的操作系统有不同的实现,在Linux系统下,是用Posix线程库pthreads中的mutex(互斥量)和condition(条件变量)实现的,mutex和condition保护了一个counter的变量,当park时counter被设置为0,当unpark时_counter被设置为1。接下来看一下Linux系统中的实现:

class PlatformParker : public CHeapObj<mtInternal> {
  protected:
    enum {
        REL_INDEX = 0,
        ABS_INDEX = 1
    };
    int _cur_index;  // which cond is in use: -1, 0, 1
    pthread_mutex_t _mutex [1] ; // pthread互斥锁
    pthread_cond_t  _cond  [2] ; // pthread条件变量数组,一个用于相对时间,一个用于绝对时间

  public:       // TODO-FIXME: make dtor private
    ~PlatformParker() { guarantee (0, "invariant") ; }

  public:
    PlatformParker() {
      int status;
      status = pthread_cond_init (&_cond[REL_INDEX], os::Linux::condAttr());
      assert_status(status == 0, status, "cond_init rel");
      status = pthread_cond_init (&_cond[ABS_INDEX], NULL);
      assert_status(status == 0, status, "cond_init abs");
      status = pthread_mutex_init (_mutex, NULL);
      assert_status(status == 0, status, "mutex_init");
      _cur_index = -1; // mark as unused
    }
};

下面是park和unpark方法的具体实现

void Parker::park(bool isAbsolute, jlong time) {
  // Ideally we'd do something useful while spinning, such
  // as calling unpackTime().

  // Optional fast-path check:
  // Return immediately if a permit is available.
  // We depend on Atomic::xchg() having full barrier semantics
  // since we are doing a lock-free update to _counter.
  if (Atomic::xchg(0, &_counter) > 0) return; // 使用 xchgl 指令将_counter的值设置为0,如果_counter原来的值大于0则返回

  Thread* thread = Thread::current();
  assert(thread->is_Java_thread(), "Must be JavaThread");
  JavaThread *jt = (JavaThread *)thread;

  // Optional optimization -- avoid state transitions if there's an interrupt pending.
  // Check interrupt before trying to wait
  if (Thread::is_interrupted(thread, false)) { // 如果线程处于中断状态,直接返回
    return;
  }

  // Next, demultiplex/decode time arguments
  timespec absTime;
 // 如果time小于0,或者isAbsolute是true并且time等于0则直接返回
  if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all
    return;
  }
  if (time > 0) {
    unpackTime(&absTime, isAbsolute, time);
  }


  // Enter safepoint region
  // Beware of deadlocks such as 6317397.
  // The per-thread Parker:: mutex is a classic leaf-lock.
  // In particular a thread must never block on the Threads_lock while
  // holding the Parker:: mutex.  If safepoints are pending both the
  // the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.
  ThreadBlockInVM tbivm(jt); // 构造当前线程的 ThreadBlockInVM

  // 如果当前线程设置了中断标志,或者获取mutex互斥锁失败则直接返回
  // 由于Parker是每个线程都有的,所以_counter cond mutex都是每个线程都有的,不是所有线程共享的,所以加锁失败只有两种情况,
  // 1> unpark已经加锁这时只需要返回即可,对应unpark先调用的情况
  // 2> 调用pthread_mutex_trylock出错
  if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) { // 尝试获取锁,失败则返回
    return;
  }

  int status ;
  // 如果当前线程持有许可(即_counter大于0),说明之前已经调用unpark方法将_counter置为了1
  if (_counter > 0)  {
    _counter = 0; // 将许可消耗掉
    status = pthread_mutex_unlock(_mutex);
    assert (status == 0, "invariant") ;
    // Paranoia to ensure our locked and lock-free paths interact
    // correctly with each other and Java-level accesses.
    OrderAccess::fence();
    return;
  }

#ifdef ASSERT
  // Don't catch signals while blocked; let the running threads have the signals.
  // (This allows a debugger to break into the running thread.)
  sigset_t oldsigs;
  sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals();
  pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);
#endif

  OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
  jt->set_suspend_equivalent();
  // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()

  assert(_cur_index == -1, "invariant");
  // 如果time等于0,说明是相对时间也就是isAbsolute是fasle(否则前面就直接返回了)
  if (time == 0) {
    _cur_index = REL_INDEX; // arbitrary choice when not timed
    // 当前线程进入WAITING状态
    status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;
  } else {
    // 判断isAbsolute是false还是true,false的话使用_cond[0],否则用_cond[1]
    _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
    // 使用条件变量当前线程进入TIMED_WAITING状态
    status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;
    if (status != 0 && WorkAroundNPTLTimedWaitHang) {
      pthread_cond_destroy (&_cond[_cur_index]) ;
      pthread_cond_init    (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr());
    }
  }

  // 如果当前线程被唤醒则继续向下执行
  _cur_index = -1;
  assert_status(status == 0 || status == EINTR ||
                status == ETIME || status == ETIMEDOUT,
                status, "cond_timedwait");

#ifdef ASSERT
  pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);
#endif

  _counter = 0 ; // 返回后 _counter 状态位重置
  status = pthread_mutex_unlock(_mutex) ;
  assert_status(status == 0, status, "invariant") ;
  // 使用内存屏障使_counter对其它线程可见
  OrderAccess::fence();

  // If externally suspended while waiting, re-suspend
  if (jt->handle_special_suspend_equivalent_condition()) {
    jt->java_suspend_self();
  }
}

void Parker::unpark() {
  int s, status ;
  status = pthread_mutex_lock(_mutex);
  assert (status == 0, "invariant") ;
  s = _counter;
  _counter = 1;
  if (s < 1) {
    // 说明当前parker对应的线程挂起了,因为_cur_index初始是-1,并且等待条件变量的线程被唤醒
    // 后也会将_cur_index重置-1
    if (_cur_index != -1) {
      // 如果设置了WorkAroundNPTLTimedWaitHang先调用signal再调用unlock,在hotspot在Linux下默认使用这种方式
      // 即先调用signal再调用unlock
      if (WorkAroundNPTLTimedWaitHang) {
        status = pthread_cond_signal (&_cond[_cur_index]);
        assert (status == 0, "invariant");
        status = pthread_mutex_unlock(_mutex);
        assert (status == 0, "invariant");
      } else {
        // must capture correct index before unlocking
        int index = _cur_index;
        status = pthread_mutex_unlock(_mutex);
        assert (status == 0, "invariant");
        status = pthread_cond_signal (&_cond[index]);
        assert (status == 0, "invariant");
      }
    } else { // 如果_cur_index == -1说明线程没在等待条件变量,则直接解锁
      pthread_mutex_unlock(_mutex);
      assert (status == 0, "invariant") ;
    }
  } else { // 如果_counter == 1,说明线程调用了一次或多次unpark但是没调用park,则直接解锁
    pthread_mutex_unlock(_mutex);
    assert (status == 0, "invariant") ;
  }
}

Atomic::xchg 源码

inline jint     Atomic::xchg    (jint     exchange_value, volatile jint*     dest) {
  __asm__ volatile (  "xchgl (%2),%0"
                    : "=r" (exchange_value)
                    : "0" (exchange_value), "r" (dest)
                    : "memory");
  return exchange_value;
}

__asm__ 代表后面的xchgl是汇编指令,xchgl是一个原子操作,交换exchange_value和dest的值,并且将exchange_value的值返回。

首先说明一下上面用到的三个方法:

int pthread_mutex_lock(pthread_mutex_t* mutex);    // 阻塞式的加锁
int pthread_mutex_trylock(pthread_mutex_t* mutex); // 非阻塞式的加锁,加不上锁就返回
int pthread_mutex_unlock(pthread_mutex_t* mutex);  // 解锁
// 以上函数成功返回0;失败返回错误码

接下来总结一下park方法的逻辑:
1> 使用 xchgl 指令将counter修改为 0 返回,如果counter原来的值大于0则返回,即有许可直接消费掉
2> pthread_mutex_trylock 尝试获取锁,失败则返回非0的错误码(其他线程unpark该线程时持有mutex互斥量或者pthread_mutex_trylock调用出错) 3> 如果持有许可(即counter >0成立),则消费掉许可(即执行counter = 0),释放锁(执行pthread_mutex_unlock)然后返回
4> 如果 3 不成立,根据时间的不同执行不同的等待函数,如果等待正确返回,则消费掉许可(即执行
counter = 0),释放锁(执行pthread_mutex_unlock)然后返回

unpark的逻辑为:
1> pthread_mutex_lock 获取锁,可能会阻塞线程
2> 将参数线程设置为持有许可(即_counter 设置为 1)
3> 判断 _counter 的旧值:
小于 1 时,调用 pthread_cond_signal 唤醒在 park 阻塞的线程;
等于 1 时,直接返回

3.2 CAS 的工作原理

说到CAS,第一个想到的就是原子类型,其实4.1中的问题就可以通过AtomicInteger解决:

static class CustomService {
    private final AtomicInteger num = new AtomicInteger();

    public void add() {
        num.getAndIncrement();
    }

    public int getNum() {
        return num.get();
    }
}

为什么上面的代码可以解决线程安全的问题呢?先看下AtomicInteger的部分源码:

// Unsafe 对象,可以直接根据内存地址操作数据,可以突破JVM的现在直接操作内存,所以是不安全的
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
// 存储value属性在AtomicInteger类实例内部的偏移地址
private static final long VALUE;

static {
    try {
        // 在类初始化的时候就获取就获取到了value属性在对象内部的偏移地址
        VALUE = U.objectFieldOffset
            (AtomicInteger.class.getDeclaredField("value"));
    } catch (ReflectiveOperationException e) {
        throw new Error(e);
    }
}
// 存储实际的值
private volatile int value;
......
/**
 * Atomically increments by one the current value.
 *
 * @return the previous value
 */
public final int getAndIncrement() {
    return U.getAndAddInt(this, VALUE, 1);
}

1> 首先内部持有一个Unsafe对象,原子类底层的操作都是基于Unsafe进行的。
2> volatile int value 是AtomicInteger实例的实际数值,volatile保证并发中的可见性和有序性
3> VALUE 是 value属性在AtomicInteger对象内部的偏移地址,通过Unsafe类是可以在内存级别给变量赋值的,要操作AtomicInteger实例的数据,首先要知道AtomicInteger实例的内存地址,其次是要知道value属性在对象内部的偏移量VALUE,接着直接给这块内存赋值就行了:


4> AtomicInteger的getAndIncrement()方法是基于Unsafe的getAndAddInt的,Unsafe的getAndAddInt方法源码

public final int getAndAddInt(Object o, long offset, int delta) {
    int v;
    do {
        v = getIntVolatile(o, offset);
    } while (!compareAndSwapInt(o, offset, v, v + delta));
    return v;
}

1> o + offset 得到value属性在内存中的地址,然后根据地址从内存中获取value的值,保存在v中(即期望值)。
2> compareAndSwapInt : v的值跟当前内存的值(o + offsetSet 地址对应的值)进行对比,如果相等则将内存的值修改为 v + delta(即CAS操作成功),如果值不相等,则进入下一次循环,
   直到CAS操作成功为止。
3> CAS操作是怎么保证原子性的呢(比较和修改这是两个动作,如果比较的时候是一样的,修改的时候被别的线程修改了)?

接下来hotspot中compareAndSwapInt的实现

UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
  UnsafeWrapper("Unsafe_CompareAndSwapInt");
  oop p = JNIHandles::resolve(obj);
  jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
  return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END

Atomic::cmpxchg 源码

inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value) {
  int mp = os::is_MP();
  __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
                    : "=a" (exchange_value)
                    : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
                    : "cc", "memory");
  return exchange_value;
}

__asm_ "cmpxchg面的xchgl是汇编指令,LOCK_IF_MP代表如果设计多核心则添加lock,使cmpxchgl指令原子性

CAS保证了加1操作的原子性,volatile确保了可见性、有序性,所以可以解决上面的问题


3.3 ReentrantLock的工作原理

LockSupport 底层使用互斥量mutex和condition,互斥量mutex和condition是内核提供的能力,所以性能消耗是非常大的。

首先通过下图整体看一下ReentrantLock的类结构


首先ReentrantLock继承自父类Lock,然后有3个内部类,其中Sync内部类继承自AQS,另外的两个内部类继承自Sync,这两个类分别是用来实现公平锁和非公平锁的,首先看一下非公平锁的流程图:

接下来就是对应的源码:

// ReentrantLock.java

/** Synchronizer providing all implementation mechanics */
private final Sync sync;

public ReentrantLock() {
    sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

public void lock() {
    sync.lock();
}

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    final void lock() {
        // 通过CAS的方式先判断state == 0是否成立,如果成立则将当前state设置为1
        if (compareAndSetState(0, 1))
            // state成功设置为1,说明当前线程获取到资源,则将当前线程保存下来
            setExclusiveOwnerThread(Thread.currentThread());
        else
            // 失败的话将当前线程放到等待队列
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;

    abstract void lock();

    // 非公平的尝试获取锁
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        // c == 0 表示没有线程占有锁
        if (c == 0) {
            // 通过CAS的方式先判断state == 0是否成立,如果成立则将当前state设置为1
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 如果当前线程已经占有锁,则state加1
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    ...
}
// AbstractQueuedSynchronizer.java

private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long STATE;
private static final long HEAD;
private static final long TAIL;

static {
    try {
        STATE = U.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
        HEAD = U.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
        TAIL = U.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
    } catch (ReflectiveOperationException e) {
        throw new Error(e);
    }

    // Reduce the risk of rare disastrous classloading in first call to
    // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
    Class<?> ensureLoaded = LockSupport.class;
}

/**
 * The synchronization state.
 */
private volatile int state;

protected final boolean compareAndSetState(int expect, int update) {
    return U.compareAndSwapInt(this, STATE, expect, update);
}

// 请求锁
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

Sync通过acquirerelease方法获取个释放锁,所以 ReentrantLock实现的是AQS的独占模式,也就是独占锁

上面说明了非公平锁的获取和释放流程,接下来看一下公平锁:

static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        // 将当前线程放到等待队列
        acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        // c == 0 表示没有线程占有锁
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                // 等待队列中不存在有效节点并且CAS成功的情况下当前线程获取到锁
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 如果当前线程已经获取到锁,则state加1
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

释放锁的流程和非公平锁一样,不一样的地方体现在获取锁的流程。

接下来看一下AQS中核心等待队列的管理:

3.3.1 线程加入队列的时机

当执行Acquire(1)时,会通过tryAcquire获取锁。在这种情况下,如果获取锁失败,就会调用addWaiter加入到等待队列中去。

获取锁失败后,会执行 addWaiter(Node.EXCLUSIVE) 加入等待队列,具体实现方法如下:

private Node addWaiter(Node mode) {
    // 创建于当前线程关联的Node节点(即将thread字段设置为当前线程)
    Node node = new Node(mode);

    for (;;) {
        Node oldTail = tail;
        if (oldTail != null) {
            // 将node的prev设置为oldTail
            U.putObject(node, Node.PREV, oldTail);
            // 使用CAS的方式判断oldTail地址和tail的地址是否相同,成立的话将tail设置为node
            if (compareAndSetTail(oldTail, node)) {
                // 最终完成将Node插入队尾
                oldTail.next = node;
                return node;
            }
        } else {
            initializeSyncQueue();
        }
    }
}

private final void initializeSyncQueue() {
    Node h;
    // 创建第一个节点(虚节点,占位用),head和tail同时指向该节点
    if (U.compareAndSwapObject(this, HEAD, null, (h = new Node())))
        tail = h;
}

回到公平锁的代码,hasQueuedPredecessors是公平锁加锁时判断等待队列中是否存在有效节点的方法。如果返回False,说明当前线程可以争取共享资源;如果返回True,说明队列中存在有效节点,当前线程必须加入到等待队列中。

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    // h != t 成立代表等待对列中存在等待的节点
    // (s = h.next) == null 成立代表其他线程正在添加到等待队列,进行到了tail已经指向新Node,但是Head还没有指向新Node,此时队列中有元素,需要返回true
    // s.thread != Thread.currentThread() 成立代表等待队列的第一个有效节点线程与当前线程不同,所以当前线程不可以获取锁,当前线程必须加入进等待队列
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

回到最初的代码:

// 请求锁
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

上文解释了addWaiter方法,这个方法其实就是把对应的线程以Node的数据结构形式加入到双端队列里,返回的是一个包含该线程的Node。而这个Node会作为参数,进入到acquireQueued方法中。acquireQueued会把放入队列中的线程不断去获取锁,直到获取成功或者被park挂起:

final boolean acquireQueued(final Node node, int arg) {
    try {
        // 标记等待过程中是否中断过
        boolean interrupted = false;
        // 开始自旋,要么获取锁,要么中断
        for (;;) {
            // 获取当前节点的前驱节点
            final Node p = node.predecessor();
            // 如果p是首节点,说明当前节点在真实数据队列的头部,就尝试获取锁(别忘了头结点是虚节点)
            if (p == head && tryAcquire(arg)) {
                // 获取锁成功,head指针移动到当前node,node节点变成虚节点
                setHead(node);
                p.next = null; // help GC
                return interrupted;
            }
            // 说明p为首节点且当前没有获取到锁(可能是锁被其他线程占了)或者是p不是首节点,这个时候就要判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为SIGNAL),
            // 防止无限循环浪费资源。具体两个方法下面细细分析
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

// 将指定的节点设置为首节点(即虚节点)
private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

// 靠前驱节点判断当前线程是否应该被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 获取前驱节点的节点状态
    int ws = pred.waitStatus;
    // 说明前驱节点处于SIGNAL状态(在独占锁机制中,waitStatus 只会使用到 CANCELLED 和 SIGNAL 两个状态)
    if (ws == Node.SIGNAL)
        return true;
    // 通过枚举值我们知道waitStatus>0是取消状态
    if (ws > 0) {
        do {
            // 循环向前查找取消节点,把取消节点从队列中剔除
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 设置前驱节点等待状态为SIGNAL
        pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
    }
    return false;
}

parkAndCheckInterrupt主要用于挂起当前线程,阻塞调用栈,返回当前线程的中断状态。

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

上述方法的流程图如下:


从上图可以看出,跳出当前循环的条件是当前置节点是头结点,且当前线程获取锁成功。为了防止因死循环导致CPU资源被浪费,我们会判断前置节点的状态来决定是否要将当前线程挂起,具体挂起流程用流程图表示如下(shouldParkAfterFailedAcquire流程):

3.3.2 等待队列中出队时机

// ReentrantLock.java

public void unlock() {
    sync.release(1);
}

abstract static class Sync extends AbstractQueuedSynchronizer {
    ...
    // 尝试释放锁
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        // 如果当前线程没有占有锁则抛出异常
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        // c == 0 代表当前线程要释放锁
        if (c == 0) {
            free = true;
            // 清空占有锁的线程对象
            setExclusiveOwnerThread(null);
        }
        // 更新state的值
        setState(c);
        return free;
    }
}
// AbstractQueuedSynchronizer.java

// 释放锁
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 解锁队列总的第一个有效节点
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    // 获取当前节点的waitStatus
    int ws = node.waitStatus;
    if (ws < 0)
        node.compareAndSetWaitStatus(ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node p = tail; p != node && p != null; p = p.prev)
            if (p.waitStatus <= 0)
                s = p;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 198,030评论 5 464
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,198评论 2 375
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 144,995评论 0 327
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,973评论 1 268
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,869评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,766评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,967评论 3 388
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,599评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,886评论 1 293
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,901评论 2 314
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,728评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,504评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,967评论 3 302
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,128评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,445评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,018评论 2 343
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,224评论 2 339

推荐阅读更多精彩内容