1 Future介绍
1.1 Future的主要功能
JDK5新增了Future接口,用于描述一个异步计算的结果。
Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果等操作。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。
Future类位于java.util.concurrent包下,它是一个接口:
public interface Future<V> {
/**
* 方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。 *
* @param mayInterruptIfRunning 表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。
* @return 如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;
* 如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;
* 如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
* 方法表示任务是否被取消成功
* @return 如果在任务正常完成前被取消成功,则返回 true
*/
boolean isCancelled();
/**
* 方法表示任务是否已经完成
* @return 若任务完成,则返回true
*/
boolean isDone();
/**
* 方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回
* @return 任务执行的结果值
* @throws InterruptedException 线程被中断异常
* @throws ExecutionException 任务执行异常,如果任务被取消,还会抛出CancellationException
*/
V get() throws InterruptedException, ExecutionException;
/**
* 用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null(并不是抛出异常,需要注意)。
* @param timeout 超时时间
* @param unit 超时单位
* @return
* @throws InterruptedException
* @throws ExecutionException
* @throws TimeoutException 如果计算超时,将抛出TimeoutException(待确认)
*/
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
从上面方法的注释可以看出,Futrue提供了三种功能:
1)判断任务是否完成;
2)能够中断任务;
3)能够获取任务执行结果。(最为常用的)
1.2 Future的局限性
从本质上说,Future表示一个异步计算的结果。它提供了isDone()来检测计算是否已经完成,并且在计算结束后,可以通过get()方法来获取计算结果。在异步计算中,Future确实是个非常优秀的接口。但是,它的本身也确实存在着许多限制:
- 并发执行多任务:Future只提供了get()方法来获取结果,并且是阻塞的。所以,除了等待你别无他法;当 for 循环批量获取 Future 的结果时容易 block,因此get 方法调用时应使用 timeout 限制。
- 无法对多个任务进行链式调用:如果你希望在计算任务完成后执行特定动作,比如发邮件,但Future却没有提供这样的能力;
- 无法组合多个任务:如果你运行了10个任务,并期望在它们全部执行结束后执行特定动作,那么在Future中这是无能为力的;
- 没有异常处理:Future接口中没有关于异常处理的方法;
2 CompletableFuture介绍
虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。如果遇到前面的task执行较慢时需要阻塞等待前面的task执行完后面task才能取得结果。
阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果。而CompletableFuture的主要功能就是一边生成任务,一边获取任务的返回值。让两件事分开执行,任务之间不会互相阻塞,可以实现先执行完的先取结果,不再依赖任务顺序了。
2.1 CompletableFuture原理
内部通过阻塞队列+FutureTask,实现了任务先完成可优先获取到,即结果按照完成先后顺序排序,内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future,通过调用它的take方法或poll方法可以获取到一个已经执行完成的Future,进而通过调用Future接口实现类的get方法获取最终的结果。
2.2 应用场景
当需要批量提交异步任务的时候建议使用CompletableFuture。CompletableFuture将线程池Executor和阻塞队列BlockingQueue的功能融合在了一起,能够让批量异步任务的管理更简单。
CompletableFuture能够让异步任务的执行结果有序化。先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待。
线程池隔离。CompletionService支持自己创建线程池,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。
2.3 CompletableFuture使用详解
简单的任务,用Future获取结果还好,但我们并行提交的多个异步任务,往往并不是独立的,很多时候业务逻辑处理存在串行[依赖]、并行、聚合的关系。如果要我们手动用 Fueture 实现,是非常麻烦的。
CompletableFuture是Future接口的扩展和增强。CompletableFuture实现了Future接口,并在此基础上进行了丰富地扩展,完美地弥补了Future上述的种种问题。
更为重要的是,CompletableFuture实现了对任务的编排能力。借助这项能力,我们可以轻松地组织不同任务的运行顺序、规则以及方式。从某种程度上说,这项能力是它的核心能力。而在以往,虽然通过CountDownLatch等工具类也可以实现任务的编排,但需要复杂的逻辑处理,不仅耗费精力且难以维护。
3 CompletableFuture应用梳理
很多方法上,可以指定线程池,而没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。
默认情况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ForkJoinPool 线程池的线程数)。
如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议要根据不同的业务类型创建不同的线程池,以避免互相干扰。
-
等我们使用的时候,会注意到
CompletableFuture
的方法命名规则:-
xxx()
:表示该方法将继续在已有的线程中执行;
-
xxxAsync()
:表示可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。
4 使用案例
4.1 基础使用案例
串行执行:
定义两个CompletableFuture
,第一个CompletableFuture
根据证券名称查询证券代码,第二个CompletableFuture
根据证券代码查询证券价格,这两个CompletableFuture
实现串行操作如下:
CompletableFuture.supplyAsync():创建一个包含返回值的异步任务;
thenApplyAsync():获取前一个线程的结果进行转换,有返回值;
thenAccept():获取前一个线程的结果进行消费,无返回值。
public class Demo {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
// 第一个任务:创建一个包含返回值的CompletableFuture
CompletableFuture<String> cfQuery = CompletableFuture.supplyAsync(() -> {
return queryCode("中国石油");
});
// cfQuery成功后继续执行下一个任务:
CompletableFuture<Double> cfFetch = cfQuery.thenApplyAsync((code) -> {
return fetchPrice(code);
});
// cfFetch成功后打印结果:
cfFetch.thenAccept((result) -> {
System.out.println("price: " + result);
});
// 主线程不要结束,否则CompletableFuture默认使用的线程池会立刻关闭:
countDownLatch.await();
}
public static void main2(String[] args) throws Exception {
CompletableFuture.supplyAsync(() -> queryCode("中国石油"))
.thenApplyAsync((code) -> fetchPrice(code))
.thenAccept((result) -> System.out.println("price: " + result));
countDownLatch.await();
}
static String queryCode(String name) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
String code = "601857";
System.out.println("查询证券编码,name:" + name + ",code:" + code);
return code;
}
static Double fetchPrice(String code) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
Double price = 5 + Math.random() * 20;
System.out.println("根据证券编码查询价格,code:" + code + ";price:" + price);
return price;
}
}
并行执行:
除了串行执行外,多个CompletableFuture
还可以并行执行。例如,我们考虑这样的场景:
同时从新浪和网易查询证券代码,只要任意一个返回结果,就进行下一步查询价格,查询价格也同时从新浪和网易查询,只要任意一个返回结果,就完成操作:
CompletableFuture.supplyAsync():创建一个包含返回值的异步任务;
CompletableFuture.anyOf(cf1,cf2,cf3).join():多个异步线程任一执行完即返回,有返回值Object;
thenAccept():获取前一个线程的结果进行消费,无返回值。
public class Demo2 {
private static CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
// 两个CompletableFuture执行异步查询:
CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(() -> {
return queryCode("中国石油", "https://finance.sina.com.cn/code/");
});
CompletableFuture<String> cfQueryFrom163 = CompletableFuture.supplyAsync(() -> {
return queryCode("中国石油", "https://money.163.com/code/");
});
// 用anyOf合并为一个新的CompletableFuture:
CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromSina, cfQueryFrom163);
// 两个CompletableFuture执行异步查询:
CompletableFuture<Double> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> {
return fetchPrice((String) code, "https://finance.sina.com.cn/price/");
});
CompletableFuture<Double> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> {
return fetchPrice((String) code, "https://money.163.com/price/");
});
// 用anyOf合并为一个新的CompletableFuture:
CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163);
// 最终结果:
cfFetch.thenAccept((result) -> {
System.out.println("price: " + result);
});
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
COUNT_DOWN_LATCH.await();
}
public static void main2(String[] args) throws Exception {
// 两个CompletableFuture执行异步查询:
CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(() -> queryCode("中国石油", "https://finance.sina.com.cn/code/"));
CompletableFuture<String> cfQueryFrom163 = CompletableFuture.supplyAsync(() -> queryCode("中国石油", "https://money.163.com/code/"));
// 用anyOf合并为一个新的CompletableFuture:
CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromSina, cfQueryFrom163);
// 两个CompletableFuture执行异步查询:
CompletableFuture<Double> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> fetchPrice((String) code, "https://finance.sina.com.cn/price/"));
CompletableFuture<Double> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> fetchPrice((String) code, "https://money.163.com/price/"));
// 用anyOf合并为一个新的CompletableFuture:
CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163);
// 最终结果:
cfFetch.thenAccept((result) -> System.out.println("price: " + result));
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
COUNT_DOWN_LATCH.await();
}
static String queryCode(String name, String url) {
System.out.println(Thread.currentThread().getName() + " query code from " + url + "...");
try {
Thread.sleep((long) (Math.random() * 100));
} catch (InterruptedException e) {
e.printStackTrace();
}
String code = "601857";
System.out.println(Thread.currentThread().getName() + " 查询证券编码,name:" + name + ",code:" + code);
return code;
}
static Double fetchPrice(String code, String url) {
System.out.println(Thread.currentThread().getName() + " query price from " + url + "...");
try {
Thread.sleep((long) (Math.random() * 100));
} catch (InterruptedException e) {
e.printStackTrace();
}
Double price = 5 + Math.random() * 20;
System.out.println(Thread.currentThread().getName() + " 根据证券编码查询价格,code:" + code + ";price:" + price);
return price;
}
}
上述逻辑实现的异步查询规则实际上是:
4.2 实现最优的“烧水泡茶”程序
public class Demo3 {
public static void main(String[] args) {
//任务1:洗水壶 -> 烧开水
CompletableFuture<String> f11 = CompletableFuture.supplyAsync(() -> {
System.out.println("T1:洗水壶...开始");
sleep(1000);
return "T1:洗水壶...完成";
});
CompletableFuture<String> f12 = f11.thenApply((f11Result) -> {
System.out.println(f11Result);
System.out.println("T1:烧开水...开始");
sleep(3000);
return "T1:烧开水...完成";
});
//任务2:洗茶壶->洗茶杯->拿茶叶
CompletableFuture<Void> f21 = CompletableFuture.runAsync(() -> {
System.out.println("==============T2:洗茶壶...开始");
sleep(1000);
System.out.println("==============T2:洗茶壶...完成");
});
CompletableFuture<Void> f22 = f21.thenRun(() -> {
System.out.println("==============T2:洗茶杯...开始");
sleep(2000);
System.out.println("==============T2:洗茶杯...完成");
});
CompletableFuture<String> f23 = f22.thenApply(result -> {
System.out.println("==============T2:拿茶叶...开始");
sleep(1000);
System.out.println("==============T2:拿茶叶...完成");
return "龙井";
});
//任务3:任务1和任务2完成后执行:泡茶
CompletableFuture<String> f3 = f12.thenCombine(f23, (f1Result, f2Result) -> {
System.out.println(f1Result);
System.out.println("************T2:拿到茶叶:result" + f2Result);
System.out.println("************T3:泡茶...,什么茶:" + f2Result);
return "上茶:" + f2Result;
});
//等待任务3执行结果
System.out.println(f3.join());
}
static void sleep(int t) {
try {
Thread.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}