这是学习 Java 8 中的 CompletableFuture 的写的例子,总结并记录一下。
考虑这样一种场景,有 4 个方法连接 DB 查询数据,串行执行所耗时长较长,现在使用 CompletableFuture 并发调用。
需要使用的方法有两个:supplyAsync(),thenApply()
supplyAsync() : 接收一个
supplier<T>
并且返回CompletableFuture<T>
,T
是通过调用 传入的supplier取得的值的类型。
thenApply() : 获得一个 CompletableFuture<T> 的回调,当 Future 完成时接收结果。
这段代码模拟一个耗时操作
@Async
private CompletableFuture<Double> getHeight(){
// 打印日志
logger.warn(Thread.currentThread().getName() + "start height task!");
Double height = 0.0;
try {
TimeUnit.SECONDS.sleep(1);
height = 183.0;
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.warn(Thread.currentThread().getName() + "end height task!");
return CompletableFuture.completedFuture(height);
}
假设有3个和上方代码类似的耗时操作,每个耗时 1s ,那顺序执行应该耗时 4s,所以尝试使用下方代码让其并发执行
@Async
public CompletableFuture<Double> getBMI() throws ExecutionException, InterruptedException {
//开始时间
long start = System.currentTimeMillis();
logger.warn(Thread.currentThread().getName() + "run this task!");
CompletableFuture heightFuture = CompletableFuture.supplyAsync(() -> {
try {
return getHeight().get();
} catch (InterruptedException e) {
e.printStackTrace();
return 0.0;
} catch (ExecutionException e) {
e.printStackTrace();
return 0.0;
}
}, executor).thenApply(c -> {return c;});
CompletableFuture internalFuture = CompletableFuture.supplyAsync(() -> {
try {
return getIsInternal().get();
} catch (InterruptedException e) {
e.printStackTrace();
return false;
} catch (ExecutionException e) {
e.printStackTrace();
return false;
}
},executor).thenApply(c -> {return c;});
CompletableFuture weightFuture = CompletableFuture.supplyAsync(() -> {
try {
return getWeight().get();
} catch (InterruptedException e) {
e.printStackTrace();
return 0.0;
} catch (ExecutionException e) {
e.printStackTrace();
return 0.0;
}
},executor).thenApply(c -> {return c;});
CompletableFuture userFuture = CompletableFuture.supplyAsync(() -> {
try {
return getUser().get();
} catch (InterruptedException e) {
e.printStackTrace();
return null;
} catch (ExecutionException e) {
e.printStackTrace();
return null;
}
},executor).thenApply(c -> {return c;});
Double height = (Double) heightFuture.get();
Double weight = (Double) weightFuture.get();
User user = (User) userFuture.get();
logger.warn("height:" + height);
logger.warn("weight:" + weight);
logger.warn("user:" + user.getUserId());
logger.warn("isinternal:" + internalFuture.get().toString());
Double BMI = weight/((height/100)*(height/100));
// 打印结果以及运行程序运行花费时间
System.out.println("Elapsed time: " + (System.currentTimeMillis() - start));
return CompletableFuture.completedFuture(BMI);
}
通过 supplyAsync() 运行一个有返回值的异步任务,通过 thenApply() 在任务完成时获取结果。
在上方的代码中 supplyAsync() 方法的第二个参数是 Executor 类型,我使用的是 ThreadPoolTaskExecutor 这个类型的线程池,如果这里不传,将会默认使用 ForkJoinPool.commonPool() 这个线程池。
ThreadPoolTaskExecutor 有几个核心参数:
- CorePoolSize 核心线程数,表示可同时运行的线程的最小数量
- MaxPoolSize 最大线程数,当队列满时可同时运行的最大线程数量
- QueueCapacity 队列大小,当核心线程数已经达到最大值时,任务会被加入队列,当队列满时,会开启非核心线程,当总线程数到达最大值且队列已满时,会按照饱和策略丢弃或用任务本身的线程继续执行。
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
private static final int CORE_POOL_SIZE = 5;
private static final int MAX_POOL_SIZE = 10;
private static final int QUEUE_CAPACITY = 100;
@Bean
public Executor taskExecutor() {
// Spring 默认配置是核心线程数大小为1,最大线程容量大小不受限制,队列容量也不受限制。
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数
executor.setCorePoolSize(CORE_POOL_SIZE);
// 最大线程数
executor.setMaxPoolSize(MAX_POOL_SIZE);
// 队列大小
executor.setQueueCapacity(QUEUE_CAPACITY);
// 当最大池已满时,此策略保证不会丢失任务请求,但是可能会影响应用程序整体性能。
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setThreadNamePrefix("My ThreadPoolTaskExecutor-");
executor.initialize();
return executor;
}
}