Java 同步工具与组合类的线程安全性分析

何为线程安全的类?

一般来说,我们要设计一个线程安全的类,要从三个方面去考虑:

  1. 构成状态的所有变量。比如某个域是集合类型,则集合元素也构成该实例的状态。

  2. 某些操作所隐含的不变性条件。

  3. 变量的所有权,或称它是否会被发布。

基于条件的同步策略

不变性条件取决于类的语义,比如说计数器类的 counter 属性被设置为 Integer 类型,虽然其域值在 Integer.MIN_VALUEInteger.MAX_VALUE 之间,但是它的值必须非负。即:随着计数的进行, conuter >= 0 总是成立。

除了不变性条件之外,一些操作还需要通过后验条件,以此判断状态的更改是否有效。比如一个计数器计到 17 时,它的下一个状态只可能是 18。这实际涉及到了对原先状态的 "读 - 改 - 写" 三个连续的步骤,典型的如自增 ++ 等。"无记忆性" 的状态是不需要后验条件的,比如每隔一段时间测量的温度值。

先验条件可能是更加关注的问题,因为 "先判断后执行" 的逻辑到处存在。比如说对一个列表执行 remove 操作时,首先需要保证列表是非空的,否则就应该抛出异常。

在并发环境下,这些条件均可能会随着其它线程的修改而出现失真。

状态发布与所有权

在许多情况下,所有权和封装性是相互关联的。比如对象通过 private 关键字封装了它的状态,即表明实例独占对该状态的所有权 ( 所有权意味控制权 )。反之,则称该状态被发布。被发布的实例状态可能会被到处修改,因此它们在多线程环境中也存在风险。

容器类通常和元素表现出 "所有权" 分离的形式。比如说一个声明为 final 的列表,客户端虽然无法修改其本身的引用,但可以自由地修改其元素的状态。这些事实上被发布的元素必须被安全地共享。这要求元素:

  1. 自身是事实不可变的实例。

  2. 线程安全的实例。

  3. 被锁保护。

实例封闭

大多数对象都是组合对象,或者说这些状态也是对象。对组合类的线程安全性分析大致分为两类:

  1. 如果这些状态线程不安全,那应该如何安全地使用组合类?

  2. 即使所有的状态都线程安全,是否可以推断组合类也线程安全?或者说组合类是否还需要额外的同步策略?

对于第一个问题,见下方的 Bank 代码,它模拟了一个转账业务:

<pre class="prettyprint hljs java" style="margin: 0px 0px 1.5em; padding: 0.5em; border-radius: 4px; background: rgb(246, 246, 246); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; border: none; overflow-x: auto;">class Bank { private Integer amount_A = 100; private Integer amount_B = 50; public synchronized void transaction(Integer amount){
var log_0 = amount_A + amount_B;
amount_A += amount;
amount_B -= amount;
var log_1 = amount_A + amount_B; assert log_0 == log_1;
}
}
复制代码
</pre>

虽然 amount_Aamount_B 本身作为普通的 Integer 类型并不是线程安全的,但是它们具备线程安全的语义:

<pre class="hljs nginx" style="margin: 0px 0px 0.75em; padding: 0.5em; border-radius: 4px; background: rgb(246, 246, 246); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; border: none; overflow-x: auto;">privatetransaction()
</pre>

也可以理解成: Bank 是为两个 Integer 状态提供线程安全性的容器。在此处,同步策略由 synchronized 内置锁实现。

编译器会在 synchronized 的代码区前后安插 monitorentermonitorexit 字节码表示进入 / 退出同步代码块。Java 的内置锁也称之监视器锁,或者监视器。

至于第二个问题,答案是:看情况,具体地说是分析是否存在不变性条件,在这里,它指代在转账过程当中,a 和 b 两个账户的余额之和应当不变。如果使用原子类型保护 amount_Aamount_B 的状态,那么是否就可以撤下 transaction() 方法上的内置锁了?

<pre class="prettyprint hljs cs" style="margin: 0px 0px 1.5em; padding: 0.5em; border-radius: 4px; background: rgb(246, 246, 246); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; border: none; overflow-x: auto;">class UnsafeBank { private final AtomicInteger amount_A = new AtomicInteger(100); private final AtomicInteger amount_B = new AtomicInteger(50); public void transaction(Integer amount){
amount_A.set(amount_A.get() - amount);
amount_B.set(amount_B.get() + amount);
}
}
复制代码
</pre>

transaction() 方法现在失去了锁的保护。这样,某线程 A 在执行交易的过程中,另一个线程 B 也可能会 "趁机" 修改 amount_B 的账目 —— 这个时机发生在线程 A 执行 amount_B.get() 之后,但在 amount_B.set() 之前。最终,B 线程的修改将被覆盖而丢失,在它看来,尽管两个状态均是原子变量,但不变性条件仍然被破坏了。

由此得到一个结论 —— 就算所有的可变状态都是原子的,我们可能仍需要在封装类的层面进一步考虑同步策略,最简单直接的就是找出封装类内的所有复合操作:

  1. 对同一个变量 ( 反复 ) 读-改-写。

  2. 修改受某个不变性条件约束的多个变量。

正确地拓展同步策略

在大部分情况下,我们不能通过直接修改类源码的形式补充同步策略。比如,普通的 List<T>接口不保证底下的各种实现是线程安全的,但我们可以通过类似代理的方式将线程安全委托给第三方。比如:

<pre class="prettyprint hljs php" style="margin: 0px 0px 1.5em; padding: 0.5em; border-radius: 4px; background: rgb(246, 246, 246); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; border: none; overflow-x: auto;">class ThreadSafeArrayList { private final List<Integer> list; public ThreadSafeArrayList(List<Integer> l){list = l;}
// 添加新的方法
public synchronized boolean putIfAbsent(Integer a){ if(list.contains(a)) { list.add(a); return true;
} return false;
} // 代理 add 方法,其它略
public synchronized boolean add(Integer a) { return list.add(a);
} // ...}
复制代码
</pre>

事实上,Java 类库已经有了对应的线程安全类。通常,我们应当优先重用这些已有的类。在下方的代码块中,我们使用 Collection.synchronizedList 工厂方法创建一个线程安全的 list 对象,这样似乎就只需要为新拓展的 putIfAbsent() 方法加锁了。

<pre class="prettyprint hljs php" style="margin: 0px 0px 1.5em; padding: 0.5em; border-radius: 4px; background: rgb(246, 246, 246); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; border: none; overflow-x: auto;">class ThreadUnSafeArrayList { private final List<Integer> list = Collections.synchronizedList(new ArrayList<>()); // 添加新的方法
public synchronized boolean putIfAbsent(Integer a){ if(list.contains(a)) { list.add(a); return true;
} return false;
}
public boolean add(Integer a){return list.add(a);} //...}
复制代码
</pre>

但是,上述的代码是错误的。为什么?问题在于,我们使用了错误的锁进行了同步。当调用的是 add 方法时,使用的是列表对象的内置锁;但调用 putIfAbsent 方法时,我们使用的却是 ThreadUnsafeArrayList 对象的内置锁。这意味着 putIfAbsent 方法对于其它的方法来说不是原子的,因此无法确保一个线程执行 putIfAbsent 方法时,其它线程是否会通过调用其它方法修改列表。

因此,想要让这个方法正确执行,我们必须要在正确的地方上锁。

<pre class="prettyprint hljs php" style="margin: 0px 0px 1.5em; padding: 0.5em; border-radius: 4px; background: rgb(246, 246, 246); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; border: none; overflow-x: auto;">class ThreadUnSafeArrayList { private final List<Integer> list = Collections.synchronizedList(new ArrayList<>()); public boolean putIfAbsent(Integer a){
synchronized (list){ if(list.contains(a)) { list.add(a); return true;
} return false;
}
}
}
复制代码
</pre>

同步容器

同步容器是安全的,但在某些情况下仍然需要客户端加锁。常见的操作如:

  1. 迭代;

  2. 跳转 ( 比如,寻找下一个元素 );

  3. 条件运算,如 "若没有则 XX 操作" ( 一种常见的复合操作 );

复合操作不受同步容器保护

这里有两个线程 T1,T2 分别会以不可预测的次序执行两个代码块,它们负责删除和读取 list中的末尾元素。我们在这里使用的是库中的同步列表,因此可以确保 size()remove()get() 方法全部是原子的。但是,当程序以 x1y1x2y2 的操作次序执行时,主程序最终仍然会抛出 IndexOutOfBoundsException 异常。

<pre class="prettyprint hljs php" style="margin: 0px 0px 1.5em; padding: 0.5em; border-radius: 4px; background: rgb(246, 246, 246); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; border: none; overflow-x: auto;">class DemoOfConcurrentFail { public final List<Integer> list = Collections.synchronizedList(new ArrayList<>());

{
    Collections.addAll(list, 1, 2, 3, 4, 5);
}    public static void main(String[] args) {        var testList = new DemoOfConcurrentFail().list;

    Runnable t1 = () -> {            var last = testList.size() - 1;  // x1
        testList.remove(last);  // x2
    };

    Runnable t2 = () -> {            var last = testList.size() -1;  // y1
        var  r = testList.get(last);  // y2
        System.out.println(r);
    };        new Thread(t1).start();        new Thread(t2).start();

}

}
复制代码
</pre>

究其原因,两个线程 T1,T2 执行的复合操作没有受锁保护 ( 实际上就是前文银行转账的例子中犯过的错误 )。所以正确的做法是对复合操作整体加锁。比如:

<pre class="prettyprint hljs cs" style="margin: 0px 0px 1.5em; padding: 0.5em; border-radius: 4px; background: rgb(246, 246, 246); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; border: none; overflow-x: auto;">var mutex = new Object();

Runnable t1 = () -> {
synchronized (mutex){ var last = testList.size() - 1; // x1
testList.remove(last); // x2
}
};

Runnable t2 = () -> {
synchronized (mutex){ var last = testList.size() -1; // y1
var r = testList.get(last); // y2
System.out.println(r);
}
};// ...复制代码
</pre>

同步容器的迭代问题

在迭代操作中,类似的问题也仍然存在。无论是直接的 for 循环还是 for-each 循环,对容器的遍历方式是使用 Iterator。而使用迭代器本身也是先判断 ( hasNext ) 再读取 ( next ) 的复合过程。Java 对同步容器的迭代处理是:假设某一个线程在迭代的过程中发现容器被修改过了,则立刻失败 ( 也称及时失败 ),并抛出一个 ConcurrentModificationException 异常。

<pre class="prettyprint hljs coffeescript" style="margin: 0px 0px 1.5em; padding: 0.5em; border-radius: 4px; background: rgb(246, 246, 246); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; border: none; overflow-x: auto;">// 可能需要运行多次才能抛出 ConcurrentModificationException
Runnable t1 = () -> { // 删除中间的元素
int mid = testList.size() / 2;
testList.remove(mid);
};

Runnable t2 = () -> { for(var item : testList){
System.out.println(item);
}
};new Thread(t1).start();new Thread(t2).start();
复制代码
</pre>

类似地,想要不受打扰地迭代容器元素,我们也要在 for 循环的外面加锁,但是可能并不是一个好的主意。假如容器的规模非常大,或者每个元素的处理时间非常长,那么其它等待容器执行短作业的线程会因此陷入长时间的等待,这会带来活跃性问题。

一个可行的方法就是实现读写分离 —— 一旦有写操作,则重新拷贝一份新的容器副本,而在此期间所有读操作则仍在原来的容器中进行,实现 "读-读共享"。当读操作远多于写操作时,这种做法无疑可以大幅度地提高程序的吞吐量,见后文的并发容器 CopyOnWriteArrayList

警惕隐含迭代的操作

不仅是显式的 for 循环会触发迭代。比如容器的 toString 方法在底层调用 StringBuilder.append() 方法依次将每一个元素的字符串拼接起来。除此之外,包括 equalscontainsAllremoveAllretainAll ,乃至将容器本身作为参数的构造器,都隐含了对容器的迭代过程。这些间接的迭代错误都有可能抛出 ConcurrentModificationException 异常。

并发容器

考虑到重量级锁对性能的影响,Java 后续提供了各种并发容器来改进同步容器的性能问题。同步容器将所有操作完全串行化。当锁竞争尤其激烈时,程序的吞吐量将大大降低。因此,使用并发容器来替代同步容器,在绝大部分情况下都算是一顿 "免费的午餐"。

ConcurrentHashMap

ConcurrentHashMap 使用了更小的封锁粒度换取了更大程度的共享,这个封锁机制称之为分段锁 ( Lock Stripping )。简单点说,就是每一个桶由单独的锁来保护,操作不同桶的两个线程不需要相互等待。好处是,在高并发环境下, ConcurrentHashMap 带来了更大的吞吐量,但问题是,封锁粒度的减小削弱了容器的一致性语义,或称弱一致性 ( Weakly Consistent )。

比如说需要在整个 Map 上计算的 size()isEmpty() 方法,弱一致性会使得这些方法的计算结果是一个过期值。这考虑到是一个权衡,因为在并发环境下,这两个方法的作用很小,因为其返回值总是不断变化的。因此,这些操作的需求被弱化了,以换取其它更重要的性能优化,比如getputcotainsKeyremove 等。

因此,除非一部分严谨的业务无法容忍弱一致性,否则并发的 HashMap 是要比同步 HashMap 更优的选择。

CopyOnWriteArrayList

该工具在读操作远多于写操作的场合下能够提供更好的并发性能,在迭代时不需要对容器进行加锁或者复制。当发生修改时,该容器会创建并重新发布一个新的容器副本。在新副本创建之前,一切读操作仍然以旧的容器为准,因此这不会抛出 ConcurrentModificationException 问题。

相对的,如果频繁调用 addremoveset 等方法,则该容器的吞吐量会大大降低,因为这些操作需要反复调用系统的 copy 方法复制底层的数组 ( 这也是没有设计 "CopyOnWriteLinkedList" 的原因,因为拷贝的效率会更低 )。同时,写入时复制的特性使得 CopyOnWriteArrayList 是弱一致性的。

阻塞队列 & 生产者 — 消费者模式

阻塞队列,简单地说,就是当队列为空时,执行 take 操作会进入阻塞状态;当队列满时,执行 put 操作也会进入阻塞状态。阻塞队列也可以分有界队列和无界队列。无界队列永远不会充满,因此执行 put 方法永远不会进入阻塞状态。但是,如果生产者的执行效率远超过消费者,那么无界队列的无限扩张最终会耗尽内存。有界队列则可以保证当队列充满时,生产者被 put 阻塞,通过这种方式来让消费者赶上工作进度。

可以用阻塞队列实现生产者 — 消费者模式,最常见的生产者 — 消费者模式是线程池与工作队列的组合。这种模式将 "发布任务" 与 "领取任务" 解耦,最大的便捷是简化了复杂的负载管理,因为生产者和消费者的执行速度并不总是相匹配的。同时,生产者和消费者的角色是相对的。比如处于流水线中游的组件,它们既作为上游的消费者,也作为下游的生产者。

Java 库已经包含了关于阻塞队列的多种实现,它自身保证 puttake 操作是线程安全的。

  1. LinkedBlockingQueueArrayBlockingQueue :此两者的区别可以参考 Link 和 Array,见:ArrayBlockingQueue 和 LinkedBlockingQueue 。两者均为 FIFO 的队列。

  2. PriorityBlockingQueue :优先级队列,当我们希望以一定次序处理任务时,它要比 FIFO 队列更实用。

  3. SynchronousQueue :译为同步阻塞队列。这个队列事实上没有缓存空间,而是维护一组可用的线程。当队列收到消息时,它可以立刻分配一个线程去处理。但是如果没有多余的工作线程,那么调用 put 或者 take 会立刻陷入阻塞状态。因此,仅当有足够多的消费者,并且总是有一个消费者准备好获取交付的工作时,才适合使用同步队列。

下方的代码块是由 SynchronousQueue 实现的简易 Demo,每个线程会抢占式消费消息。

<pre class="prettyprint hljs livescript" style="margin: 0px 0px 1.5em; padding: 0.5em; border-radius: 4px; background: rgb(246, 246, 246); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; border: none; overflow-x: auto;">var chan = new SynchronousQueue<Integer>();var worker = new Thread(()->{ while(true){ try {
final var x = chan.take();
System.out.println("t1 consume: " + x);
} catch (InterruptedException e) {e.printStackTrace();}
}
});var worker2 = new Thread(()->{ while(true){ try {
final var x = chan.take();
System.out.println("t2 consume: " + x);
} catch (InterruptedException e) {e.printStackTrace();}
}
});worker.start();worker2.start();for(var i = 0 ; i < 10; i ++) chan.put(i);
复制代码
</pre>

基于所有权的角度去分析,生产者 — 消费者模式和阻塞队列一起促进了 串行的线程封闭 。线程封闭对象只能由单个对象拥有,但可以通过在执行的最后发布该对象 ( 即表示之后不会再使用它 ),以表示 "转让" 所有权。

阻塞队列简化了转移的逻辑。除此之外,还可以通过 ConcurrentMap 的原子方法 remove,或者是 AtomicReference 的 compareAndSet ( 即 CAS 机制 ) 实现安全的串行线程封闭。

双端队列和工作窃取

Java 6 之后增加了新的容器类型 —— Deque 和 BlockDeque,它们是对 Queue 以及 BlockingQueue 的拓展。Deque 实现了再队列头和队列尾的高效插入和移除,具体实现包括了 ArrayDeque 和 LinkedBlockingDeque。

双端队列适用于另一种工作模式 —— 工作窃取 ( Work Stealing )。比如,一个工作线程已经完成清空了自己的任务队列,它就可以从其它忙碌的工作线程的任务队列的尾部获取队列。这种模式要比生产者 —— 消费者具备更高的可伸缩性,因为工作线程不会在单个共享的任务队列上发生竞争。

工作窃取特别适合递归的并发问题,即执行一个任务时会产生更多的工作,比如:Web 爬虫,GC 垃圾回收时的图搜索算法。

阻塞和中断方法

线程可能会被阻塞,或者是暂停执行,原因有多种:等待 I/O 结束,等待获得锁,等待从 Thread.sleep 中唤醒,等待另一个线程的计算结果。被阻塞的线程必须要在这些 "外因" 被解决之后才有机会继续执行,即恢复到 RUNNABLE ( 也称就绪 ) 状态,等待被再次调度 CPU 执行。

这段描述其实对应了 JVM 线程的两个状态:BLOCKING 和 WAITING。

  1. BLOCKING,当线程准备进入一段新的同步代码块时,因不能获得锁而等待。

  2. WAITING,当线程已经进入同步代码块之后,在执行的过程中因不满足某些条件而暂停。这时可以调用 waiting 方法 释放已占据的锁 。其它工作线程得以抢占此锁并执行,直到满足先验条件为真时,其它线程可以通过 notifyAll 方法重新令监视此锁的所有 WAITING 线程再次争锁并继续工作。 wait / notify / notifyAll 构成了线程之间的协商机制,见下面的代码块。

<pre class="prettyprint hljs java" style="margin: 0px 0px 1.5em; padding: 0.5em; border-radius: 4px; background: rgb(246, 246, 246); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; border: none; overflow-x: auto;">static class Status{public boolean v;}public static void main(String[] args) throws InterruptedException{

var status = new Status();
status.v = false;

var mutex = new Object();    new Thread(()->{        synchronized (mutex){
        System.out.println("get mutex");            // 此时检测的状态为 false, 进入 WAITING 状态。
        if(!status.v) try {mutex.wait();} catch (InterruptedException e) {e.printStackTrace();}            // 被唤醒后重新检测状态为 true。
        System.out.println(status.v);
    }
}).start();    new Thread(()->{        synchronized (mutex){            // 将状态设置为 true,唤醒上面的线程
        status.v = true;
        mutex.notify(); 
    }
}).start();

}
复制代码
</pre>

只有处于 RUNNABLE 状态的线程才会实际获得 CPU 使用权。

在 Java 中,一切会发生阻塞的方法都会被要求处理 InterruptedException 受检异常。调用阻塞方法的方法也会变成阻塞方法。线程内部有一个 boolean 类型的状态位表示中断,调用 interrupt 方法可以将该状态位标识为 true 。但是这不意味着该线程就会立刻中断:

<pre class="hljs nginx" style="margin: 0px 0px 0.75em; padding: 0.5em; border-radius: 4px; background: rgb(246, 246, 246); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; border: none; overflow-x: auto;">InterruptedException
</pre>

同步工具类

Java 还提供了诸如信号量 ( Semaphore ),栅栏 ( Barrier ),以及闭锁 ( Latch ) 作为同步工具类,它们都包含了一定的结构性属性:这些状态将决定执行同步工具类的线程是执行还是等待。

闭锁

闭锁是一种同步工具类,可以延迟线程的进度直到闭锁打开。在此之前,所有的线程必须等待,而在闭锁结束之后,这个锁将永久保持打开状态。这个特性适用于 需要确保某个任务的前序任务 ( 比如初始化 ) 全部完成之后才可以执行的场合,见下方的代码:Worker 线程等待另两个初始化线程准备就绪之后输出 p 的结果。

<pre class="prettyprint hljs livescript" style="margin: 0px 0px 1.5em; padding: 0.5em; border-radius: 4px; background: rgb(246, 246, 246); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; border: none; overflow-x: auto;">// class Point{int x,y;}
final var p = new Point();
final var p_latch = new CountDownLatch(2);

// Workernew Thread(()->{ try {p_latch.await();} catch (InterruptedException e) {e.printStackTrace();}
System.out.printf("Point(x=%d,y=%d)",p.x,p.y);
}).start();

// Init xnew Thread(()->{
p.x = 1;
p_latch.countDown();
}).start();

// Init ynew Thread(()->{
p.y = 2;
p_latch.countDown();
}).start();
复制代码
</pre>

FutureTask 也可以拿来做闭锁,它实现了 Future 的语义,表示一个抽象的可生成结果的计算,一般需要由线程池驱动执行,表示一个异步的任务。

Runnable 接口表示无返回值的计算,Callable<T> 代表有返回值的计算。

<pre class="prettyprint hljs coffeescript" style="margin: 0px 0px 1.5em; padding: 0.5em; border-radius: 4px; background: rgb(246, 246, 246); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; border: none; overflow-x: auto;">final var futurePoint = new FutureTask<>(()->new Point(1,2));new Thread(futurePoint).start();new Thread(()->{ try { // 在 Callable 计算出结果之前阻塞
var p = futurePoint.get();
System.out.printf("Point(x=%d,y=%d)",p.x,p.y);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}).start();
复制代码
</pre>

信号量

计数信号量用于控制某个资源的同时访问数量,通常用于配置有容量限制的资源池,或称有界阻塞容器。Semaphore 管理一组许可,线程在需要时首先获取许可,并在操作结束之后归还许可。如果许可数量被耗尽,那么线程则必须要阻塞到其它任意线程归还许可 ( 默认情况下遵循 Non-Fair 策略 ) 为止。特别地,当信号量的许可数为 1 时,则可认为是不可重入的互斥锁。

下面是一个利用信号量 + 同步容器实现的简易阻塞队列:

<pre class="prettyprint hljs java" style="margin: 0px 0px 1.5em; padding: 0.5em; border-radius: 4px; background: rgb(246, 246, 246); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; border: none; overflow-x: auto;">class BoundedBlockingQueue<E>{ final private List<E> list = Collections.synchronizedList(new LinkedList<>()); final private Semaphore se; public BoundedBlockingQueue(int cap){
se = new Semaphore(cap);
} public void enqueue(E e) throws InterruptedException {
se.acquire();
list.add(0,e);
} public E dequeue(){ final var done = list.remove(0);
se.release(); return done;
} @Override
public String toString() { return "BoundedBlockingQueue{" + "list=" + list + '}';
}
}
复制代码
</pre>

栅栏

栅栏 ( Barrier ) 类似于闭锁,同样都会阻塞到某一个事件发生。闭锁强调等待某个事件发生之后再执行动作,而栅栏更强调在某个事件发生之前等待其它线程。它可用于实现一些协议:"所有人在指定的时间去会议室碰头,等到所有的人到齐之后再开会",比如数据库事务的两阶段提交。

Java 提供了一个名为 CyclicBarrier 的栅栏,它指定了 N 个工作线程 反复地 在栅栏位置汇集。在某线程执行完毕之后,调用 await() 方法阻塞自身,以等待其它更慢的线程到达栅栏位置。当设定的 N 个线程均调用 await() 之后,栅栏将打开,此时所有的线程将可以继续向下执行代码,而栅栏本身的状态会重置,以便复用 ( 因而命名为 Cyclic- )。

见下面的代码,4 个线程并行执行初始化工作 ( 以随机时间的 sleep 模拟延迟 ),并等待所有线程初始化完毕之后同时打印信息。

<pre class="prettyprint hljs dart" style="margin: 0px 0px 1.5em; padding: 0.5em; border-radius: 4px; background: rgb(246, 246, 246); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; border: none; overflow-x: auto;">final int N = 4;final var barrier = new CyclicBarrier(N);final Thread[] workers = new Thread[N];for(var i : new Integer[]{0,1,2,3}){ var t = new Thread(()->{ try { // 模拟随机的延时
var rdm = new Random().nextInt(1000);
Thread.sleep(rdm); // 在所有其它线程到达之前阻塞
barrier.await(); // 所有线程到达之后执行,每个线程打印延时时间
System.out.printf("prepare for %d millis\n",rdm);
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
workers[i] = t;
t.start();
}// 等待所有的任务并行执行完毕。for(var worker : workers){worker.join();}
复制代码
</pre>

在不涉及 IO 操作和数据共享的计算问题当中,线程数量为 N CPU 或者 N CPU + 1 时会获得最优的吞吐量,更多的线程也不会带来带来帮助,甚至性能还会下降,因为 CPU 需要频繁的切换上下文。

一旦线程成功地到达栅栏,则 await() 方法会其标记为 "子线程"。 CyclicBarrier 的构造器还接受额外的 Runnable 接口做回调函数,当所有线程全部到达栅栏之后, CyclicBarrier 会从子线程当中挑选出一个领导线程去执行它 ( 即,每一轮通过栅栏之后,它都会被执行且仅一次 ),我们可以在此实现日志记录等操作。

<pre class="prettyprint hljs livescript" style="margin: 0px 0px 1.5em; padding: 0.5em; border-radius: 4px; background: rgb(246, 246, 246); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; border: none; overflow-x: auto;">final var barrier = new CyclicBarrier(N,()->{
System.out.println("all runners ready");
});
复制代码
</pre>

在并行任务中构建高效的缓存

为了用简单的例子说明问题,我们在这里特别强调并行 ( Parallel ) 任务,这些任务的计算过程是纯粹 ( Pure ) 的 —— 这样的函数被称之纯函数。无论它们何时被调用,被哪个线程调用,同样的输入永远得到同样的输出。纯函数不和外部环境交互,因此自然也就不存在竞态条件。

一个非常自然的想法是使用缓存 ( 或称记忆机制 Memorized ) 避免重复的运算。在纯粹的映射关系中,固定的输入总是对应固定的输出,因此使用 K-V 键值对来记忆结果再好不过了。我们基于 HashMap 给出最简单的一版实现,然后再探讨如何改进它们。

<pre class="prettyprint hljs dart" style="margin: 0px 0px 1.5em; padding: 0.5em; border-radius: 4px; background: rgb(246, 246, 246); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; border: none; overflow-x: auto;">class MapCacheV1 {
private final HashMap<Integer,String> cache = new HashMap<>();
public synchronized String getResult(Integer id){ var v = cache.get(id); if (v == null){ // 设定中,这个静态方法具有 500ms 左右的延迟。
v = PURE.slowOperation(id);
cache.put(id,v);
} return v;
}
}
复制代码
</pre>

尽管我们打算将 MapCache 用于无竞态条件的并行任务,但 getResult() 方法仍然加上了同步锁,因为 HashMap 本身不是线程安全的, cache 需要以安全的方式被并发访问。然而,这种做法无疑会使得 getResult() 方法变得十分笨重,因为原本可以并行的慢操作 PURE.slowOperation() 也被锁在了代码块内部。

最先想到的是使用更加高效的 ConcurrentHashMap 类取代线程不安全的 HashMap ,以获得免费的多线程性能提升:

<pre class="prettyprint hljs dart" style="margin: 0px 0px 1.5em; padding: 0.5em; border-radius: 4px; background: rgb(246, 246, 246); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; border: none; overflow-x: auto;">class MapCacheV2 {
private final ConcurrentHashMap<Integer,String> cache = new ConcurrentHashMap<>();
public String getResult(Integer id){ var v = cache.get(id); if(v == null){
v = PURE.slowOperation(id);
cache.put(id,v);
} return v;
}
}
复制代码
</pre>

同时,我们这一次取消掉了 getResult() 上的同步锁。这样,多线程可以并行地执行慢操作,只在修改 cache 时发生竞争。但这个缓存仍有一些不足 —— 当某个线程 A 在计算新值时 ( 即这 500ms 之内 ),其它线程并不知道。因此,多个线程有可能会计算同一个新值,甚至导致其它的计算任务无法进行。

针对这个问题,我们再一次提出改进。不妨让 cache 保存 "计算过程",而非值。这样,工作线程将有三种行为:

  1. 缓存中没有此计算任务,注册并执行。

  2. 缓存中有此计算任务,但未完毕,当前线程阻塞 ( 将 CPU 让给其它需要计算的线程 )。

  3. 缓存中有此计算任务,且已计算完毕,直接返回。

回顾前文在闭锁中提到的 FutureTask<V> 类型,它适合用于当前的实现,见下方的代码:

<pre class="prettyprint hljs dart" style="margin: 0px 0px 1.5em; padding: 0.5em; border-radius: 4px; background: rgb(246, 246, 246); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; border: none; overflow-x: auto;">class MapCacheV3 {

private final ConcurrentHashMap<Integer,FutureTask<String>> cache = new ConcurrentHashMap<>();
public String getResult(Integer id) throws ExecutionException, InterruptedException {        // 获取一个计算任务,而非值
    final var task = cache.get(id);        if(task == null){            final var newTask = new FutureTask<>(()-> PURE.slowOperation(id));            // cache.putIfAbsent()
        cache.put(id,newTask);
        newTask.run();            // 提交并执行任务。
        return newTask.get();
    }else return task.get();
}

}
复制代码
</pre>

MapCacheV3 的实现已经近乎完美了。唯一不足的是:我们对 cache 的操作仍然是 "先判断后执行" 的复合操作,但现在 getResult 并没有同步锁的保护。两个线程仍然同时调用 cache.get()并判空,并开始执行重复的计算。

下面的版本给出了最终的解决方案:使用 ConcurrentMapputIfAbsent() 原子方法修复可能重复添加计算任务的问题。

<pre class="prettyprint hljs kotlin" style="margin: 0px 0px 1.5em; padding: 0.5em; border-radius: 4px; background: rgb(246, 246, 246); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; border: none; overflow-x: auto;">public String getResult(Integer id) throws ExecutionException, InterruptedException { // 获取一个计算任务,而非值
final var task = cache.get(id); if(task == null){ final var newTask = new FutureTask<>(()-> PURE.slowOperation(id)); // put 和 putIfAbsent 方法均会返回此 Key 对应的上一个旧值 Value。
// 如果 put 的是一个新的 Key,则返回值为 null。
final var maybeNull = cache.putIfAbsent(id,newTask); if(maybeNull == null) newTask.run(); return newTask.get();
}else return task.get();
}
复制代码
</pre>

值得注意的是,一旦 cache 存储的是计算任务而非值,那么就可能存在缓存污染的问题。一旦某个 FutureTask 的计算被取消,或者失败,应当及时将它从缓存中移除以保证将来的计算成功,而不是放任其驻留在缓存内部返回失败的结果。

缓存思想几乎应用在各个地方。比如在 Web 服务中,用户的数据往往不会总是直接来自数据库,而是 Redis 这样的消息中间件。在实际的应用环境下,还有更加复杂的问题需要被考虑到,比如缓存内容过时 ( expired ),或者是定期清理缓存空间等。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,907评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,987评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,298评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,586评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,633评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,488评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,275评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,176评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,619评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,819评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,932评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,655评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,265评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,871评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,994评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,095评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,884评论 2 354

推荐阅读更多精彩内容