java多线程使用——从Thread、Runnable到CompletableFuture

1. 手动创建多线程

1.1 Thread和Runnable

Thread和Runnable大多数读者都知道,请跳过。
使用Thread:

public class ThreadDemo {

    public static void main(String[] args) {
        Thread threadOne = new ThreadExample("线程1");
        Thread threadTwo = new ThreadExample("线程2");
        threadOne.start();
        threadTwo.start();
    }
}

class ThreadExample extends Thread {

    private String name;

    public ThreadExample(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        System.out.printf("线程{%s}输出\n", name);
    }
}

使用Runnable:

public class RunnableDemo {

    public static void main(String[] args) {
        Thread threadOne = new Thread(new RunnableExample("线程1"));
        Thread threadTwo = new Thread(new RunnableExample("线程2"));
        threadOne.start();
        threadTwo.start();
    }
}

class RunnableExample implements Runnable {

    private String name;

    public RunnableExample(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        System.out.printf("线程{%s}输出%s", name, System.lineSeparator());
    }
}

一个实现接口,一个继承类,直接使用的话这两个没有太大的区别。有的地方会说一个Runnable对象可以让多个Thread复用,实际上只是可以共用一些参数设置而已。
实际比较有用的是下面的线程池等,只接受Runnable(或者Callable)类型的参数,而不能用Thread类型的参数。

1.2 Callable和Future、FutureTask

对于子线程,我们可能会有两种需求:

  1. 获取子线程运行结果
  2. 获取子线程运行状态(成功、失败、异常)
    Thread不满足这两个要求,Runnable可以获取状态但不能获取结果,于是出现了Callable。Callable配合Future使用可以获得子线程执行结果。
public class FutureDemo {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Runnable one = new RunnableExample("线程1");
        Callable two = new CallableExample("线程2");
        ThreadPool.pool.submit(one);
        // 这里get方法会阻塞住
        Future future = ThreadPool.pool.submit(two);
        System.out.println(future.get());
        ThreadPool.pool.shutdown();
    }
}

class CallableExample<T> implements Callable<T> {

    private T name;

    public CallableExample(T name) {
        this.name = name;
    }

    @Override
    public T call() {
        System.out.printf("线程{%s}输出%s", name, System.lineSeparator());
        return name;
    }
}

class ThreadPool {

    private ThreadPool() {
    }

    public static final ThreadPoolExecutor pool = new ThreadPoolExecutor(
            5,
            10,
            10,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(100),
            r -> new Thread(r),
            new ThreadPoolExecutor.DiscardOldestPolicy());
}

Future.get()是一个阻塞方法。为了不阻塞,我们将多个Future放到一个List中,然后去遍历,根据isDone()方法判断子线程已经执行完后,再get。
Future还有一个FutureTask,接受Callable作为参数。

public class FutureTaskDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Callable one = new CallableExample("线程1");
        Callable two = new CallableExample("线程2");
        FutureTask<CallableExample> task = new FutureTask<>(two);
        ThreadPool.pool.submit(one);
        ThreadPool.pool.submit(task);
        System.out.println(task.get());
        ThreadPool.pool.shutdown();
    }
}

在不方便获取submit的返回值时很有效。

2. 线程池

线程创建、销毁消耗资源,线程上下文切换也消耗资源,并且不控制线程数量容易OOM,所以有了线程池。可以使用以下四个线程池:

  • Executors.newCachedThreadPool();
  • Executors.newFixedThreadPool(3);
  • Executors.newScheduledThreadPool(3);
  • Executors.newSingleThreadExecutor();
    也可以自定义:
ThreadPoolExecutor THREAD_POOL = new ThreadPoolExecutor(2, 2, 60,
            TimeUnit.SECONDS, new ArrayBlockingQueue<>(1),
            new ThreadFactoryBuilder().setNameFormat("HomePageCardFeatureQueryThreadPool-%d").setDaemon(true).build(),
            new ThreadPoolExecutor.DiscardPolicy());

其中,ThreadPoolExecutor.DiscardPolicy() 是线程池线程数达到最大且队列满了之后的处理策略,ThreadFactoryBuilder() 是产生线程的工程方法,这是google的guava包里的,我们也可以自己实现java.util.concurrent包里面的ThreadFactory。一个简单的例子:

    public class MyThreadFactory implements ThreadFactory {
        private int counter;
        private String name;
        private List<String> stats;

        public MyThreadFactory(String name) {
            counter = 1;
            this.name = name;
        }

        @Override
        public Thread newThread(Runnable runnable) {
            Thread t = new Thread(runnable, name + "-Thread_" + counter);
            counter++;
            return t;
        }
    }

3. 插一个CompletionService

CompletionService按照线程执行完的顺序返回结果,不需要遍历Future列表

public class CompletionServiceDemo {

    public static void main(String[] args) {
        try {
            int taskCount = 10;
            CompletionService<Integer> completionService = new ExecutorCompletionService<>(ThreadPool.pool);

            // 方法1:future是提交时返回的,遍历queue则按照任务提交顺序,获取结果
//          List<Future<Integer>> futureList = new ArrayList<>();
//          for (int i = 0; i < taskCount; i++) {
//              futureList.add(completionService.submit(new demo.CallableExample<>(i + 1)));
//          }
//          for (Future<Integer> future : futureList) {
//              //每个线程按顺序在这里阻塞,等待该任务执行完毕
//              Integer result = future.get();
//              System.out.printf("线程获取到执行结果{%2d}\n", result);
//          }

            // 方法2.使用CompletionService的内部阻塞队列
            for (int i = 0; i < taskCount; i++) {
                completionService.submit(new CallableExample<>(i + 1));
            }
            for (int i = 0; i < taskCount; i++) {
                //采用completionService.take(),内部维护阻塞队列,任务先完成的先获取到
                Integer result = completionService.take().get();
                System.out.printf("线程获取到执行结果{%2d}%s", result, System.lineSeparator());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //关闭线程池
            ThreadPool.pool.shutdown();
        }
    }
}

4. jdk8提供的强大的CompletableFuture

public class CompletableFutureDemo {

    public static void main(String[] args) {
        int taskCount = 10;
        CompletableFuture[] futures = new CompletableFuture[taskCount];
        List<Integer> res = new CopyOnWriteArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            int count = i;
            futures[i] = CompletableFuture.supplyAsync(
                    () -> {
                        System.out.printf("线程获取到执行结果{%2d}%s", count, System.lineSeparator());
                        return count;
                    },
                    ThreadPool.pool).thenAccept(x -> res.add(x));
        }
        CompletableFuture.allOf(futures).orTimeout(1000, TimeUnit.SECONDS).join();
        res.stream().forEach(System.out::println);
        ThreadPool.pool.shutdown();
    }

jdk1.8的CompletableFuture没有timeout怎么办(jdk9才加上这个功能)?可以使用 get()方法代替,也可以加入一个定时线程。

public class CompletableFuture8Demo {
    public static void main(String[] args) throws Exception {
        ScheduledThreadPoolExecutor delayerThreadPool = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r);
            }
        });

        // 用定时任务抛出一个时间到的异常
        CompletableFuture<Integer> timeoutFuture = new CompletableFuture<>();
        delayerThreadPool.schedule(() -> {
            timeoutFuture.completeExceptionally(new TimeoutException("时间到"));
        }, 1, TimeUnit.SECONDS);

        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(
                () -> {
                    try {
                        Thread.sleep(1000);
                        System.out.println("1s");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        throw new RuntimeException(e.getMessage());
                    }
                    return 100;
                })
                .exceptionally(e -> 50);
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(
                () -> {
                    try {
                        Thread.sleep(2000);
                        System.out.println("2s");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        throw new RuntimeException(e.getMessage());
                    }
                    return 100;
                })
                .exceptionally(e -> 50);
        CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(
                () -> {
                    try {
                        Thread.sleep(3000);
                        System.out.println("3s");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        throw new RuntimeException(e.getMessage());
                    }
                    return 100;
                })
                .exceptionally(e -> 50);
        CompletableFuture<Integer> res = future
                .thenCombineAsync(future2, (i, j) -> i + j)
                .thenCombineAsync(future3, (i, j) -> i + j)
                .applyToEither(timeoutFuture, Function.identity());
        try {
            System.out.println(res.get());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            delayerThreadPool.shutdownNow();
        }
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,126评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,254评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,445评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,185评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,178评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,970评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,276评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,927评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,400评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,883评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,997评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,646评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,213评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,204评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,423评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,423评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,722评论 2 345

推荐阅读更多精彩内容