Future & CompleteFuture 实践总结

1 Future介绍

1.1 Future的主要功能

JDK5新增了Future接口,用于描述一个异步计算的结果。

Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果等操作。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

Future类位于java.util.concurrent包下,它是一个接口:

public interface Future<V> { 
 /** 
  * 方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。 * 
  * @param mayInterruptIfRunning 表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。 
  * @return 如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false; 
  * 如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false; 
  * 如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。 
  */ 
  boolean cancel(boolean mayInterruptIfRunning); 

   /** 
    * 方法表示任务是否被取消成功 
    * @return 如果在任务正常完成前被取消成功,则返回 true 
    */ 
   boolean isCancelled(); 

   /** 
    * 方法表示任务是否已经完成 
    * @return 若任务完成,则返回true 
    */ 
   boolean isDone(); 

   /** 
    * 方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回 
    * @return 任务执行的结果值 
    * @throws InterruptedException 线程被中断异常
    * @throws ExecutionException 任务执行异常,如果任务被取消,还会抛出CancellationException
    */ 
   V get() throws InterruptedException, ExecutionException; 

   /** 
    * 用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null(并不是抛出异常,需要注意)。 
    * @param timeout 超时时间 
    * @param unit 超时单位 
    * @return 
    * @throws InterruptedException 
    * @throws ExecutionException 
    * @throws TimeoutException 如果计算超时,将抛出TimeoutException(待确认)
    */ 
   V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; 
}

从上面方法的注释可以看出,Futrue提供了三种功能:

1)判断任务是否完成;

2)能够中断任务;

3)能够获取任务执行结果。(最为常用的)

1.2 Future的局限性

从本质上说,Future表示一个异步计算的结果。它提供了isDone()来检测计算是否已经完成,并且在计算结束后,可以通过get()方法来获取计算结果。在异步计算中,Future确实是个非常优秀的接口。但是,它的本身也确实存在着许多限制:

  • 并发执行多任务:Future只提供了get()方法来获取结果,并且是阻塞的。所以,除了等待你别无他法;当 for 循环批量获取 Future 的结果时容易 block,因此get 方法调用时应使用 timeout 限制。
  • 无法对多个任务进行链式调用:如果你希望在计算任务完成后执行特定动作,比如发邮件,但Future却没有提供这样的能力;
  • 无法组合多个任务:如果你运行了10个任务,并期望在它们全部执行结束后执行特定动作,那么在Future中这是无能为力的;
  • 没有异常处理:Future接口中没有关于异常处理的方法;

2 CompletableFuture介绍

虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。如果遇到前面的task执行较慢时需要阻塞等待前面的task执行完后面task才能取得结果。

阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果。而CompletableFuture的主要功能就是一边生成任务,一边获取任务的返回值。让两件事分开执行,任务之间不会互相阻塞,可以实现先执行完的先取结果,不再依赖任务顺序了。

2.1 CompletableFuture原理

内部通过阻塞队列+FutureTask,实现了任务先完成可优先获取到,即结果按照完成先后顺序排序,内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future,通过调用它的take方法或poll方法可以获取到一个已经执行完成的Future,进而通过调用Future接口实现类的get方法获取最终的结果。

2.2 应用场景

当需要批量提交异步任务的时候建议使用CompletableFuture。CompletableFuture将线程池Executor和阻塞队列BlockingQueue的功能融合在了一起,能够让批量异步任务的管理更简单。

CompletableFuture能够让异步任务的执行结果有序化。先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待。

线程池隔离。CompletionService支持自己创建线程池,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。

2.3 CompletableFuture使用详解

简单的任务,用Future获取结果还好,但我们并行提交的多个异步任务,往往并不是独立的,很多时候业务逻辑处理存在串行[依赖]、并行、聚合的关系。如果要我们手动用 Fueture 实现,是非常麻烦的。

CompletableFuture是Future接口的扩展和增强。CompletableFuture实现了Future接口,并在此基础上进行了丰富地扩展,完美地弥补了Future上述的种种问题。

更为重要的是,CompletableFuture实现了对任务的编排能力。借助这项能力,我们可以轻松地组织不同任务的运行顺序、规则以及方式。从某种程度上说,这项能力是它的核心能力。而在以往,虽然通过CountDownLatch等工具类也可以实现任务的编排,但需要复杂的逻辑处理,不仅耗费精力且难以维护。

3 CompletableFuture应用梳理

按照使用功能分类

按应用场景分类
  • 很多方法上,可以指定线程池,而没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。

  • 默认情况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ForkJoinPool 线程池的线程数)。

  • 如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议要根据不同的业务类型创建不同的线程池,以避免互相干扰。

  • 等我们使用的时候,会注意到CompletableFuture的方法命名规则:

    • xxx():表示该方法将继续在已有的线程中执行;
  • xxxAsync():表示可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。

4 使用案例

4.1 基础使用案例

串行执行:

定义两个CompletableFuture,第一个CompletableFuture根据证券名称查询证券代码,第二个CompletableFuture根据证券代码查询证券价格,这两个CompletableFuture实现串行操作如下:

CompletableFuture.supplyAsync():创建一个包含返回值的异步任务;
thenApplyAsync():获取前一个线程的结果进行转换,有返回值;
thenAccept():获取前一个线程的结果进行消费,无返回值。


public class Demo {
    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        // 第一个任务:创建一个包含返回值的CompletableFuture
        CompletableFuture<String> cfQuery = CompletableFuture.supplyAsync(() -> {
            return queryCode("中国石油");
        });
        // cfQuery成功后继续执行下一个任务:
        CompletableFuture<Double> cfFetch = cfQuery.thenApplyAsync((code) -> {
            return fetchPrice(code);
        });
        // cfFetch成功后打印结果:
        cfFetch.thenAccept((result) -> {
            System.out.println("price: " + result);
        });
        // 主线程不要结束,否则CompletableFuture默认使用的线程池会立刻关闭:
        countDownLatch.await();
    }

    public static void main2(String[] args) throws Exception {
        CompletableFuture.supplyAsync(() -> queryCode("中国石油"))
                .thenApplyAsync((code) -> fetchPrice(code))
                .thenAccept((result) -> System.out.println("price: " + result));
        countDownLatch.await();
    }

    static String queryCode(String name) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
        }
        String code = "601857";
        System.out.println("查询证券编码,name:" + name + ",code:" + code);
        return code;
    }

    static Double fetchPrice(String code) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
        }
        Double price = 5 + Math.random() * 20;
        System.out.println("根据证券编码查询价格,code:" + code + ";price:" + price);
        return price;
    }
}

并行执行:

除了串行执行外,多个CompletableFuture还可以并行执行。例如,我们考虑这样的场景:

同时从新浪和网易查询证券代码,只要任意一个返回结果,就进行下一步查询价格,查询价格也同时从新浪和网易查询,只要任意一个返回结果,就完成操作:

CompletableFuture.supplyAsync():创建一个包含返回值的异步任务;
CompletableFuture.anyOf(cf1,cf2,cf3).join():多个异步线程任一执行完即返回,有返回值Object;
thenAccept():获取前一个线程的结果进行消费,无返回值。

public class Demo2 {
    private static CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        // 两个CompletableFuture执行异步查询:
        CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(() -> {
            return queryCode("中国石油", "https://finance.sina.com.cn/code/");
        });
        CompletableFuture<String> cfQueryFrom163 = CompletableFuture.supplyAsync(() -> {
            return queryCode("中国石油", "https://money.163.com/code/");
        });

        // 用anyOf合并为一个新的CompletableFuture:
        CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromSina, cfQueryFrom163);

        // 两个CompletableFuture执行异步查询:
        CompletableFuture<Double> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> {
            return fetchPrice((String) code, "https://finance.sina.com.cn/price/");
        });
        CompletableFuture<Double> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> {
            return fetchPrice((String) code, "https://money.163.com/price/");
        });

        // 用anyOf合并为一个新的CompletableFuture:
        CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163);

        // 最终结果:
        cfFetch.thenAccept((result) -> {
            System.out.println("price: " + result);
        });
        // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
        COUNT_DOWN_LATCH.await();
    }

    public static void main2(String[] args) throws Exception {
        // 两个CompletableFuture执行异步查询:
        CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(() -> queryCode("中国石油", "https://finance.sina.com.cn/code/"));
        CompletableFuture<String> cfQueryFrom163 = CompletableFuture.supplyAsync(() -> queryCode("中国石油", "https://money.163.com/code/"));

        // 用anyOf合并为一个新的CompletableFuture:
        CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromSina, cfQueryFrom163);

        // 两个CompletableFuture执行异步查询:
        CompletableFuture<Double> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> fetchPrice((String) code, "https://finance.sina.com.cn/price/"));
        CompletableFuture<Double> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> fetchPrice((String) code, "https://money.163.com/price/"));

        // 用anyOf合并为一个新的CompletableFuture:
        CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163);

        // 最终结果:
        cfFetch.thenAccept((result) -> System.out.println("price: " + result));
        // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
        COUNT_DOWN_LATCH.await();
    }

    static String queryCode(String name, String url) {
        System.out.println(Thread.currentThread().getName() + " query code from " + url + "...");
        try {
            Thread.sleep((long) (Math.random() * 100));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String code = "601857";
        System.out.println(Thread.currentThread().getName() + " 查询证券编码,name:" + name + ",code:" + code);
        return code;
    }

    static Double fetchPrice(String code, String url) {
        System.out.println(Thread.currentThread().getName() + " query price from " + url + "...");
        try {
            Thread.sleep((long) (Math.random() * 100));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Double price = 5 + Math.random() * 20;
        System.out.println(Thread.currentThread().getName() + " 根据证券编码查询价格,code:" + code + ";price:" + price);
        return price;
    }
}

上述逻辑实现的异步查询规则实际上是:

image

4.2 实现最优的“烧水泡茶”程序


public class Demo3 {

    public static void main(String[] args) {

        //任务1:洗水壶 -> 烧开水
        CompletableFuture<String> f11 = CompletableFuture.supplyAsync(() -> {
            System.out.println("T1:洗水壶...开始");
            sleep(1000);
            return "T1:洗水壶...完成";
        });
        CompletableFuture<String> f12 = f11.thenApply((f11Result) -> {
            System.out.println(f11Result);
            System.out.println("T1:烧开水...开始");
            sleep(3000);
            return "T1:烧开水...完成";
        });

        //任务2:洗茶壶->洗茶杯->拿茶叶
        CompletableFuture<Void> f21 = CompletableFuture.runAsync(() -> {
            System.out.println("==============T2:洗茶壶...开始");
            sleep(1000);
            System.out.println("==============T2:洗茶壶...完成");
        });
        CompletableFuture<Void> f22 = f21.thenRun(() -> {
            System.out.println("==============T2:洗茶杯...开始");
            sleep(2000);
            System.out.println("==============T2:洗茶杯...完成");
        });
        CompletableFuture<String> f23 = f22.thenApply(result -> {
            System.out.println("==============T2:拿茶叶...开始");
            sleep(1000);
            System.out.println("==============T2:拿茶叶...完成");
            return "龙井";
        });
        //任务3:任务1和任务2完成后执行:泡茶
        CompletableFuture<String> f3 = f12.thenCombine(f23, (f1Result, f2Result) -> {
            System.out.println(f1Result);
            System.out.println("************T2:拿到茶叶:result" + f2Result);
            System.out.println("************T3:泡茶...,什么茶:" + f2Result);
            return "上茶:" + f2Result;
        });
        //等待任务3执行结果
        System.out.println(f3.join());
    }

    static void sleep(int t) {
        try {
            Thread.sleep(t);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

参考文档:

CompletableFuture (Java Platform SE 8 )

Future & CompleteFuture 剖析实战

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 196,647评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,690评论 2 374
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,739评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,692评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,552评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,410评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,819评论 3 387
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,463评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,752评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,789评论 2 314
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,572评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,414评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,833评论 3 300
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,054评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,345评论 1 254
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,810评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,016评论 2 337

推荐阅读更多精彩内容