Java线程-Callable、Future、FutureTask学习(七)

一. 前言

  在前面,我们已经了解了创建线程的两种方式:继承Thread或者实现Runnable接口,由于Thread也实现了Runnable接口,实际上上面两种方式是一种方式,不过这种方式有一个问题就是在任务完成之后无法获取执行的结果。而我们今天要学习的Callable就可以解决该问题。

二. 简介

  Callable和Future是JDK1.5之后引入的,也是用于创建线程,不过在任务执行完成之后可以得到任务的执行结果,我们来简单看下这两个接口及相关的实现。

1. Callable接口
@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

Callable接口是一个泛型接口,可以返回结果,并可以抛出异常,只有一个call方法,类似于Runnable的run方法,用于执行任务,但run方法没有返回值,并且不能抛出checked Exception。而call方法是有返回值,且返回的类型就是传进来的V类型。

而Callable 接口一般是配合ExecutorService线程池来使用的,在ExecutorService中声明了若干个submit方法:

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

目前只需要知道Callable一般是和ExecutorService配合来使用的,而有关线程池方面的内容,我们后续自然会学习到。

2. Future接口

  Future表示的是异步计算的结果,提供了查看任务是否完成,取消任务,获取结果等接口。而针对获取结果接口,只有在任务执行完成之后,才能获取,否则的话,该方法会阻塞直到任务执行完成。取消接口的话,一旦任务执行完成,就不能取消。

If you would like to use a Future for the sake of cancellability but not provide a usable result, you can declare types of the form Future<?> and return null as a result of the underlying task. (如果出于可取消性的考虑而不想提供可用的结果,那么可以声明 Future<?> 类型并返回 null作为任务的执行结果)

public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Future接口提供了如下5个接口:

  1. cancel,尝试取消任务,如果任务已经执行完成,已被取消或者由于其他原因无法取消,则取消失败,返回false;如果该任务尚未start,那么调用cancel后,该任务将取消成功,返回true;如果任务已经在执行中,那么通过参数mayInterruptIfRunning来判断执行该任务的线程是否可用中断,也就是是否取消正在执行但还没有执行完成的任务,如果该值为true,则可以取消;
  2. isCancelled,表示任务是否被取消成功,如果任务在正常完成之前被取消,则返回true;
  3. isDone,表示任务是否已经完成,已经完成返回true;完成可能是由于正常执行完成、抛出异常或取消,在这些情况下,该接口都返回true;也就是,在调用cancel方法返回之后,该方法的调用将返回true;
  1. get,获取线程的执行结果,如果任务尚未执行完成,则该接口将会处于阻塞,直到线程执行完成;而该方法有可能会抛出以下几个异常:
    • CancellationException - if the computation was cancelled
    • ExecutionException - if the computation threw an exception
    • InterruptedException - if the current thread was interrupted while waiting
  2. get,有参数的get方法,该方法也是用于获取执行结果,如果在指定的时间内没有获取到结果,则会抛出超时异常:
    • TimeoutException - if the wait timed out

也就是说,通过Future我们可以判断任务是否执行完成,取消任务,获取任务执行完成之后的结果。而Future是一个接口,我们来看以下它的实现类FutureTask。

3. FutureTask

我们先来看一下FutureTask的实现:

public class FutureTask<V> implements RunnableFuture<V> {
    public FutureTask(Callable<V> callable)
    public FutureTask(Runnable runnable, V result)
}

我们可以看到,FutureTask继承自RunnableFuture,我们再来看下RunnableFuture:

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

可以看到,RunnableFuture同时继承了Runnable接口和Future接口,所以说它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。

4. CompletableFuture

  虽然Future提供了异步执行的能力,但Future对于结果的获取却不方便,只能通过阻塞或者轮询的方式得到结果,所以在JDK 8.0之后引入了CompletableFuture这个类来优化这些问题,CompletableFuture大概有50多个方法,功能十分强大,比如说 将多个异步计算合并为一个,后面的依赖于前面执行的结果等,我们先来看一下它的继承结构:

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> 

CompletableFuture算是Future的增强类,因为该类在实现Future的同时又实现了CompletionStage 这个接口,而 CompletionStage 接口表示的是阶段,表示某个阶段完成之后要做什么,该接口提供了多种方法,接下来我们来学习下。

4.1 生成 CompletableFuture

我们可用借助CompletableFuture 的几个静态方法来生成 CompletableFuture 对象:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                       Executor executor)
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable,
                                                   Executor executor)

以Async结尾并且没有指定Executor的方法会使用ForkJoinPool.commonPool()作为它的线程池执行异步代码。

也可以通过completedFuture返回一个已经计算好的CompletedFuture的对象:

public static <U> CompletableFuture<U> completedFuture(U value)
4.2 CompletionStage 接口

  我们来看下CompletionStage 的接口。CompletionStage 的接口虽然很多,但相似的方法一般都会分为三类,一个是使用当前线程执行,一个是使用共享的线程池,另一个是使用自定义的(参数传递)Executor来执行,类似于下面这种:

thenXXX() :使用当前线程继续执行接下来的内容
thenXXXAsync() :使用共享线程池执行接下来的内容
thenXXXAsync(with Executor) :使用参数中提供的Executor来执行接下来的内容
    1. thenApply,返回一个新的CompletionStage,当该阶段执行完成后,该阶段的结果作为参数进行下一步执行,比如将CompletableFuture的返回结果String类型的结果转换为Integer:
completableFuture.thenApply(s -> Integer.parseInt(s));
    1. thenAccept,对结果的消费处理,只能消费一次,该方法没有返回值:
completableFuture.thenAccept(s -> System.out.println("->" + s));
    1. thenRun,该阶段执行完成之后,执行下一个任务,比如说如果你想在CompletableFuture完成之后执行一个Runnable任务,就可以使用该操作,一旦CompletableFuture完成,就会执行指定的Runnable任务,下面是使用示例:
completableFuture.thenRun(() -> System.out.println("then running"));
    1. thenCombine,对两个CompletableFuture的结果进行组合然后返回,比如将当前CompletableFuture 的结果和等待的 CompletableFuture 的结果进行一些组合然后返回:
completableFuture.thenCombine(completableFuture1, (s1, s2) -> "Combine:" + s1 + " : " + s2);
    1. thenAcceptBoth,等待两个CompletableFuture结束,然后消费结果,所以该方法自然没有返回值。只有当两个任务都完成之后才执行:
completableFuture.thenAcceptBoth(completableFuture, 
    (s1, s2) -> System.out.println("Consume:" + s1 + ":" + s2));
    1. runAfterBoth,相比于AcceptBoth,runAfterBoth虽然也会等待两个CompletableFuture都结束再运行,但是不再依赖两个CompletableFuture的运行结果:
completableFuture.runAfterBoth(stringCompletableFutureV1, 
    (Runnable) () -> System.out.println("do some jobs here"));
    1. applyToEither,两个CompletableFuture 有竞争关系,哪个CompletableFuture先结束,就会对哪个的返回值做处理,然后第二个CompletableFuture的结果将不会再处理:
completableFuture.applyToEither(stringCompletableFutureV1, 
    (Function<String, Integer>) Integer::parseInt);
    1. acceptEither,同样,对两个CompletableFuture 先结束的进行消费:
completableFuture.acceptEither(stringCompletableFutureV1, 
    (Consumer<String>) s -> System.out.println("->" + s));
    1. runAfterEither,同样,对两个CompletableFuture先结束的 再执行新的任务:
completableFuture.runAfterEither(stringCompletableFutureV1, 
    (Runnable) () -> System.out.println("other job"));
    1. thenCompose,表示使用CompletableFuture的结果来产生一个全新的CompletableFuture,then的含义是等待当前的CompletableFuture运行结束,然后再运行我们指定的Supplier:
completableFuture.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + ":" + s));
    1. exceptionally,对当前的CompletableFuture抛出的异常进行一些处理,类似于fallback,比如说返回异常内容来代替当前抛出的异常,当然你也可以在该方法里面直接抛出当前CompletableFuture抛出的异常:
completableFuture.exceptionally(throwable -> "fallback");
    1. whenComplete,当前CompletableFuture将会把运行结果作为参数传递给指定的function,然后我们可以通过参数来判断当前CompletableFuture是否成功完成还是抛出了异常,然后进行后续处理:
completableFuture.whenComplete((s, throwable) -> System.out.println(s));
    1. handle,和whenComplete相比,该类型不仅可以获取到当前CompletableFuture是成功了还是抛出了异常,还可以对结果进行转换(如果当前CompletableFuture成功完成了):
completableFuture.handle((s, throwable) -> {
    if (throwable == null) {
        return Integer.parseInt(s);
    }
    return -1;
});
4.3 CompletableFuture实现类

  CompletableFuture中,除了实现上述方法以及Future接口的实现方法之外,还有一些方法,比如除了常规的阻塞get及超时get方法之外,还有一个getNow方法:

public T getNow(T valueIfAbsent) {
    Object r;
    return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
}

result字段代表任务的执行结果,所以首先判断是否为null,为null则表示任务还没有执行结束,直接返回参数中传递的默认值;如果result不为null,表示说明任务已经执行结束,然后直接返回;如果要了解源码的执行流程,可以参考本章后面引用的链接地址。

另外两个常用的方法,allOf,anyOf方法,在CompletionStage接口的方法中操作的CompletionStage对象,要么是一个,要么是两个,而这两个静态方法可以组合多个对象。

  1. allOf,allOf可以接收多个CompletableFuture参数,该方法会等待所有的CompletableFuture都执行完成之后组合完成,再执行接下来的操作;
  2. anyOf,anyOf和allOf类似,也是组合多个,但只要有一个CompletableFuture完成之后便执行后续的操作;

这部分内容参考自:
一字马胡 - 深度学习Java Future(一)
一字马胡 - 深度学习Java Future(二)

3. 运行示例

接下来,我们通过几个简单的例子来看一下这几个接口的使用。

3.1 Callable和Future

我们先来看一下Callable和Future的使用:

public class CallableTest {
    public static void main(String[] args) {
        try {
            callable();
        } catch (ExecutionException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void callable() throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newCachedThreadPool();
        Future<String> future = executor.submit(new Callable<String>() {
            public String call() throws InterruptedException {
                System.out.println("child thread start --");
                TimeUnit.SECONDS.sleep(3);
                System.out.println("child thread end --");
                return "hello callable";
            }
        });
        executor.shutdown();
        String result = future.get();
        System.out.println("return value: " + result);
    }
}

其中,匿名类Callable可以替换为lambda表达式:

public static void callable() throws ExecutionException, InterruptedException {
    ExecutorService executor = Executors.newCachedThreadPool();
    Future<String> future = executor.submit(() -> {
        System.out.println("child thread start --");
        TimeUnit.SECONDS.sleep(3);
        System.out.println("child thread end --");
        return "hello callable";
    });
    executor.shutdown();
    String result = future.get();
    System.out.println("return value: " + result);
}

output:

child thread start --
child thread end --
return value:hello callable
3.2 Callable和FutureTask

至于FutureTask,我们可以通过FutureTask的构造方法来构造对象,然后通过ExecutorService的execute方法来执行:

public static void callable() throws ExecutionException, InterruptedException {
    ExecutorService executor = Executors.newCachedThreadPool();
    FutureTask<String> future = new FutureTask<>(() -> {
        System.out.println("child thread start --");
        TimeUnit.SECONDS.sleep(3);
        System.out.println("child thread end --");
        return "hello callable";
    });
    executor.execute(future);
    executor.shutdown();
    String result = future.get();
    System.out.println("return value: " + result);
}

当然也可以借助Thread类而不是ExecutorService来实现:

public static void callable() throws ExecutionException, InterruptedException {
    FutureTask<String> future = new FutureTask<>(() -> {
        System.out.println("child thread start --");
        TimeUnit.SECONDS.sleep(3);
        System.out.println("child thread end --");
        return "hello callable";
    });
    new Thread(future).start();
    String result = future.get();
    System.out.println("return value: " + result);
}

本文参考自:
海子-Java并发编程:Callable、Future和FutureTask
https://docs.oracle.com/javase/8/docs/api/
IBM - 通过实例理解 JDK8 的 CompletableFuture
鸟窝-Java CompletableFuture 详解
《Java并发编程实战》

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,591评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,448评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,823评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,204评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,228评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,190评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,078评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,923评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,334评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,550评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,727评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,428评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,022评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,672评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,826评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,734评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,619评论 2 354