Java 8 Completeable Futue 简单使用

这是学习 Java 8 中的 CompletableFuture 的写的例子,总结并记录一下。

考虑这样一种场景,有 4 个方法连接 DB 查询数据,串行执行所耗时长较长,现在使用 CompletableFuture 并发调用。

需要使用的方法有两个:supplyAsync()thenApply()

supplyAsync() : 接收一个supplier<T> 并且返回CompletableFuture<T>T 是通过调用 传入的supplier取得的值的类型。

thenApply() : 获得一个 CompletableFuture<T> 的回调,当 Future 完成时接收结果。

这段代码模拟一个耗时操作

    @Async
    private CompletableFuture<Double> getHeight(){
        // 打印日志
        logger.warn(Thread.currentThread().getName() + "start height task!");
        Double height = 0.0;
        try {
            TimeUnit.SECONDS.sleep(1);
            height = 183.0;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        logger.warn(Thread.currentThread().getName() + "end height task!");
        return CompletableFuture.completedFuture(height);
    }

假设有3个和上方代码类似的耗时操作,每个耗时 1s ,那顺序执行应该耗时 4s,所以尝试使用下方代码让其并发执行

@Async
    public CompletableFuture<Double> getBMI() throws ExecutionException, InterruptedException {
        //开始时间
        long start = System.currentTimeMillis();
        logger.warn(Thread.currentThread().getName() + "run this task!");
        CompletableFuture heightFuture = CompletableFuture.supplyAsync(() -> {
            try {
                return getHeight().get();
            } catch (InterruptedException e) {
                e.printStackTrace();
                return 0.0;
            } catch (ExecutionException e) {
                e.printStackTrace();
                return 0.0;
            }
        }, executor).thenApply(c -> {return c;});

        CompletableFuture internalFuture = CompletableFuture.supplyAsync(() -> {
            try {
                return getIsInternal().get();
            } catch (InterruptedException e) {
                e.printStackTrace();
                return false;
            } catch (ExecutionException e) {
                e.printStackTrace();
                return false;
            }
        },executor).thenApply(c -> {return c;});

        CompletableFuture weightFuture = CompletableFuture.supplyAsync(() -> {
            try {
                return getWeight().get();
            } catch (InterruptedException e) {
                e.printStackTrace();
                return 0.0;
            } catch (ExecutionException e) {
                e.printStackTrace();
                return 0.0;
            }
        },executor).thenApply(c -> {return c;});

        CompletableFuture userFuture = CompletableFuture.supplyAsync(() -> {
            try {
                return getUser().get();
            } catch (InterruptedException e) {
                e.printStackTrace();
                return null;
            } catch (ExecutionException e) {
                e.printStackTrace();
                return null;
            }
        },executor).thenApply(c -> {return c;});


        Double height = (Double) heightFuture.get();
        Double weight = (Double) weightFuture.get();
        User user = (User) userFuture.get();

        logger.warn("height:" + height);
        logger.warn("weight:" + weight);
        logger.warn("user:" + user.getUserId());
        logger.warn("isinternal:" + internalFuture.get().toString());

        Double BMI = weight/((height/100)*(height/100));

        // 打印结果以及运行程序运行花费时间
        System.out.println("Elapsed time: " + (System.currentTimeMillis() - start));
       return CompletableFuture.completedFuture(BMI);
    }

通过 supplyAsync() 运行一个有返回值的异步任务,通过 thenApply() 在任务完成时获取结果。

在上方的代码中 supplyAsync() 方法的第二个参数是 Executor 类型,我使用的是 ThreadPoolTaskExecutor 这个类型的线程池,如果这里不传,将会默认使用 ForkJoinPool.commonPool() 这个线程池。

ThreadPoolTaskExecutor 有几个核心参数:

  • CorePoolSize 核心线程数,表示可同时运行的线程的最小数量
  • MaxPoolSize 最大线程数,当队列满时可同时运行的最大线程数量
  • QueueCapacity 队列大小,当核心线程数已经达到最大值时,任务会被加入队列,当队列满时,会开启非核心线程,当总线程数到达最大值且队列已满时,会按照饱和策略丢弃或用任务本身的线程继续执行。
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {

    private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE = 10;
    private static final int QUEUE_CAPACITY = 100;

    @Bean
    public Executor taskExecutor() {
        // Spring 默认配置是核心线程数大小为1,最大线程容量大小不受限制,队列容量也不受限制。
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 核心线程数
        executor.setCorePoolSize(CORE_POOL_SIZE);
        // 最大线程数
        executor.setMaxPoolSize(MAX_POOL_SIZE);
        // 队列大小
        executor.setQueueCapacity(QUEUE_CAPACITY);
        // 当最大池已满时,此策略保证不会丢失任务请求,但是可能会影响应用程序整体性能。
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setThreadNamePrefix("My ThreadPoolTaskExecutor-");
        executor.initialize();
        return executor;
    }
}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容