终止耗时任务
并发程序通常使用长时间运行的任务。可调用任务在完成时返回值;虽然这给它一个有限的寿命,但仍然可能很长。可运行的任务有时被设置为永远运行的后台进程。你经常需要一种方法在正常完成之前停止 Runnable 和 Callable 任务,例如当你关闭程序时。
最初的 Java 设计提供了中断运行任务的机制(为了向后兼容,仍然存在);中断机制包括阻塞问题。中断任务既乱又复杂,因为你必须了解可能发生中断的所有可能状态,以及可能导致的数据丢失。使用中断被视为反对模式,但我们仍然被迫接受。
InterruptedException,因为设计的向后兼容性残留。
任务终止的最佳方法是设置任务周期性检查的标志。然后任务可以通过自己的 shutdown 进程并正常终止。不是在任务中随机关闭线程,而是要求任务在到达了一个较好时自行终止。这总是产生比中断更好的结果,以及更容易理解的更合理的代码。
以这种方式终止任务听起来很简单:设置任务可以看到的 boolean flag。编写任务,以便定期检查标志并执行正常终止。这实际上就是你所做的,但是有一个复杂的问题:我们的旧克星,共同的可变状态。如果该标志可以被另一个任务操纵,则存在碰撞可能性。
在研究 Java 文献时,你会发现很多解决这个问题的方法,经常使用 volatile 关键字。我们将使用更简单的技术并避免所有易变的参数,这些都在附录:低级并发 中有所涉及。
Java 5 引入了 Atomic 类,它提供了一组可以使用的类型,而不必担心并发问题。我们将添加 AtomicBoolean 标志,告诉任务清理自己并退出。
// concurrent/QuittableTask.java
import java.util.concurrent.atomic.AtomicBoolean;import onjava.Nap;
public class QuittableTask implements Runnable {
final int id;
public QuittableTask(int id) {
this.id = id;
}
private AtomicBoolean running =
new AtomicBoolean(true);
public void quit() {
running.set(false);
}
@Override
public void run() {
while(running.get()) // [1]
new Nap(0.1);
System.out.print(id + " "); // [2]
}
}
虽然多个任务可以在同一个实例上成功调用 quit() ,但是 AtomicBoolean 可以防止多个任务同时实际修改 running ,从而使 quit() 方法成为线程安全的。
- [1]:只要运行标志为 true,此任务的 run() 方法将继续。
需要 running AtomicBoolean 证明编写 Java program 并发时最基本的困难之一是,如果 running 是一个普通的布尔值,你可能无法在执行程序中看到问题。实际上,在这个例子中,你可能永远不会有任何问题 - 但是代码仍然是不安全的。编写表明该问题的测试可能很困难或不可能。因此,你没有任何反馈来告诉你已经做错了。通常,你编写线程安全代码的唯一方法就是通过了解事情可能出错的所有细微之处。
作为测试,我们将启动很多 QuittableTasks 然后关闭它们。尝试使用较大的 COUNT 值
// concurrent/QuittingTasks.java
import java.util.*;
import java.util.stream.*;
import java.util.concurrent.*;
import onjava.Nap;
public class QuittingTasks {
public static final int COUNT = 150;
public static void main(String[] args) {
ExecutorService es =
Executors.newCachedThreadPool();
List<QuittableTask> tasks =
IntStream.range(1, COUNT)
.mapToObj(QuittableTask::new)
.peek(qt -> es.execute(qt))
.collect(Collectors.toList());
new Nap(1);
tasks.forEach(QuittableTask::quit); es.shutdown();
}
}
输出结果:
24 27 31 8 11 7 19 12 16 4 23 3 28 32 15 20 63 60 68 6764 39 47 52 51 55 40 43 48 59 44 56 36 35 71 72 83 10396 92 88 99 100 87 91 79 75 84 76 115 108 112 104 107111 95 80 147 120 127 119 123 144 143 116 132 124 128
136 131 135 139 148 140 2 126 6 5 1 18 129 17 14 13 2122 9 10 30 33 58 37 125 26 34 133 145 78 137 141 138 6274 142 86 65 73 146 70 42 149 121 110 134 105 82 117106 113 122 45 114 118 38 50 29 90 101 89 57 53 94 4161 66 130 69 77 81 85 93 25 102 54 109 98 49 46 97
我使用 peek() 将 QuittableTasks 传递给 ExecutorService ,然后将这些任务收集到 List.main() 中,只要任何任务仍在运行,就会阻止程序退出。即使为每个任务按顺序调用 quit() 方法,任务也不会按照它们创建的顺序关闭。独立运行的任务不会确定性地响应信号。
CompletableFuture 类
作为介绍,这里是使用 CompletableFutures 在 QuittingTasks.java 中:
// concurrent/QuittingCompletable.java
import java.util.*;
import java.util.stream.*;
import java.util.concurrent.*;
import onjava.Nap;
public class QuittingCompletable {
public static void main(String[] args) {
List<QuittableTask> tasks =
IntStream.range(1, QuittingTasks.COUNT)
.mapToObj(QuittableTask::new)
.collect(Collectors.toList());
List<CompletableFuture<Void>> cfutures =
tasks.stream()
.map(CompletableFuture::runAsync)
.collect(Collectors.toList());
new Nap(1);
tasks.forEach(QuittableTask::quit);
cfutures.forEach(CompletableFuture::join);
}
}
输出结果:
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 2526 27 28 29 30 31 32 33 34 6 35 4 38 39 40 41 42 43 4445 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 6263 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 8081 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 9899 100 101 102 103 104 105 106 107 108 109 110 111 1121 113 114 116 117 118 119 120 121 122 123 124 125 126127 128 129 130 131 132 133 134 135 136 137 138 139 140141 142 143 144 145 146 147 148 149 5 115 37 36 2 3
任务是一个 List<QuittableTask>
,就像在 QuittingTasks.java
中一样,但是在这个例子中,没有 peek()
将每个 QuittableTask
提交给 ExecutorService
。相反,在创建 cfutures
期间,每个任务都交给 CompletableFuture::runAsync
。这执行 VerifyTask.run()
并返回 CompletableFuture<Void>
。因为 run()
不返回任何内容,所以在这种情况下我只使用 CompletableFuture
调用 join()
来等待它完成。
在本例中需要注意的重要一点是,运行任务不需要使用 ExecutorService
。而是直接交给 CompletableFuture
管理 (不过你可以向它提供自己定义的 ExectorService
)。您也不需要调用 shutdown()
;事实上,除非你像我在这里所做的那样显式地调用 join()
,否则程序将尽快退出,而不必等待任务完成。
这个例子只是一个起点。你很快就会看到 ComplempleFuture
能够做得更多。
基本用法
这是一个带有静态方法 work() 的类,它对该类的对象执行某些工作:
// concurrent/Machina.java
import onjava.Nap;
public class Machina {
public enum State {
START, ONE, TWO, THREE, END;
State step() {
if(equals(END))
return END;
return values()[ordinal() + 1];
}
}
private State state = State.START;
private final int id;
public Machina(int id) {
this.id = id;
}
public static Machina work(Machina m) {
if(!m.state.equals(State.END)){
new Nap(0.1);
m.state = m.state.step();
}
System.out.println(m);
return m;
}
@Override
public String toString() {
return"Machina" + id + ": " + (state.equals(State.END)? "complete" : state);
}
}
这是一个有限状态机,一个微不足道的机器,因为它没有分支......它只是从头到尾遍历一条路径。work() 方法将机器从一个状态移动到下一个状态,并且需要 100 毫秒才能完成“工作”。
CompletableFuture 可以被用来做的一件事是, 使用 completedFuture() 将它感兴趣的对象进行包装。
// concurrent/CompletedMachina.java
import java.util.concurrent.*;
public class CompletedMachina {
public static void main(String[] args) {
CompletableFuture<Machina> cf =
CompletableFuture.completedFuture(
new Machina(0));
try {
Machina m = cf.get(); // Doesn't block
} catch(InterruptedException |
ExecutionException e) {
throw new RuntimeException(e);
}
}
}
completedFuture() 创建一个“已经完成”的 CompletableFuture 。对这样一个未来做的唯一有用的事情是 get() 里面的对象,所以这看起来似乎没有用。注意 CompletableFuture 被输入到它包含的对象。这个很重要。
通常,get() 在等待结果时阻塞调用线程。此块可以通过 InterruptedException 或 ExecutionException 中断。在这种情况下,阻止永远不会发生,因为 CompletableFuture 已经完成,所以结果立即可用。
当我们将 handle() 包装在 CompletableFuture 中时,发现我们可以在 CompletableFuture 上添加操作来处理所包含的对象,使得事情变得更加有趣:
// concurrent/CompletableApply.java
import java.util.concurrent.*;
public class CompletableApply {
public static void main(String[] args) {
CompletableFuture<Machina> cf =
CompletableFuture.completedFuture(
new Machina(0));
CompletableFuture<Machina> cf2 =
cf.thenApply(Machina::work);
CompletableFuture<Machina> cf3 =
cf2.thenApply(Machina::work);
CompletableFuture<Machina> cf4 =
cf3.thenApply(Machina::work);
CompletableFuture<Machina> cf5 =
cf4.thenApply(Machina::work);
}
}
输出结果:
Machina0: ONE
Machina0: TWO
Machina0: THREE
Machina0: complete
thenApply()
应用一个接收输入并产生输出的函数。在本例中,work()
函数产生的类型与它所接收的类型相同 (Machina
),因此每个 CompletableFuture
添加的操作的返回类型都为 Machina
,但是 (类似于流中的 map()
) 函数也可以返回不同的类型,这将体现在返回类型上。
你可以在此处看到有关 CompletableFutures 的重要信息:它们会在你执行操作时自动解包并重新包装它们所携带的对象。这使得编写和理解代码变得更加简单, 而不会在陷入在麻烦的细节中。
我们可以消除中间变量并将操作链接在一起,就像我们使用 Streams 一样:
// concurrent/CompletableApplyChained.javaimport java.util.concurrent.*;
import onjava.Timer;
public class CompletableApplyChained {
public static void main(String[] args) {
Timer timer = new Timer();
CompletableFuture<Machina> cf =
CompletableFuture.completedFuture(
new Machina(0))
.thenApply(Machina::work)
.thenApply(Machina::work)
.thenApply(Machina::work)
.thenApply(Machina::work);
System.out.println(timer.duration());
}
}
输出结果:
Machina0: ONE
Machina0: TWO
Machina0: THREE
Machina0: complete
514
这里我们还添加了一个 Timer
,它的功能在每一步都显性地增加 100ms 等待时间之外,还将 CompletableFuture
内部 thenApply
带来的额外开销给体现出来了。
CompletableFutures 的一个重要好处是它们鼓励使用私有子类原则(不共享任何东西)。默认情况下,使用 thenApply() 来应用一个不对外通信的函数 - 它只需要一个参数并返回一个结果。这是函数式编程的基础,并且它在并发特性方面非常有效[1]。并行流和 ComplempleFutures
旨在支持这些原则。只要你不决定共享数据(共享非常容易导致意外发生)你就可以编写出相对安全的并发程序。
回调 thenApply()
一旦开始一个操作,在完成所有任务之前,不会完成 CompletableFuture 的构建。虽然这有时很有用,但是开始所有任务通常更有价值,这样就可以运行继续前进并执行其他操作。我们可通过thenApplyAsync()
来实现此目的:
// concurrent/CompletableApplyAsync.java
import java.util.concurrent.*;
import onjava.*;
public class CompletableApplyAsync {
public static void main(String[] args) {
Timer timer = new Timer();
CompletableFuture<Machina> cf =
CompletableFuture.completedFuture(
new Machina(0))
.thenApplyAsync(Machina::work)
.thenApplyAsync(Machina::work)
.thenApplyAsync(Machina::work)
.thenApplyAsync(Machina::work);
System.out.println(timer.duration());
System.out.println(cf.join());
System.out.println(timer.duration());
}
}
输出结果:
116
Machina0: ONE
Machina0: TWO
Machina0:THREE
Machina0: complete
Machina0: complete
552
同步调用 (我们通常使用的那种) 意味着:“当你完成工作时,才返回”,而异步调用以意味着: “立刻返回并继续后续工作”。 正如你所看到的,cf
的创建现在发生的更快。每次调用 thenApplyAsync()
都会立刻返回,因此可以进行下一次调用,整个调用链路完成速度比以前快得多。
事实上,如果没有回调 cf.join()
方法,程序会在完成其工作之前退出。而 cf.join()
直到 cf 操作完成之前,阻止 main()
进程结束。我们还可以看出本示例大部分时间消耗在 cf.join()
这。
这种“立即返回”的异步能力需要 CompletableFuture
库进行一些秘密(client
无感)工作。特别是,它将你需要的操作链存储为一组回调。当操作的第一个链路(后台操作)完成并返回时,第二个链路(后台操作)必须获取生成的 Machina
并开始工作,以此类推! 但这种异步机制没有我们可以通过程序调用栈控制的普通函数调用序列,它的调用链路顺序会丢失,因此它使用一个函数地址来存储的回调来解决这个问题。
幸运的是,这就是你需要了解的有关回调的全部信息。程序员将这种人为制造的混乱称为 callback hell(回调地狱)。通过异步调用,CompletableFuture
帮你管理所有回调。 除非你知道你系统中的一些特定逻辑会导致某些改变,或许你更想使用异步调用来实现程序。
- 其他操作
当你查看CompletableFuture
的 Javadoc
时,你会看到它有很多方法,但这个方法的大部分来自不同操作的变体。例如,有 thenApply()
,thenApplyAsync()
和第二种形式的 thenApplyAsync()
,它们使用 Executor
来运行任务 (在本书中,我们忽略了 Executor
选项)。
下面的示例展示了所有"基本"操作,这些操作既不涉及组合两个 CompletableFuture
,也不涉及异常 (我们将在后面介绍)。首先,为了提供简洁性和方便性,我们应该重用以下两个实用程序:
package onjava;
import java.util.concurrent.*;
public class CompletableUtilities {
// Get and show value stored in a CF:
public static void showr(CompletableFuture<?> c) {
try {
System.out.println(c.get());
} catch(InterruptedException
| ExecutionException e) {
throw new RuntimeException(e);
}
}
// For CF operations that have no value:
public static void voidr(CompletableFuture<Void> c) {
try {
c.get(); // Returns void
} catch(InterruptedException
| ExecutionException e) {
throw new RuntimeException(e);
}
}
}
showr()
在 CompletableFuture<Integer>
上调用 get()
,并显示结果,try/catch
两个可能会出现的异常。
voidr()
是 CompletableFuture<Void>
的 showr()
版本,也就是说,CompletableFutures
只为任务完成或失败时显示信息。
为简单起见,下面的 CompletableFutures
只包装整数。cfi()
是一个便利的方法,它把一个整数包装在一个完整的 CompletableFuture<Integer>
:
// concurrent/CompletableOperations.java
import java.util.concurrent.*;
import static onjava.CompletableUtilities.*;
public class CompletableOperations {
static CompletableFuture<Integer> cfi(int i) {
return
CompletableFuture.completedFuture(
Integer.valueOf(i));
}
public static void main(String[] args) {
showr(cfi(1)); // Basic test
voidr(cfi(2).runAsync(() ->
System.out.println("runAsync")));
voidr(cfi(3).thenRunAsync(() ->
System.out.println("thenRunAsync")));
voidr(CompletableFuture.runAsync(() ->
System.out.println("runAsync is static")));
showr(CompletableFuture.supplyAsync(() -> 99));
voidr(cfi(4).thenAcceptAsync(i ->
System.out.println("thenAcceptAsync: " + i)));
showr(cfi(5).thenApplyAsync(i -> i + 42));
showr(cfi(6).thenComposeAsync(i -> cfi(i + 99)));
CompletableFuture<Integer> c = cfi(7);
c.obtrudeValue(111);
showr(c);
showr(cfi(8).toCompletableFuture());
c = new CompletableFuture<>();
c.complete(9);
showr(c);
c = new CompletableFuture<>();
c.cancel(true);
System.out.println("cancelled: " +
c.isCancelled());
System.out.println("completed exceptionally: " +
c.isCompletedExceptionally());
System.out.println("done: " + c.isDone());
System.out.println(c);
c = new CompletableFuture<>();
System.out.println(c.getNow(777));
c = new CompletableFuture<>();
c.thenApplyAsync(i -> i + 42)
.thenApplyAsync(i -> i * 12);
System.out.println("dependents: " +
c.getNumberOfDependents());
c.thenApplyAsync(i -> i / 2);
System.out.println("dependents: " +
c.getNumberOfDependents());
}
}
输出结果 :
1
runAsync
thenRunAsync
runAsync is static
99
thenAcceptAsync: 4
47
105
111
8
9
cancelled: true
completed exceptionally: true
done: true
java.util.concurrent.CompletableFuture@6d311334[Complet ed exceptionally]
777
dependents: 1
dependents: 2
-
main()
包含一系列可由其int
值引用的测试。-
cfi(1)
演示了showr()
正常工作。 -
cfi(2)
是调用runAsync()
的示例。由于Runnable
不产生返回值,因此使用了返回CompletableFuture <Void>
的voidr()
方法。 - 注意使用
cfi(3)
,thenRunAsync()
效果似乎与 上例cfi(2)
使用的runAsync()
相同,差异在后续的测试中体现:-
runAsync()
是一个static
方法,所以你通常不会像cfi(2)
一样调用它。相反你可以在QuittingCompletable.java
中使用它。 - 后续测试中表明
supplyAsync()
也是静态方法,区别在于它需要一个Supplier
而不是Runnable
, 并产生一个CompletableFuture<Integer>
而不是CompletableFuture<Void>
。
-
-
then
系列方法将对现有的CompletableFuture<Integer>
进一步操作。- 与
thenRunAsync()
不同,cfi(4)
,cfi(5)
和cfi(6)
"then" 方法的参数是未包装的Integer
。 - 通过使用
voidr()
方法可以看到:-
AcceptAsync()
接收了一个Consumer
,因此不会产生结果。 -
thenApplyAsync()
接收一个Function
, 并生成一个结果(该结果的类型可以不同于其输入类型)。 -
thenComposeAsync()
与thenApplyAsync()
非常相似,唯一区别在于其Function
必须产生已经包装在CompletableFuture
中的结果。
-
- 与
-
cfi(7)
示例演示了obtrudeValue()
,它强制将值作为结果。 -
cfi(8)
使用toCompletableFuture()
从CompletionStage
生成一个CompletableFuture
。 -
c.complete(9)
显示了如何通过给它一个结果来完成一个task
(future
)(与obtrudeValue()
相对,后者可能会迫使其结果替换该结果)。 - 如果你调用
CompletableFuture
中的cancel()
方法,如果已经完成此任务,则正常结束。 如果尚未完成,则使用CancellationException
完成此CompletableFuture
。 - 如果任务(
future
)完成,则 getNow() 方法返回CompletableFuture
的完成值,否则返回getNow()
的替换参数。 - 最后,我们看一下依赖 (
dependents
) 的概念。如果我们将两个thenApplyAsync()
调用链路到CompletableFuture
上,则依赖项的数量不会增加,保持为 1。但是,如果我们另外将另一个thenApplyAsync()
直接附加到c
,则现在有两个依赖项:两个一起的链路和另一个单独附加的链路。- 这表明你可以使用一个
CompletionStage
,当它完成时,可以根据其结果派生多个新任务。
- 这表明你可以使用一个
-
结合 CompletableFuture
第二种类型的 CompletableFuture
方法采用两种 CompletableFuture
并以各异方式将它们组合在一起。就像两个人在比赛一样, 一个CompletableFuture
通常比另一个更早地到达终点。这些方法允许你以不同的方式处理结果。
为了测试这一点,我们将创建一个任务,它有一个我们可以控制的定义了完成任务所需要的时间量的参数。
CompletableFuture 先完成:�
// concurrent/Workable.java
import java.util.concurrent.*;
import onjava.Nap;
public class Workable {
String id;
final double duration;
public Workable(String id, double duration) {
this.id = id;
this.duration = duration;
}
@Override
public String toString() {
return "Workable[" + id + "]";
}
public static Workable work(Workable tt) {
new Nap(tt.duration); // Seconds
tt.id = tt.id + "W";
System.out.println(tt);
return tt;
}
public static CompletableFuture<Workable> make(String id, double duration) {
return CompletableFuture
.completedFuture(
new Workable(id, duration)
)
.thenApplyAsync(Workable::work);
}
}
在 make()
中,work()
方法应用于CompletableFuture
。work()
需要一定的时间才能完成,然后它将字母 W 附加到 id 上,表示工作已经完成。
现在我们可以创建多个竞争的 CompletableFuture
,并使用 CompletableFuture
库中的各种方法来进行操作:
// concurrent/DualCompletableOperations.java
import java.util.concurrent.*;
import static onjava.CompletableUtilities.*;
public class DualCompletableOperations {
static CompletableFuture<Workable> cfA, cfB;
static void init() {
cfA = Workable.make("A", 0.15);
cfB = Workable.make("B", 0.10); // Always wins
}
static void join() {
cfA.join();
cfB.join();
System.out.println("*****************");
}
public static void main(String[] args) {
init();
voidr(cfA.runAfterEitherAsync(cfB, () ->
System.out.println("runAfterEither")));
join();
init();
voidr(cfA.runAfterBothAsync(cfB, () ->
System.out.println("runAfterBoth")));
join();
init();
showr(cfA.applyToEitherAsync(cfB, w -> {
System.out.println("applyToEither: " + w);
return w;
}));
join();
init();
voidr(cfA.acceptEitherAsync(cfB, w -> {
System.out.println("acceptEither: " + w);
}));
join();
init();
voidr(cfA.thenAcceptBothAsync(cfB, (w1, w2) -> {
System.out.println("thenAcceptBoth: "
+ w1 + ", " + w2);
}));
join();
init();
showr(cfA.thenCombineAsync(cfB, (w1, w2) -> {
System.out.println("thenCombine: "
+ w1 + ", " + w2);
return w1;
}));
join();
init();
CompletableFuture<Workable>
cfC = Workable.make("C", 0.08),
cfD = Workable.make("D", 0.09);
CompletableFuture.anyOf(cfA, cfB, cfC, cfD)
.thenRunAsync(() ->
System.out.println("anyOf"));
join();
init();
cfC = Workable.make("C", 0.08);
cfD = Workable.make("D", 0.09);
CompletableFuture.allOf(cfA, cfB, cfC, cfD)
.thenRunAsync(() ->
System.out.println("allOf"));
join();
}
}
输出结果:
Workable[BW]
runAfterEither
Workable[AW]
*****************
Workable[BW]
Workable[AW]
runAfterBoth
*****************
Workable[BW]
applyToEither: Workable[BW]
Workable[BW]
Workable[AW]
*****************
Workable[BW]
acceptEither: Workable[BW]
Workable[AW]
*****************
Workable[BW]
Workable[AW]
thenAcceptBoth: Workable[AW], Workable[BW]
****************
Workable[BW]
Workable[AW]
thenCombine: Workable[AW], Workable[BW]
Workable[AW]
*****************
Workable[CW]
anyOf
Workable[DW]
Workable[BW]
Workable[AW]
*****************
Workable[CW]
Workable[DW]
Workable[BW]
Workable[AW]
*****************
allOf
- 为了方便访问, 将
cfA
和cfB
定义为static
的。-
init()
方法用于A
,B
初始化这两个变量,因为B
总是给出比A
较短的延迟,所以总是win
的一方。 -
join()
是在两个方法上调用join()
并显示边框的另一个便利方法。
-
- 所有这些 “
dual
” 方法都以一个CompletableFuture
作为调用该方法的对象,第二个CompletableFuture
作为第一个参数,然后是要执行的操作。 - 通过使用
showr()
和voidr()
可以看到,“run
”和“accept
”是终端操作,而“apply
”和“combine
”则生成新的payload-bearing
(承载负载) 的CompletableFuture
。 - 方法的名称不言自明,你可以通过查看输出来验证这一点。一个特别有趣的方法是
combineAsync()
,它等待两个CompletableFuture
完成,然后将它们都交给一个BiFunction
,这个BiFunction
可以将结果加入到最终的CompletableFuture
的有效负载中。
模拟
作为使用 CompletableFuture
将一系列操作组合的示例,让我们模拟一下制作蛋糕的过程。在第一阶段,我们准备并将原料混合成面糊:
// concurrent/Batter.java
import java.util.concurrent.*;
import onjava.Nap;
public class Batter {
static class Eggs {
}
static class Milk {
}
static class Sugar {
}
static class Flour {
}
static <T> T prepare(T ingredient) {
new Nap(0.1);
return ingredient;
}
static <T> CompletableFuture<T> prep(T ingredient) {
return CompletableFuture
.completedFuture(ingredient)
.thenApplyAsync(Batter::prepare);
}
public static CompletableFuture<Batter> mix() {
CompletableFuture<Eggs> eggs = prep(new Eggs());
CompletableFuture<Milk> milk = prep(new Milk());
CompletableFuture<Sugar> sugar = prep(new Sugar());
CompletableFuture<Flour> flour = prep(new Flour());
CompletableFuture
.allOf(eggs, milk, sugar, flour)
.join();
new Nap(0.1); // Mixing time
return CompletableFuture.completedFuture(new Batter());
}
}
每种原料都需要一些时间来准备。allOf()
等待所有的配料都准备好,然后使用更多些的时间将其混合成面糊。接下来,我们把单批面糊放入四个平底锅中烘烤。产品作为 CompletableFutures
流返回:
// concurrent/Baked.java
import java.util.concurrent.*;
import java.util.stream.*;
import onjava.Nap;
public class Baked {
static class Pan {
}
static Pan pan(Batter b) {
new Nap(0.1);
return new Pan();
}
static Baked heat(Pan p) {
new Nap(0.1);
return new Baked();
}
static CompletableFuture<Baked> bake(CompletableFuture<Batter> cfb) {
return cfb
.thenApplyAsync(Baked::pan)
.thenApplyAsync(Baked::heat);
}
public static Stream<CompletableFuture<Baked>> batch() {
CompletableFuture<Batter> batter = Batter.mix();
return Stream.of(
bake(batter),
bake(batter),
bake(batter),
bake(batter)
);
}
}
最后,我们制作了一批糖,并用它对蛋糕进行糖化:
// concurrent/FrostedCake.java
import java.util.concurrent.*;
import java.util.stream.*;
import onjava.Nap;
final class Frosting {
private Frosting() {
}
static CompletableFuture<Frosting> make() {
new Nap(0.1);
return CompletableFuture
.completedFuture(new Frosting());
}
}
public class FrostedCake {
public FrostedCake(Baked baked, Frosting frosting) {
new Nap(0.1);
}
@Override
public String toString() {
return "FrostedCake";
}
public static void main(String[] args) {
Baked.batch().forEach(
baked -> baked
.thenCombineAsync(Frosting.make(),
(cake, frosting) ->
new FrostedCake(cake, frosting))
.thenAcceptAsync(System.out::println)
.join());
}
}
一旦你习惯了这种背后的想法, CompletableFuture
它们相对易于使用。
异常
与 CompletableFuture
在处理链中包装对象的方式相同,它也会缓冲异常。这些在处理时调用者是无感的,但仅当你尝试提取结果时才会被告知。为了说明它们是如何工作的,我们首先创建一个类,它在特定的条件下抛出一个异常:
// concurrent/Breakable.java
import java.util.concurrent.*;
public class Breakable {
String id;
private int failcount;
public Breakable(String id, int failcount) {
this.id = id;
this.failcount = failcount;
}
@Override
public String toString() {
return "Breakable_" + id + " [" + failcount + "]";
}
public static Breakable work(Breakable b) {
if (--b.failcount == 0) {
System.out.println(
"Throwing Exception for " + b.id + ""
);
throw new RuntimeException(
"Breakable_" + b.id + " failed"
);
}
System.out.println(b);
return b;
}
}
当failcount
> 0,且每次将对象传递给 work()
方法时, failcount - 1
。当failcount - 1 = 0
时,work()
将抛出一个异常。如果传给 work()
的 failcount = 0
,work()
永远不会抛出异常。
注意,异常信息此示例中被抛出( RuntimeException
)
在下面示例 test()
方法中,work()
多次应用于 Breakable
,因此如果 failcount
在范围内,就会抛出异常。然而,在测试A
到E
中,你可以从输出中看到抛出了异常,但它们从未出现:
// concurrent/CompletableExceptions.java
import java.util.concurrent.*;
public class CompletableExceptions {
static CompletableFuture<Breakable> test(String id, int failcount) {
return CompletableFuture.completedFuture(
new Breakable(id, failcount))
.thenApply(Breakable::work)
.thenApply(Breakable::work)
.thenApply(Breakable::work)
.thenApply(Breakable::work);
}
public static void main(String[] args) {
// Exceptions don't appear ...
test("A", 1);
test("B", 2);
test("C", 3);
test("D", 4);
test("E", 5);
// ... until you try to fetch the value:
try {
test("F", 2).get(); // or join()
} catch (Exception e) {
System.out.println(e.getMessage());
}
// Test for exceptions:
System.out.println(
test("G", 2).isCompletedExceptionally()
);
// Counts as "done":
System.out.println(test("H", 2).isDone());
// Force an exception:
CompletableFuture<Integer> cfi =
new CompletableFuture<>();
System.out.println("done? " + cfi.isDone());
cfi.completeExceptionally(
new RuntimeException("forced"));
try {
cfi.get();
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
}
输出结果:
Throwing Exception for A
Breakable_B [1]
Throwing Exception for B
Breakable_C [2]
Breakable_C [1]
Throwing Exception for C
Breakable_D [3]
Breakable_D [2]
Breakable_D [1]
Throwing Exception for D
Breakable_E [4]
Breakable_E [3]
Breakable_E [2]
Breakable_E [1]
Breakable_F [1]
Throwing Exception for F
java.lang.RuntimeException: Breakable_F failed
Breakable_G [1]
Throwing Exception for G
true
Breakable_H [1]
Throwing Exception for H
true
done? false
java.lang.RuntimeException: forced
测试 A
到 E
运行到抛出异常,然后…并没有将抛出的异常暴露给调用方。只有在测试 F 中调用 get()
时,我们才会看到抛出的异常。
测试 G
表明,你可以首先检查在处理期间是否抛出异常,而不抛出该异常。然而,test H
告诉我们,不管异常是否成功,它仍然被视为已“完成”。
代码的最后一部分展示了如何将异常插入到 CompletableFuture
中,而不管是否存在任何失败。
在连接或获取结果时,我们使用 CompletableFuture
提供的更复杂的机制来自动响应异常,而不是使用粗糙的 try-catch
。
你可以使用与我们看到的所有 CompletableFuture
相同的表单来完成此操作:在链中插入一个 CompletableFuture
调用。有三个选项 exceptionally()
,handle()
, whenComplete()
:
// concurrent/CatchCompletableExceptions.java
import java.util.concurrent.*;
public class CatchCompletableExceptions {
static void handleException(int failcount) {
// Call the Function only if there's an
// exception, must produce same type as came in:
CompletableExceptions
.test("exceptionally", failcount)
.exceptionally((ex) -> { // Function
if (ex == null)
System.out.println("I don't get it yet");
return new Breakable(ex.getMessage(), 0);
})
.thenAccept(str ->
System.out.println("result: " + str));
// Create a new result (recover):
CompletableExceptions
.test("handle", failcount)
.handle((result, fail) -> { // BiFunction
if (fail != null)
return "Failure recovery object";
else
return result + " is good";
})
.thenAccept(str ->
System.out.println("result: " + str));
// Do something but pass the same result through:
CompletableExceptions
.test("whenComplete", failcount)
.whenComplete((result, fail) -> { // BiConsumer
if (fail != null)
System.out.println("It failed");
else
System.out.println(result + " OK");
})
.thenAccept(r ->
System.out.println("result: " + r));
}
public static void main(String[] args) {
System.out.println("**** Failure Mode ****");
handleException(2);
System.out.println("**** Success Mode ****");
handleException(0);
}
}
输出结果:
**** Failure Mode ****
Breakable_exceptionally [1]
Throwing Exception for exceptionally
result: Breakable_java.lang.RuntimeException:
Breakable_exceptionally failed [0]
Breakable_handle [1]
Throwing Exception for handle
result: Failure recovery object
Breakable_whenComplete [1]
Throwing Exception for whenComplete
It failed
**** Success Mode ****
Breakable_exceptionally [-1]
Breakable_exceptionally [-2]
Breakable_exceptionally [-3]
Breakable_exceptionally [-4]
result: Breakable_exceptionally [-4]
Breakable_handle [-1]
Breakable_handle [-2]
Breakable_handle [-3]
Breakable_handle [-4]
result: Breakable_handle [-4] is good
Breakable_whenComplete [-1]
Breakable_whenComplete [-2]
Breakable_whenComplete [-3]
Breakable_whenComplete [-4]
Breakable_whenComplete [-4] OK
result: Breakable_whenComplete [-4]
exceptionally()
参数仅在出现异常时才运行。exceptionally()
局限性在于,该函数只能返回输入类型相同的值。exceptionally()
通过将一个好的对象插入到流中来恢复到一个可行的状态。-
handle()
一致被调用来查看是否发生异常(必须检查 fail 是否为 true)。但是
handle()
可以生成任何新类型,所以它允许执行处理,而不是像使用exceptionally()
那样简单地恢复。whenComplete()
类似于 handle(),同样必须测试它是否失败,但是参数是一个消费者,并且不修改传递给它的结果对象。
流异常(Stream Exception)
通过修改 CompletableExceptions.java ,看看 CompletableFuture 异常与流异常有何不同:
// concurrent/StreamExceptions.java
import java.util.concurrent.*;
import java.util.stream.*;
public class StreamExceptions {
static Stream<Breakable> test(String id, int failcount) {
return Stream.of(new Breakable(id, failcount))
.map(Breakable::work)
.map(Breakable::work)
.map(Breakable::work)
.map(Breakable::work);
}
public static void main(String[] args) {
// No operations are even applied ...
test("A", 1);
test("B", 2);
Stream<Breakable> c = test("C", 3);
test("D", 4);
test("E", 5);
// ... until there's a terminal operation:
System.out.println("Entering try");
try {
c.forEach(System.out::println); // [1]
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
}
输出结果:
Entering try
Breakable_C [2]
Breakable_C [1]
Throwing Exception for C
Breakable_C failed
使用 CompletableFuture
,我们可以看到测试 A 到 E 的进展,但是使用流,在你应用一个终端操作之前(e.g. forEach()
),什么都不会暴露给 Client
CompletableFuture
执行工作并捕获任何异常供以后检索。比较这两者并不容易,因为 Stream
在没有终端操作的情况下根本不做任何事情——但是流绝对不会存储它的异常。
检查性异常
CompletableFuture
和 parallel Stream
都不支持包含检查性异常的操作。相反,你必须在调用操作时处理检查到的异常,这会产生不太优雅的代码:
// concurrent/ThrowsChecked.java
import java.util.stream.*;
import java.util.concurrent.*;
public class ThrowsChecked {
class Checked extends Exception {}
static ThrowsChecked nochecked(ThrowsChecked tc) {
return tc;
}
static ThrowsChecked withchecked(ThrowsChecked tc) throws Checked {
return tc;
}
static void testStream() {
Stream.of(new ThrowsChecked())
.map(ThrowsChecked::nochecked)
// .map(ThrowsChecked::withchecked); // [1]
.map(
tc -> {
try {
return withchecked(tc);
} catch (Checked e) {
throw new RuntimeException(e);
}
});
}
static void testCompletableFuture() {
CompletableFuture
.completedFuture(new ThrowsChecked())
.thenApply(ThrowsChecked::nochecked)
// .thenApply(ThrowsChecked::withchecked); // [2]
.thenApply(
tc -> {
try {
return withchecked(tc);
} catch (Checked e) {
throw new RuntimeException(e);
}
});
}
}
如果你试图像使用 nochecked()
那样使用withchecked()
的方法引用,编译器会在 [1]
和 [2]
中报错。相反,你必须写出 lambda 表达式 (或者编写一个不会抛出异常的包装器方法)。
死锁
由于任务可以被阻塞,因此一个任务有可能卡在等待另一个任务上,而后者又在等待别的任务,这样一直下去,知道这个链条上的任务又在等待第一个任务释放锁。这得到了一个任务之间相互等待的连续循环, 没有哪个线程能继续, 这称之为死锁[2]
如果你运行一个程序,而它马上就死锁了, 你可以立即跟踪下去。真正的问题在于,程序看起来工作良好, 但是具有潜在的死锁危险。这时, 死锁可能发生,而事先却没有任何征兆, 所以 bug
会潜伏在你的程序例,直到客户发现它出乎意料的发生(以一种几乎肯定是很难重现的方式发生)。因此在编写并发程序的时候,进行仔细的程序设计以防止死锁是关键部分。
埃德斯·迪克斯特拉(Essger Dijkstra
)发明的“哲学家进餐"问题是经典的死锁例证。基本描述指定了五位哲学家(此处显示的示例允许任何数目)。这些哲学家将花部分时间思考,花部分时间就餐。他们在思考的时候并不需要任何共享资源;但是他们使用的餐具数量有限。在最初的问题描述中,餐具是叉子,需要两个叉子才能从桌子中间的碗里取出意大利面。常见的版本是使用筷子, 显然,每个哲学家都需要两根筷子才能吃饭。
引入了一个困难:作为哲学家,他们的钱很少,所以他们只能买五根筷子(更一般地讲,筷子的数量与哲学家相同)。他们围在桌子周围,每人之间放一根筷子。 当一个哲学家要就餐时,该哲学家必须同时持有左边和右边的筷子。如果任一侧的哲学家都在使用所需的筷子,则我们的哲学家必须等待,直到可得到必须的筷子。
StickHolder 类通过将单根筷子保持在大小为 1 的 BlockingQueue 中来管理它。BlockingQueue 是一个设计用于在并发程序中安全使用的集合,如果你调用 take() 并且队列为空,则它将阻塞(等待)。将新元素放入队列后,将释放该块并返回该值:
// concurrent/StickHolder.java
import java.util.concurrent.*;
public class StickHolder {
private static class Chopstick {
}
private Chopstick stick = new Chopstick();
private BlockingQueue<Chopstick> holder =
new ArrayBlockingQueue<>(1);
public StickHolder() {
putDown();
}
public void pickUp() {
try {
holder.take(); // Blocks if unavailable
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public void putDown() {
try {
holder.put(stick);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
为简单起见,Chopstick
(static
) 实际上不是由 StickHolder
生产的,而是在其类中保持私有的。
如果您调用了pickUp()
,而 stick
不可用,那么pickUp()
将阻塞该 stick
,直到另一个哲学家调用putDown()
将 stick
返回。
注意,该类中的所有线程安全都是通过 BlockingQueue
实现的。
每个哲学家都是一项任务,他们试图把筷子分别 pickUp()
在左手和右手上,这样筷子才能吃东西,然后通过 putDown()
放下 stick
。
// concurrent/Philosopher.java
public class Philosopher implements Runnable {
private final int seat;
private final StickHolder left, right;
public Philosopher(int seat, StickHolder left, StickHolder right) {
this.seat = seat;
this.left = left;
this.right = right;
}
@Override
public String toString() {
return "P" + seat;
}
@Override
public void run() {
while (true) {
// System.out.println("Thinking"); // [1]
right.pickUp();
left.pickUp();
System.out.println(this + " eating");
right.putDown();
left.putDown();
}
}
}
没有两个哲学家可以同时成功调用 take() 同一只筷子。另外,如果一个哲学家已经拿过筷子,那么下一个试图拿起同一根筷子的哲学家将阻塞,等待其被释放。
结果是一个看似无辜的程序陷入了死锁。我在这里使用数组而不是集合,只是因为这种语法更简洁:
// concurrent/DiningPhilosophers.java
// Hidden deadlock
// {ExcludeFromGradle} Gradle has trouble
import java.util.*;
import java.util.concurrent.*;
import onjava.Nap;
public class DiningPhilosophers {
private StickHolder[] sticks;
private Philosopher[] philosophers;
public DiningPhilosophers(int n) {
sticks = new StickHolder[n];
Arrays.setAll(sticks, i -> new StickHolder());
philosophers = new Philosopher[n];
Arrays.setAll(philosophers, i ->
new Philosopher(i,
sticks[i], sticks[(i + 1) % n])); // [1]
// Fix by reversing stick order for this one:
// philosophers[1] = // [2]
// new Philosopher(0, sticks[0], sticks[1]);
Arrays.stream(philosophers)
.forEach(CompletableFuture::runAsync); // [3]
}
public static void main(String[] args) {
// Returns right away:
new DiningPhilosophers(5); // [4]
// Keeps main() from exiting:
new Nap(3, "Shutdown");
}
}
- 当你停止查看输出时,该程序将死锁。但是,根据你的计算机配置,你可能不会看到死锁。看来这取决于计算机上的内核数[3]。两个核心不会产生死锁,但两核以上却很容易产生死锁。
- 此行为使该示例更好地说明了死锁,因为你可能正在具有 2 核的计算机上编写程序(如果确实是导致问题的原因),并且确信该程序可以正常工作,只能启动它将其安装在另一台计算机上时出现死锁。请注意,不能因为你没或不容易看到死锁,这并不意味着此程序不会在 2 核机器上发生死锁。 该程序仍然有死锁倾向,只是很少发生——可以说是最糟糕的情况,因为问题不容易出现。
- 在
DiningPhilosophers
的构造方法中,每个哲学家都获得一个左右筷子的引用。除最后一个哲学家外,都是通过把哲学家放在下一双空闲筷子之间来初始化:- 最后一位哲学家得到了第 0 根筷子作为他的右筷子,所以圆桌就完成。
- 那是因为最后一位哲学家正坐在第一个哲学家的旁边,而且他们俩都共用零筷子。[1] 显示了以 n 为模数选择的右筷子,将最后一个哲学家绕到第一个哲学家的旁边。
- 现在,所有哲学家都可以尝试吃饭,每个哲学家都在旁边等待哲学家放下筷子。
- 为了让每个哲学家在[3] 上运行,调用
runAsync()
,这意味着 DiningPhilosophers 的构造函数立即返回到[4]。 - 如果没有任何东西阻止
main()
完成,程序就会退出,不会做太多事情。 -
Nap
对象阻止main()
退出,然后在三秒后强制退出 (假设/可能是) 死锁程序。 - 在给定的配置中,哲学家几乎不花时间思考。因此,他们在吃东西的时候都争着用筷子,而且往往很快就会陷入僵局。你可以改变这个:
- 为了让每个哲学家在[3] 上运行,调用
通过增加[4] 的值来添加更多哲学家。
在 Philosopher.java 中取消注释行[1]。
任一种方法都会减少死锁的可能性,这表明编写并发程序并认为它是安全的危险,因为它似乎“在我的机器上运行正常”。你可以轻松地说服自己该程序没有死锁,即使它不是。这个示例相当有趣,因为它演示了看起来可以正确运行,但实际上会可能发生死锁的程序。
要修正死锁问题,你必须明白,当以下四个条件同时满足时,就会发生死锁:
- 互斥条件。任务使用的资源中至少有一个不能共享的。 这里,一根筷子一次就只能被一个哲学家使用。
- 至少有一个任务它必须持有一个资源且正在等待获取一个被当前别的任务持有的资源。也就是说,要发生死锁,哲学家必须拿着一根筷子并且等待另一根。
- 资源不能被任务抢占, 任务必须把资源释放当作普通事件。哲学家很有礼貌,他们不会从其它哲学家那里抢筷子。
- 必须有循环等待, 这时,一个任务等待其它任务所持有的资源, 后者又在等待另一个任务所持有的资源, 这样一直下去,知道有一个任务在等待第一个任务所持有的资源, 使得大家都被锁住。 在
DiningPhilosophers.java
中, 因为每个哲学家都试图先得到右边的 筷子, 然后得到左边的 筷子, 所以发生了循环等待。
因为必须满足所有条件才能导致死锁,所以要阻止死锁的话,只需要破坏其中一个即可。在此程序中,防止死锁的一种简单方法是打破第四个条件。之所以会发生这种情况,是因为每个哲学家都尝试按照特定的顺序拾起自己的筷子:先右后左。因此,每个哲学家都有可能在等待左手的同时握住右手的筷子,从而导致循环等待状态。但是,如果其中一位哲学家尝试首先拿起左筷子,则该哲学家决不会阻止紧邻右方的哲学家拿起筷子,从而排除了循环等待。
在 DiningPhilosophers.java 中,取消注释[1] 和其后的一行。这将原来的哲学家[1] 替换为筷子颠倒的哲学家。通过确保第二位哲学家拾起并在右手之前放下左筷子,我们消除了死锁的可能性。
这只是解决问题的一种方法。你也可以通过防止其他情况之一来解决它。
没有语言支持可以帮助防止死锁;你有责任通过精心设计来避免这种情况。对于试图调试死锁程序的人来说,这些都不是安慰。当然,避免并发问题的最简单,最好的方法是永远不要共享资源-不幸的是,这并不总是可能的。
构造方法非线程安全
当你在脑子里想象一个对象构造的过程,你会很容易认为这个过程是线程安全的。毕竟,在对象初始化完成前对外不可见,所以又怎会对此产生争议呢?确实,Java 语言规范 (JLS) 自信满满地陈述道:“没必要使构造器的线程同步,因为它会锁定正在构造的对象,直到构造器完成初始化后才对其他线程可见。”
不幸的是,对象的构造过程如其他操作一样,也会受到共享内存并发问题的影响,只是作用机制可能更微妙罢了。
设想下使用一个 static 字段为每个对象自动创建唯一标识符的过程。为了测试其不同的实现过程,我们从一个接口开始。代码示例:
//concurrent/HasID.java
public interface HasID {
int getID();
}
然后 StaticIDField 类显式地实现该接口。代码示例:
// concurrent/StaticIDField.java
public class StaticIDField implements HasID {
private static int counter = 0;
private int id = counter++;
public int getID() { return id; }
}
如你所想,该类是个简单无害的类,它甚至都没一个显式的构造器来引发问题。当我们运行多个用于创建此类对象的线程时,究竟会发生什么?为了搞清楚这点,我们做了以下测试。代码示例:
// concurrent/IDChecker.java
import java.util.*;
import java.util.function.*;
import java.util.stream.*;
import java.util.concurrent.*;
import com.google.common.collect.Sets;
public class IDChecker {
public static final int SIZE = 100_000;
static class MakeObjects implements
Supplier<List<Integer>> {
private Supplier<HasID> gen;
MakeObjects(Supplier<HasID> gen) {
this.gen = gen;
}
@Override public List<Integer> get() {
return Stream.generate(gen)
.limit(SIZE)
.map(HasID::getID)
.collect(Collectors.toList());
}
}
public static void test(Supplier<HasID> gen) {
CompletableFuture<List<Integer>>
groupA = CompletableFuture.supplyAsync(new
MakeObjects(gen)),
groupB = CompletableFuture.supplyAsync(new
MakeObjects(gen));
groupA.thenAcceptBoth(groupB, (a, b) -> {
System.out.println(
Sets.intersection(
Sets.newHashSet(a),
Sets.newHashSet(b)).size());
}).join();
}
}
MakeObjects 类是一个生产者类,包含一个能够产生 List<Integer> 类型的列表对象的 get()
方法。通过从每个 HasID
对象提取 ID
并放入列表中来生成这个列表对象,而 test()
方法则创建了两个并行的 CompletableFuture 对象,用于运行 MakeObjects 生产者类,然后获取运行结果。
使用 Guava 库中的 **Sets.intersection()
方法,计算出这两个返回的 List<Integer> 对象中有多少相同的 ID
(使用谷歌 Guava 库里的方法比使用官方的 retainAll()
方法速度快得多)。
现在我们可以测试上面的 StaticIDField 类了。代码示例:
// concurrent/TestStaticIDField.java
public class TestStaticIDField {
public static void main(String[] args) {
IDChecker.test(StaticIDField::new);
}
}
输出结果:
13287
结果中出现了很多重复项。很显然,纯静态 int
用于构造过程并不是线程安全的。让我们使用 AtomicInteger 来使其变为线程安全的。代码示例:
// concurrent/GuardedIDField.java
import java.util.concurrent.atomic.*;
public class GuardedIDField implements HasID {
private static AtomicInteger counter = new
AtomicInteger();
private int id = counter.getAndIncrement();
public int getID() { return id; }
public static void main(String[] args) { IDChecker.test(GuardedIDField::new);
}
}
输出结果:
0
构造器有一种更微妙的状态共享方式:通过构造器参数:
// concurrent/SharedConstructorArgument.java
import java.util.concurrent.atomic.*;
interface SharedArg{
int get();
}
class Unsafe implements SharedArg{
private int i = 0;
public int get(){
return i++;
}
}
class Safe implements SharedArg{
private static AtomicInteger counter = new AtomicInteger();
public int get(){
return counter.getAndIncrement();
}
}
class SharedUser implements HasID{
private final int id;
SharedUser(SharedArg sa){
id = sa.get();
}
@Override
public int getID(){
return id;
}
}
public class SharedConstructorArgument{
public static void main(String[] args){
Unsafe unsafe = new Unsafe();
IDChecker.test(() -> new SharedUser(unsafe));
Safe safe = new Safe();
IDChecker.test(() -> new SharedUser(safe));
}
}
输出结果:
24838
0
在这里,SharedUser 构造器实际上共享了相同的参数。即使 SharedUser 以完全无害且合理的方式使用其自己的参数,其构造器的调用方式也会引起冲突。SharedUser 甚至不知道它是以这种方式调用的,更不必说控制它了。
同步构造器并不被 java 语言所支持,但是通过使用同步语块来创建你自己的同步构造器是可能的(请参阅附录:并发底层原理 ,来进一步了解同步关键字—— synchronized
)。尽管 JLS(java 语言规范)这样陈述道:“……它会锁定正在构造的对象”,但这并不是真的——构造器实际上只是一个静态方法,因此同步构造器实际上会锁定该类的 Class 对象。我们可以通过创建自己的静态对象并锁定它,来达到与同步构造器相同的效果:
// concurrent/SynchronizedConstructor.java
import java.util.concurrent.atomic.*;
class SyncConstructor implements HasID{
private final int id;
private static Object constructorLock =
new Object();
SyncConstructor(SharedArg sa){
synchronized (constructorLock){
id = sa.get();
}
}
@Override
public int getID(){
return id;
}
}
public class SynchronizedConstructor{
public static void main(String[] args){
Unsafe unsafe = new Unsafe();
IDChecker.test(() -> new SyncConstructor(unsafe));
}
}
输出结果:
0
Unsafe 类的共享使用现在就变得安全了。另一种方法是将构造器设为私有(因此可以防止继承),并提供一个静态 Factory 方法来生成新对象:
// concurrent/SynchronizedFactory.java
import java.util.concurrent.atomic.*;
final class SyncFactory implements HasID{
private final int id;
private SyncFactory(SharedArg sa){
id = sa.get();
}
@Override
public int getID(){
return id;
}
public static synchronized SyncFactory factory(SharedArg sa){
return new SyncFactory(sa);
}
}
public class SynchronizedFactory{
public static void main(String[] args){
Unsafe unsafe = new Unsafe();
IDChecker.test(() -> SyncFactory.factory(unsafe));
}
}
输出结果:
0
通过同步静态工厂方法,可以在构造过程中锁定 Class 对象。
这些示例充分表明了在并发 Java 程序中检测和管理共享状态有多困难。即使你采取“不共享任何内容”的策略,也很容易产生意外的共享事件。
复杂性和代价
假设你正在做披萨,我们把从整个流程的当前步骤到下一个步骤所需的工作量,在这里一一表示为枚举变量的一部分:
// concurrent/Pizza.java import java.util.function.*;
import onjava.Nap;
public class Pizza{
public enum Step{
DOUGH(4), ROLLED(1), SAUCED(1), CHEESED(2),
TOPPED(5), BAKED(2), SLICED(1), BOXED(0);
int effort;// Needed to get to the next step
Step(int effort){
this.effort = effort;
}
Step forward(){
if (equals(BOXED)) return BOXED;
new Nap(effort * 0.1);
return values()[ordinal() + 1];
}
}
private Step step = Step.DOUGH;
private final int id;
public Pizza(int id){
this.id = id;
}
public Pizza next(){
step = step.forward();
System.out.println("Pizza " + id + ": " + step);
return this;
}
public Pizza next(Step previousStep){
if (!step.equals(previousStep))
throw new IllegalStateException("Expected " +
previousStep + " but found " + step);
return next();
}
public Pizza roll(){
return next(Step.DOUGH);
}
public Pizza sauce(){
return next(Step.ROLLED);
}
public Pizza cheese(){
return next(Step.SAUCED);
}
public Pizza toppings(){
return next(Step.CHEESED);
}
public Pizza bake(){
return next(Step.TOPPED);
}
public Pizza slice(){
return next(Step.BAKED);
}
public Pizza box(){
return next(Step.SLICED);
}
public boolean complete(){
return step.equals(Step.BOXED);
}
@Override
public String toString(){
return "Pizza" + id + ": " + (step.equals(Step.BOXED) ? "complete" : step);
}
}
这只算得上是一个平凡的状态机,就像 Machina 类一样。
制作一个披萨,当披萨饼最终被放在盒子中时,就算完成最终任务了。 如果一个人在做一个披萨饼,那么所有步骤都是线性进行的,即一个接一个地进行:
// concurrent/OnePizza.java
import onjava.Timer;
public class OnePizza{
public static void main(String[] args){
Pizza za = new Pizza(0);
System.out.println(Timer.duration(() -> {
while (!za.complete()) za.next();
}));
}
}
输出结果:
Pizza 0: ROLLED
Pizza 0: SAUCED
Pizza 0: CHEESED
Pizza 0: TOPPED
Pizza 0: BAKED
Pizza 0: SLICED
Pizza 0: BOXED
1622
时间以毫秒为单位,加总所有步骤的工作量,会得出与我们的期望值相符的数字。 如果你以这种方式制作了五个披萨,那么你会认为它花费的时间是原来的五倍。 但是,如果这还不够快怎么办? 我们可以从尝试并行流方法开始:
// concurrent/PizzaStreams.java
// import java.util.*; import java.util.stream.*;
import onjava.Timer;
public class PizzaStreams{
static final int QUANTITY = 5;
public static void main(String[] args){
Timer timer = new Timer();
IntStream.range(0, QUANTITY)
.mapToObj(Pizza::new)
.parallel()//[1]
.forEach(za -> { while(!za.complete()) za.next(); }); System.out.println(timer.duration());
}
}
输出结果:
Pizza 2: ROLLED
Pizza 0: ROLLED
Pizza 1: ROLLED
Pizza 4: ROLLED
Pizza 3:ROLLED
Pizza 2:SAUCED
Pizza 1:SAUCED
Pizza 0:SAUCED
Pizza 4:SAUCED
Pizza 3:SAUCED
Pizza 2:CHEESED
Pizza 1:CHEESED
Pizza 0:CHEESED
Pizza 4:CHEESED
Pizza 3:CHEESED
Pizza 2:TOPPED
Pizza 1:TOPPED
Pizza 0:TOPPED
Pizza 4:TOPPED
Pizza 3:TOPPED
Pizza 2:BAKED
Pizza 1:BAKED
Pizza 0:BAKED
Pizza 4:BAKED
Pizza 3:BAKED
Pizza 2:SLICED
Pizza 1:SLICED
Pizza 0:SLICED
Pizza 4:SLICED
Pizza 3:SLICED
Pizza 2:BOXED
Pizza 1:BOXED
Pizza 0:BOXED
Pizza 4:BOXED
Pizza 3:BOXED
1739
现在,我们制作五个披萨的时间与制作单个披萨的时间就差不多了。 尝试删除标记为[1] 的行后,你会发现它花费的时间是原来的五倍。 你还可以尝试将 QUANTITY 更改为 4、8、10、16 和 17,看看会有什么不同,并猜猜看为什么会这样。
PizzaStreams 类产生的每个并行流在它的forEach()
内完成所有工作,如果我们将其各个步骤用映射的方式一步一步处理,情况会有所不同吗?
// concurrent/PizzaParallelSteps.java
import java.util.*;
import java.util.stream.*;
import onjava.Timer;
public class PizzaParallelSteps{
static final int QUANTITY = 5;
public static void main(String[] args){
Timer timer = new Timer();
IntStream.range(0, QUANTITY)
.mapToObj(Pizza::new)
.parallel()
.map(Pizza::roll)
.map(Pizza::sauce)
.map(Pizza::cheese)
.map(Pizza::toppings)
.map(Pizza::bake)
.map(Pizza::slice)
.map(Pizza::box)
.forEach(za -> System.out.println(za));
System.out.println(timer.duration());
}
}
输出结果:
Pizza 2: ROLLED
Pizza 0: ROLLED
Pizza 1: ROLLED
Pizza 4: ROLLED
Pizza 3: ROLLED
Pizza 1: SAUCED
Pizza 0: SAUCED
Pizza 2: SAUCED
Pizza 3: SAUCED
Pizza 4: SAUCED
Pizza 1: CHEESED
Pizza 0: CHEESED
Pizza 2: CHEESED
Pizza 3: CHEESED
Pizza 4: CHEESED
Pizza 0: TOPPED
Pizza 2: TOPPED
Pizza 1: TOPPED
Pizza 3: TOPPED
Pizza 4: TOPPED
Pizza 1: BAKED
Pizza 2: BAKED
Pizza 0: BAKED
Pizza 4: BAKED
Pizza 3: BAKED
Pizza 0: SLICED
Pizza 2: SLICED
Pizza 1: SLICED
Pizza 3: SLICED
Pizza 4: SLICED
Pizza 1: BOXED
Pizza1: complete
Pizza 2: BOXED
Pizza 0: BOXED
Pizza2: complete
Pizza0: complete
Pizza 3: BOXED
Pizza 4: BOXED
Pizza4: complete
Pizza3: complete
1738
答案是“否”,事后看来这并不奇怪,因为每个披萨都需要按顺序执行步骤。因此,没法通过分步执行操作来进一步提高速度,就像上文的 PizzaParallelSteps.java
里面展示的一样。
我们可以使用 CompletableFutures 重写这个例子:
// concurrent/CompletablePizza.java
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;
import onjava.Timer;
public class CompletablePizza{
static final int QUANTITY = 5;
public static CompletableFuture<Pizza> makeCF(Pizza za){
return CompletableFuture
.completedFuture(za)
.thenApplyAsync(Pizza::roll)
.thenApplyAsync(Pizza::sauce)
.thenApplyAsync(Pizza::cheese)
.thenApplyAsync(Pizza::toppings)
.thenApplyAsync(Pizza::bake)
.thenApplyAsync(Pizza::slice)
.thenApplyAsync(Pizza::box);
}
public static void show(CompletableFuture<Pizza> cf){
try{
System.out.println(cf.get());
} catch (Exception e){
throw new RuntimeException(e);
}
}
public static void main(String[] args){
Timer timer = new Timer();
List<CompletableFuture<Pizza>> pizzas =
IntStream.range(0, QUANTITY)
.mapToObj(Pizza::new)
.map(CompletablePizza::makeCF)
.collect(Collectors.toList());
System.out.println(timer.duration());
pizzas.forEach(CompletablePizza::show);
System.out.println(timer.duration());
}
}
输出结果:
169
Pizza 0: ROLLED
Pizza 1: ROLLED
Pizza 2: ROLLED
Pizza 4: ROLLED
Pizza 3: ROLLED
Pizza 1: SAUCED
Pizza 0: SAUCED
Pizza 2: SAUCED
Pizza 4: SAUCED
Pizza 3: SAUCED
Pizza 0: CHEESED
Pizza 4: CHEESED
Pizza 1: CHEESED
Pizza 2: CHEESED
Pizza 3: CHEESED
Pizza 0: TOPPED
Pizza 4: TOPPED
Pizza 1: TOPPED
Pizza 2: TOPPED
Pizza 3: TOPPED
Pizza 0: BAKED
Pizza 4: BAKED
Pizza 1: BAKED
Pizza 3: BAKED
Pizza 2: BAKED
Pizza 0: SLICED
Pizza 4: SLICED
Pizza 1: SLICED
Pizza 3: SLICED
Pizza 2: SLICED
Pizza 4: BOXED
Pizza 0: BOXED
Pizza0: complete
Pizza 1: BOXED
Pizza1: complete
Pizza 3: BOXED
Pizza 2: BOXED
Pizza2: complete
Pizza3: complete
Pizza4: complete
1797
并行流和 CompletableFutures 是 Java 并发工具箱中最先进发达的技术。 你应该始终首先选择其中之一。 当一个问题很容易并行处理时,或者说,很容易把数据分解成相同的、易于处理的各个部分时,使用并行流方法处理最为合适(而如果你决定不借助它而由自己完成,你就必须撸起袖子,深入研究 Spliterator 的文档)。
而当工作的各个部分内容各不相同时,使用 CompletableFutures 是最好的选择。比起面向数据,CompletableFutures 更像是面向任务的。
对于披萨问题,结果似乎也没有什么不同。实际上,并行流方法看起来更简洁,仅出于这个原因,我认为并行流作为解决问题的首次尝试方法更具吸引力。
由于制作披萨总需要一定的时间,无论你使用哪种并发方法,你能做到的最好情况,是在制作一个披萨的相同时间内制作 n 个披萨。 在这里当然很容易看出来,但是当你处理更复杂的问题时,你就可能忘记这一点。 通常,在项目开始时进行粗略的计算,就能很快弄清楚最大可能的并行吞吐量,这可以防止你因为采取无用的加快运行速度的举措而忙得团团转。
使用 CompletableFutures 或许可以轻易地带来重大收益,但是在尝试更进一步时需要倍加小心,因为额外增加的成本和工作量会非常容易远远超出你之前拼命挤出的那一点点收益。
本章小结
需要并发的唯一理由是“等待太多”。这也可以包括用户界面的响应速度,但是由于 Java 用于构建用户界面时并不高效,因此[4] 这仅仅意味着“你的程序运行速度还不够快”。
如果并发很容易,则没有理由拒绝并发。 正因为并发实际上很难,所以你应该仔细考虑是否值得为此付出努力,并考虑你能否以其他方式提升速度。
例如,迁移到更快的硬件(这可能比消耗程序员的时间要便宜得多)或者将程序分解成多个部分,然后在不同的机器上运行这些部分。
奥卡姆剃刀是一个经常被误解的原则。 我看过至少一部电影,他们将其定义为”最简单的解决方案是正确的解决方案“,就好像这是某种毋庸置疑的法律。实际上,这是一个准则:面对多种方法时,请先尝试需要最少假设的方法。 在编程世界中,这已演变为“尝试可能可行的最简单的方法”。当你了解了特定工具的知识时——就像你现在了解了有关并发性的知识一样,你可能会很想使用它,或者提前规定你的解决方案必须能够“速度飞快”,从而来证明从一开始就进行并发设计是合理的。但是,我们的奥卡姆剃刀编程版本表示你应该首先尝试最简单的方法(这种方法开发起来也更便宜),然后看看它是否足够好。
由于我出身于底层学术背景(物理学和计算机工程),所以我很容易想到所有小轮子转动的成本。我确定使用最简单的方法不够快的场景出现的次数已经数不过来了,但是尝试后却发现它实际上绰绰有余。
缺点
并发编程的主要缺点是:
在线程等待共享资源时会降低速度。
线程管理产生额外 CPU 开销。
糟糕的设计决策带来无法弥补的复杂性。
诸如饥饿,竞速,死锁和活锁(多线程各自处理单个任务而整体却无法完成)之类的问题。
跨平台的不一致。 通过一些示例,我发现了某些计算机上很快出现的竞争状况,而在其他计算机上却没有。 如果你在后者上开发程序,则在分发程序时可能会感到非常惊讶。
另外,并发的应用是一门艺术。 Java 旨在允许你创建尽可能多的所需要的对象来解决问题——至少在理论上是这样。[5] 但是,线程不是典型的对象:每个线程都有其自己的执行环境,包括堆栈和其他必要的元素,使其比普通对象大得多。 在大多数环境中,只能在内存用光之前创建数千个 Thread 对象。通常,你只需要几个线程即可解决问题,因此一般来说创建线程没有什么限制,但是对于某些设计而言,它会成为一种约束,可能迫使你使用完全不同的方案。
共享内存陷阱
并发性的主要困难之一是因为可能有多个任务共享一个资源(例如对象中的内存),并且你必须确保多个任务不会同时读取和更改该资源。
我花了多年的时间研究并发。 我了解到你永远无法相信使用共享内存并发的程序可以正常工作。 你可以轻易发现它是错误的,但永远无法证明它是正确的。 这是众所周知的并发原则之一。[6]
我遇到了许多人,他们对编写正确的线程程序的能力充满信心。 我偶尔开始认为我也可以做好。 对于一个特定的程序,我最初是在只有单个 CPU 的机器上编写的。 那时我能够说服自己该程序是正确的,因为我以为我对 Java 工具很了解。 而且在我的单 CPU 计算机上也没有失败。而到了具有多个 CPU 的计算机,程序出现问题不能运行后,我感到很惊讶,但这还只是众多问题中的一个而已。 这不是 Java 的错; “写一次,到处运行”,在单核与多核计算机间无法扩展到并发编程领域。这是并发编程的基本问题。 实际上你可以在单 CPU 机器上发现一些并发问题,但是在多线程实际上真的在并行运行的多 CPU 机器上,就会出现一些其他问题。
再举一个例子,哲学家就餐的问题可以很容易地进行调整,因此几乎不会产生死锁,这会给你一种一切都棒极了的印象。当涉及到共享内存并发编程时,你永远不应该对自己的编程能力变得过于自信。
This Albatross is Big
如果你对 Java 并发感到不知所措,那说明你身处在一家出色的公司里。你可以访问 Thread 类的Javadoc 页面, 看一下哪些方法现在是 Deprecated (废弃的)。这些是 Java 语言设计者犯过错的地方,因为他们在设计语言时对并发性了解不足。
事实证明,在 Java 的后续版本中添加的许多库解决方案都是无效的,甚至是无用的。 幸运的是,Java 8 中的并行 Streams 和 CompletableFutures 都非常有价值。但是当你使用旧代码时,仍然会遇到旧的解决方案。
在本书的其他地方,我谈到了 Java 的一个基本问题:每个失败的实验都永远嵌入在语言或库中。 Java 并发强调了这个问题。尽管有不少错误,但错误并不是那么多,因为有很多不同的尝试方法来解决问题。 好的方面是,这些尝试产生了更好,更简单的设计。 不利之处在于,在找到好的方法之前,你很容易迷失于旧的设计中。
其他类库
本章重点介绍了相对安全易用的并行工具流和 CompletableFutures ,并且仅涉及 Java 标准库中一些更细粒度的工具。 为避免你不知所措,我没有介绍你可能实际在实践中使用的某些库。我们使用了几个 Atomic (原子)类,ConcurrentLinkedDeque ,ExecutorService 和 ArrayBlockingQueue 。附录:并发底层原理 涵盖了其他一些内容,但是你还想探索 java.util.concurrent 的 Javadocs。 但是要小心,因为某些库组件已被新的更好的组件所取代。
考虑为并发设计的语言
通常,请谨慎地使用并发。 如果需要使用它,请尝试使用最现代的方法:并行流或 CompletableFutures 。 这些功能旨在(假设你不尝试共享内存)使你摆脱麻烦(在 Java 的世界范围内)。
如果你的并发问题变得比高级 Java 构造所支持的问题更大且更复杂,请考虑使用专为并发设计的语言,仅在需要并发的程序部分中使用这种语言是有可能的。 在撰写本文时,JVM 上最纯粹的功能语言是 Clojure(Lisp 的一种版本)和 Frege(Haskell 的一种实现)。这些使你可以在其中编写应用程序的并发部分语言,并通过 JVM 轻松地与你的主要 Java 代码进行交互。 或者,你可以选择更复杂的方法,即通过外部功能接口(FFI)将 JVM 之外的语言与另一种为并发设计的语言进行通信。[7]
你很容易被一种语言绑定,迫使自己尝试使用该语言来做所有事情。 一个常见的示例是构建 HTML / JavaScript 用户界面。 这些工具确实很难使用,令人讨厌,并且有许多库允许你通过使用自己喜欢的语言编写代码来生成这些工具(例如,Scala.js 允许你在 Scala 中完成代码)。
心理上的便利是一个合理的考虑因素。 但是,我希望我在本章(以及附录:并发底层原理 )中已经表明 Java 并发是一个你可能无法逃离很深的洞。 与 Java 语言的任何其他部分相比,在视觉上检查代码同时记住所有陷阱所需要的的知识要困难得多。
无论使用特定的语言、库使得并发看起来多么简单,都要将其视为一种妖术,因为总是有东西会在你最不期望出现的时候咬你。
拓展阅读
《Java Concurrency in Practice》,出自 Brian Goetz,Tim Peierls, Joshua Bloch,Joseph Bowbeer,David Holmes 和 Doug Lea (Addison Wesley,2006 年)——这些基本上就是 Java 并发世界中的名人名单了《Java Concurrency in Practice》第二版,出自 Doug Lea (Addison-Wesley,2000 年)。尽管这本书出版时间远远早于 Java 5 发布,但 Doug 的大部分工作都写入了 java.util.concurrent 库。因此,这本书对于全面理解并发问题至关重要。 它超越了 Java,讨论了跨语言和技术的并发编程。 尽管它在某些地方可能很钝,但值得多次重读(最好是在两个月之间进行消化)。 道格(Doug)是世界上为数不多的真正了解并发编程的人之一,因此这是值得的。
-
不,永远不会有纯粹的功能性 Java。我们所能期望的最好的是一种在 JVM 上运行的全新语言。 ↩
-
当两个任务能够更改其状态以使它们不会被阻止但它们从未取得任何有用的进展时,你也可以使用活动锁。 ↩
-
而不是超线程;通常每个内核有两个超线程,并且在询问内核数量时,本书所使用的 Java 版本会报告超线程的数量。超线程产生了更快的上下文切换,但是只有实际的内核才真的工作,而不是超线程。 ↩ ↩
-
库就在那里用于调用,而语言本身就被设计用于此目的,但实际上它很少发生,以至于可以说”没有“。↩ ↩
-
举例来说,如果没有 Flyweight 设计模式,在工程中创建数百万个对象用于有限元分析可能在 Java 中不可行。↩ ↩
-
在科学中,虽然从来没有一种理论被证实过,但是一种理论必须是可证伪的才有意义。而对于并发性,我们大部分时间甚至都无法得到这种可证伪性。↩ ↩
-
尽管 Go 语言显示了 FFI 的前景,但在撰写本文时,它并未提供跨所有平台的解决方案。 ↩