一、CompletableFuture的源起
在上一章我们提到了异步模式Future模式,但在实际生产过程中这个模式的会出现一些问题,我们来看以下代码:
ExecutorService service = Executors.newCachedThreadPool();
//自定义任务1
Future<String> fu1 = service.submit(()->{
Thread.sleep(3000);
return "mission 1 done";
});
//自定义任务二完成需要任务一先完成
while(fu1.isDone()) {
}
Future<String> fu2 = service.submit(()->{
Thread.sleep(3000);
return "mission 2 done";
});
开始任务二的前提是任务一完成,所以只能在主线程加入while循环判断任务是否结束这种方式会浪费资源;如果使用get方法会阻塞当前线程,违背异步编程的理论,另外还有两个局限性:1、Future无法组合多个结果行成链式调用;2、没有异常处理。为了解决Future的局限性,在JDK1.8时引入了CompletableFuture。
二、CompletableFuture分析
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>
Future在一片文章中讲过,这里介绍一个新面孔CompletionStage接口,这个接口主要提供多个方法便于我们将多个CompletionStage命令组合起来,从而满足实现多个命令的触发并且可以抛出异常。故将CompletionStage与Future结合与一身的Future就到了大展身手的时候了。
三、CompletableFuture常用方法
- CompletableFuture创建,一般有以下四种静态方法创建:
返回值 | 方法名 | 介绍 |
---|---|---|
CompletableFuture<Void> | runAsync(Runnable) | 创建异步任务,使用默认线程池ForkJoinPool(这个详解在我的其他文章) |
CompletableFuture<Void> | runAsync(Runnable,Executor) | 创建异步任务,使用指定线程池 |
CompletableFuture<U> | supplyAsync(Supplier<U>) | 与第一个相同,异同点此方法有返回值 |
CompletableFuture<U> | supplyAsync(Supplier<U>,Executor) | 与第二个相同,异同点此方法有返回值 |
CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
System.out.println("hello");
});
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{
return "world";
});
future.get();
System.out.println(future1.get());
上述例子采用默认线程池,runAsync和supplyAsync的区别就是第二个有返回值。
- 数据转换类方法
2.1 thenApply方法
返回值 | 方法名 | 介绍 |
---|---|---|
CompletableFuture<U> | thenApply(Function<? super T,? extends U> fn) | 数据进行转换,使用ForkJoinPool线程池 |
CompletableFuture<U> | thenApplyAsync(Function<? super T,? extends U> fn) | 数据进行转换,使用ForkJoinPool线程池 |
CompletableFuture<U> | thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) | 数据进行转换,自定义线程池 |
CompletableFuture<String> future = CompletableFuture
.supplyAsync(()->"hello")
.thenApplyAsync(w->w + " world\n")
.thenApplyAsync(w->w+" 真有意思\n")
.thenApplyAsync(String::toUpperCase);
System.out.println(future.get());
2.2 thenApply方法
返回值 | 方法名 | 介绍 |
---|---|---|
CompletableFuture<U> | thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) | 转换CompletableFuture,使用ForkJoinPool线程池 |
CompletableFuture<U> | thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) | 转换CompletableFuture,使用ForkJoinPool线程池 |
CompletableFuture<U> | thenComposeAsync( Function<? super T, ? extends CompletionStage<U>> fn,Executor executor) | 转换CompletableFuture,自定义线程池 |
CompletableFuture<String> future = CompletableFuture
.supplyAsync(()->Thread.currentThread().getName()+"hello")
.thenComposeAsync((w)->CompletableFuture.supplyAsync(()->w+"world"))
.thenApply(String::toUpperCase);
System.out.println(future.get());
上述可以看出thenApply将值进行串行处理,而thenComposeAsync将CompletableFuture进行处理。
- 组合
3.1 thenAcceptBoth方法
返回值 | 方法名 | 介绍 |
---|---|---|
CompletableFuture<Void> | thenAcceptBoth(Function<? super T, ? extends CompletionStage<U>> fn) | 结合传入的CompletableFuture,使用ForkJoinPool线程池 |
CompletableFuture<Void> | thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action) | 结合传入的CompletableFuture,使用ForkJoinPool线程池 |
CompletableFuture<Void> | thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor) | 结合传入的CompletableFuture,使用自定义线程池 |
CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
return "hello";
});
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{
return "world";
});
future1.thenAcceptBoth(future,(a,b)->{System.out.println(b+a+"真有意思");});
结合传入的future,实现一个BiConsumer函数接口,从而实现将两个future结合起来。
3.2 thenCombine方法
返回值 | 方法名 | 介绍 |
---|---|---|
CompletableFuture<U> | thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) | 结合传入的CompletableFuture,使用ForkJoinPool线程池 |
CompletableFuture<U> | thenCombineAsync( CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn | 结合传入的CompletableFuture,使用ForkJoinPool线程池 |
CompletableFuture<U> | thenCombineAsync( CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor | 结合传入的CompletableFuture,使用自定义线程池 |
CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
return "hello";
});
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{
return "world";
});
CompletableFuture<String> test= future1
.thenCombineAsync(future,(a,b)->{System.out.println(b+a+"真有意思");return "任务完成";});
System.out.println(test.get());
thenCombineAsync方法与thenAcceptBoth方法的区别为:
thenCombineAsync可以有一个CompletableFuture返回值;
thenCombineAsync第二个参数为BiFunction接口,thenAcceptBoth第二个参数为BiCosumer接口