什么是Fork/Join框架
Fork/Join框架是一组允许程序员利用多核处理器支持的并行执行的API。它使用了“分而治之”策略:把非常大的问题分成更小的部分,反过来,小部分又可以进一步分成更小的部分,递归地直到一个部分可以直接解决。这被叫做“fork”。
然后所有部件在多个处理核心上并行执行。每个部分的结果被“join”在一起以产生最终结果。因此,框架的名称是“Fork/Join”。
下面的为代码展示了分治策略如何与Fork/Join框架一起工作:
if (problemSize < threshold)
solve problem directly
else {
break problem into subproblems
recursively solve each problem
combine the results
}
Fork/Join框架在JDk7中被加入,并在JDK8中进行了改进。它用了Java语言中的几个新特性,包括并行的Stream API和排序。
Fork/Join框架简化了并行程序的原因有:
- 它简化了线程的创建,在框架中线程是自动被创建和管理。
- 它自动使用多个处理器,因此程序可以扩展到使用可用处理器。
由于支持真正的并行执行,Fork/Join框架可以显著减少计算时间,并提高解决图像处理、视频处理、大数据处理等非常大问题的性能。
关于Fork/Join框架的一个有趣的地方是:它使用工作窃取算法来平衡线程之间的负载:如果一个工作线程没有事情要做,它可以从其他仍然忙碌的线程窃取任务。
理解Fork/Join框架API
Fork/Join框架在java.util.concurrent
包下被实现。它的核心有4个类:
-
ForkJoinTask<V>: 这是一个抽象任务类,并且运行在
ForkJoinPool
中。 -
ForkJoinPool:这是一个线程池管理并运行众多
ForkJoinTask
任务。 -
RecursiveAction:
ForkJoinTask
的子类,这个类没有返回值。 -
RecursiveTask<V>:
ForkJoinTask
的子类,有返回值。
基本上,我们解决问题的代码是在RecursiveAction
或者RecursiveTask
中进行的,然后将任务提交由ForkJoinPool`执行,ForkJoinPool处理从线程管理到多核处理器的利用等各种事务。
我们先来理解一下这些类中的关键方法。
ForkJoinTask<V>
这是一个运行在ForkJoinPool
中的抽象的任务类。类型V
指定了任务的返回结果。ForkJoinTask是一个类似线程的实体,它表示任务的轻量级抽象,而不是实际的执行线程。该机制允许由ForkJoinPool中的少量实际线程管理大量任务。其关键方法是:
- final ForkJoinTask<V> fork()
- final V join()
- final V invoke()
fork()
方法提交并执行异步任务,该方法返回ForkJoinTask
并且调用线程继续运行。
join()
方法等待任务直到返回结果。
invoke()
方法是组合了fork()
和join()
,它开始一个任务并等待结束返回结果。
此外,ForkJoinTask
中还提供了用于一次调用多个任务的两个静态方法
- static void invokeAll(ForkJoinTask<?> task1, ForkJoinTask<?> task2) :执行两个任务
- static void invokeAll(ForkJoinTask<?>… taskList):执行任务集合
RecursiveAction
这是一个递归的ForkJoinTask
子类,不返回结果。Recursive
意思是任务可以通过分治策略分成自己的子任务(在下面的下一节中,您将看到如何划分代码示例)。
我们必须重写compute()
方法,并将计算代码写在其中:
protected abstract void compute();
RecursiveTask<V>
和RecursiveAction
一样,但是RecursiveTask
有返回结果,结果类型由V
指定。我们仍然需要重写compute()
方法:
protected abstract V compute();
ForkJoinPool
这是Fork/Join框架的核心类。它负责线程的管理和ForkJoinTask
的执行,为了执行ForkJoinTask
,首先需要获取到ForkJoinPool
的实例。
有两种构造器方式可以获取ForkJoinPool
的实例,第一种使用构造器创建:
- ForkJoinPool(): 使用默认的构造器创建实例,该构造器创建出的池与系统中可用的处理器数量相等。
- ForkJoinPool(int parallelism):该构造器指定处理器数量,创建具有自定义并行度级别的池,该级别的并行度必须大于0,且不超过可用处理器的实际数量。
并行性的级别决定了可以并发执行的线程的数量。换句话说,它决定了可以同时执行的任务的数量——但不能超过处理器的数量。
但是,这并不限制池可以管理的任务的数量。ForkJoinPool可以管理比其并行级别多得多的任务。
获取ForkJoinPool实例的第二种方法是使用以下ForkJoinPool的静态方法获取公共池实例:
public static ForkJoinPool commonPool();
这种方式创建的池不受shutdown()
或者shutdownNow()
方法的影响,但是他会在System.exit()
时会自动中止。任何依赖异步任务处理的程序在主体程序中止前都应该调用awaitQuiescence()
方法。该方式是静态的,可以自动被使用。
在ForkJoinPool中执行ForkJoinTasks
在创建好ForkJoinPool实例之后,可以使用下面的方法执行任务:
- <T>T invoke(ForkJoinTask<T> task):执行指定任务并返回结果,该方法是异步的,调用的线程会一直等待直到该方法返回结果,对于RecursiveAction任务来说,参数类型是Void.
- void execute(ForkJoinTask<?> task):异步执行指定的任务,调用的线程一直等待知道任务完成才会继续执行。
另外,也可以通过ForkJoinTask自己拥有的方法fork()
和invoke()
执行任务。在这种情况下,如果任务还没在ForkJoinPool中运行,那么commonPool()
将会自动被使用。
值得注意的一点是:ForkJoinPool使用的是守护线程,当所有的用户线程被终止是它也会被终止,这意味着可以不必显示的关闭ForkPoolJoin(虽然这样也可以)。如果是common pool的情况下,调用shutdown
没有任何效果,应为这个池总是可用的。
好了,现在来看看一些例子。
案例
使用RecursiveAction
这里例子中,看一下如果使用Fork/Join框架去执行一个没有返回值的任务。
假设要对一个很大的数字数组进行变换,为了简单简单起见,转换只需要将数组中的每个元素乘以指定的数字。下面的代码用于转换任务:
import java.util.concurrent.*;
public class ArrayTransform extends RecursiveAction {
int[] array;
int number;
int threshold = 100_000;
int start;
int end;
public ArrayTransform(int[] array, int number, int start, int end) {
this.array = array;
this.number = number;
this.start = start;
this.end = end;
}
@Override
protected void compute() {
if (end - start < threshold) {
computeDirectly();
} else {
int middle = (end + start) / 2;
ArrayTransform subTask1 = new ArrayTransform(array, number, start, middle);
ArrayTransform subTask2 = new ArrayTransform(array, number, middle, end);
invokeAll(subTask1, subTask2);
}
}
protected void computeDirectly() {
for (int i = start; i < end; i++) {
array[i] = array[i] * number;
}
}
}
可以看到,这是一个RecursiveAction的子类,我们重写了compute()
方法。
数组和数字从它的构造函数传递。参数start和end指定要处理的数组中的元素的范围。如果数组的大小大于阈值,这有助于将数组拆分为子数组,否则直接对整个数组执行计算。
观察else中的代码片段:
protected void compute() {
if (end - start < threshold) {
computeDirectly();
} else {
int middle = (end + start) / 2;
ArrayTransform subTask1 = new ArrayTransform(array, number, start, middle);
ArrayTransform subTask2 = new ArrayTransform(array, number, middle, end);
invokeAll(subTask1, subTask2);
}
}
这里,将数组分成两个部分,并分别创建他们的子任务,反过来,子任务也可以递归的进一步划分为更小的子任务,直到其大小小于直接调用computeDirectly();
方法的的阈值。
然后,在main函数中创建ForkJoinPool执行任务:
ArrayTransform mainTask = new ArrayTransform(array, number, 0, SIZE);
ForkJoinPool pool = new ForkJoinPool();
pool.invoke(mainTask);
或者使用common pool执行任务:
ArrayTransform mainTask = new ArrayTransform(array, number, 0, SIZE);
mainTask.invoke();
这里是全部的测试程序:
import java.util.*;
import java.util.concurrent.*;
public class ForkJoinRecursiveActionTest {
static final int SIZE = 10_000_000;
static int[] array = randomArray();
public static void main(String[] args) {
int number = 9;
System.out.println("数组中的初始元素: ");
print();
ArrayTransform mainTask = new ArrayTransform(array, number, 0, SIZE);
ForkJoinPool pool = new ForkJoinPool();
pool.invoke(mainTask);
System.out.println("并行计算之后的元素:");
print();
}
static int[] randomArray() {
int[] array = new int[SIZE];
Random random = new Random();
for (int i = 0; i < SIZE; i++) {
array[i] = random.nextInt(100);
}
return array;
}
static void print() {
for (int i = 0; i < 10; i++) {
System.out.print(array[i] + ", ");
}
System.out.println();
}
}
如您所见,使用随机生成的1,000万个元素数组进行测试。由于数组太大,我们在计算前后只打印前10个元素,看效果如何:
数组中的初始元素:
42, 98, 43, 14, 9, 92, 33, 18, 18, 76,
并行计算之后的元素:
378, 882, 387, 126, 81, 828, 297, 162, 162, 684,
使用RecursiveTask
这个例子中,展示了如何使用带有返回值的任务,下面的任务计算在一个大数组中出现偶数的次数:
import java.util.concurrent.*;
public class ArrayCounter extends RecursiveTask<Integer> {
int[] array;
int threshold = 100_000;
int start;
int end;
public ArrayCounter(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
protected Integer compute() {
if (end - start < threshold) {
return computeDirectly();
} else {
int middle = (end + start) / 2;
ArrayCounter subTask1 = new ArrayCounter(array, start, middle);
ArrayCounter subTask2 = new ArrayCounter(array, middle, end);
invokeAll(subTask1, subTask2);
return subTask1.join() + subTask2.join();
}
}
protected Integer computeDirectly() {
Integer count = 0;
for (int i = start; i < end; i++) {
if (array[i] % 2 == 0) {
count++;
}
}
return count;
}
}
如你所见,这个类是RecursiveTask的子类并且重写了compute()
方法,并且返回了一个整型的结果。
这里还使用了join()
方法去合并子任务的结果:
return subTask1.join() + subTask2.join();
测试程序就和RecursiveAction的一样:
import java.util.*;
import java.util.concurrent.*;
public class ForkJoinRecursiveTaskTest {
static final int SIZE = 10_000_000;
static int[] array = randomArray();
public static void main(String[] args) {
ArrayCounter mainTask = new ArrayCounter(array, 0, SIZE);
ForkJoinPool pool = new ForkJoinPool();
Integer evenNumberCount = pool.invoke(mainTask);
System.out.println("偶数的个数: " + evenNumberCount);
}
static int[] randomArray() {
int[] array = new int[SIZE];
Random random = new Random();
for (int i = 0; i < SIZE; i++) {
array[i] = random.nextInt(100);
}
return array;
}
}
运行程序就会看到如下的结果:
偶数的个数: 5000045
并行性试验
这个例子展示并行性的级别如何影响计算时间:
ArrayCounter
类让阈值可以通过构造器传入:
import java.util.concurrent.*;
public class ArrayCounter extends RecursiveTask<Integer> {
int[] array;
int threshold;
int start;
int end;
public ArrayCounter(int[] array, int start, int end, int threshold) {
this.array = array;
this.start = start;
this.end = end;
this.threshold = threshold;
}
protected Integer compute() {
if (end - start < threshold) {
return computeDirectly();
} else {
int middle = (end + start) / 2;
ArrayCounter subTask1 = new ArrayCounter(array, start, middle, threshold);
ArrayCounter subTask2 = new ArrayCounter(array, middle, end, threshold);
invokeAll(subTask1, subTask2);
return subTask1.join() + subTask2.join();
}
}
protected Integer computeDirectly() {
Integer count = 0;
for (int i = start; i < end; i++) {
if (array[i] % 2 == 0) {
count++;
}
}
return count;
}
}
测试程序将并行度级别和阈值作为参数传递:
import java.util.*;
import java.util.concurrent.*;
public class ParallelismTest {
static final int SIZE = 10_000_000;
static int[] array = randomArray();
public static void main(String[] args) {
int threshold = Integer.parseInt(args[0]);
int parallelism = Integer.parseInt(args[1]);
long startTime = System.currentTimeMillis();
ArrayCounter mainTask = new ArrayCounter(array, 0, SIZE, threshold);
ForkJoinPool pool = new ForkJoinPool(parallelism);
Integer evenNumberCount = pool.invoke(mainTask);
long endTime = System.currentTimeMillis();
System.out.println("偶数的个数: " + evenNumberCount);
long time = (endTime - startTime);
System.out.println("执行时间: " + time + " ms");
}
static int[] randomArray() {
int[] array = new int[SIZE];
Random random = new Random();
for (int i = 0; i < SIZE; i++) {
array[i] = random.nextInt(100);
}
return array;
}
}
该程序允许您使用不同的并行度和阈值轻松测试性能。注意,它在最后打印执行时间。尝试用不同的参数多次运行这个程序,并观察执行时间。
结论
- Fork/Join框架的设计简化了java语言的并行程序
-
ForkJoinPool
是Fork/Join框架的核心,它允许多个ForkJoinTask
请求由少量实际线程执行,每个线程运行在单独的处理核心上 - 既可以通过构造器也可以通过静态方法common pool去获取ForkJoinPool的实例
- ForkJoinTask是一个抽象类,它表示的任务比普通线程更轻。通过覆盖其compute()方法实现计算逻辑
- RecursiveAction是一个没有返回值的ForkJoinTask
- RecursiveTask是一个有返回值的ForkJoinTask
- ForkJoinPool与其它池的不同之处在于,它使用了工作窃取算法,该算法允许一个线程完成了可以做的事情,从仍然繁忙的其他线程窃取任务
- ForkJoinPool中的线程是守护线程,不必显式地关闭池
- 执行一个ForkJoinTask既可以通过调用它自己的
invoke()
或fork()
方法,也可以提交任务给ForkJoinPool并调用它的invoke()
或者execute()
方法 - 直接使用ForkJoinTask自身的方法执行任务,如果它还没运行在
ForkJoinPool
中那么将运行在common pool中 - 在
ForkJoinTask
中使用join()
方法,可以合并子任务的结果 -
invoke()
方法会等待子任务完成,但是execute()
方法不会