java多线程使用——从Thread、Runnable到CompletableFuture

1. 手动创建多线程

1.1 Thread和Runnable

Thread和Runnable大多数读者都知道,请跳过。
使用Thread:

public class ThreadDemo {

    public static void main(String[] args) {
        Thread threadOne = new ThreadExample("线程1");
        Thread threadTwo = new ThreadExample("线程2");
        threadOne.start();
        threadTwo.start();
    }
}

class ThreadExample extends Thread {

    private String name;

    public ThreadExample(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        System.out.printf("线程{%s}输出\n", name);
    }
}

使用Runnable:

public class RunnableDemo {

    public static void main(String[] args) {
        Thread threadOne = new Thread(new RunnableExample("线程1"));
        Thread threadTwo = new Thread(new RunnableExample("线程2"));
        threadOne.start();
        threadTwo.start();
    }
}

class RunnableExample implements Runnable {

    private String name;

    public RunnableExample(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        System.out.printf("线程{%s}输出%s", name, System.lineSeparator());
    }
}

一个实现接口,一个继承类,直接使用的话这两个没有太大的区别。有的地方会说一个Runnable对象可以让多个Thread复用,实际上只是可以共用一些参数设置而已。
实际比较有用的是下面的线程池等,只接受Runnable(或者Callable)类型的参数,而不能用Thread类型的参数。

1.2 Callable和Future、FutureTask

对于子线程,我们可能会有两种需求:

  1. 获取子线程运行结果
  2. 获取子线程运行状态(成功、失败、异常)
    Thread不满足这两个要求,Runnable可以获取状态但不能获取结果,于是出现了Callable。Callable配合Future使用可以获得子线程执行结果。
public class FutureDemo {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Runnable one = new RunnableExample("线程1");
        Callable two = new CallableExample("线程2");
        ThreadPool.pool.submit(one);
        // 这里get方法会阻塞住
        Future future = ThreadPool.pool.submit(two);
        System.out.println(future.get());
        ThreadPool.pool.shutdown();
    }
}

class CallableExample<T> implements Callable<T> {

    private T name;

    public CallableExample(T name) {
        this.name = name;
    }

    @Override
    public T call() {
        System.out.printf("线程{%s}输出%s", name, System.lineSeparator());
        return name;
    }
}

class ThreadPool {

    private ThreadPool() {
    }

    public static final ThreadPoolExecutor pool = new ThreadPoolExecutor(
            5,
            10,
            10,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(100),
            r -> new Thread(r),
            new ThreadPoolExecutor.DiscardOldestPolicy());
}

Future.get()是一个阻塞方法。为了不阻塞,我们将多个Future放到一个List中,然后去遍历,根据isDone()方法判断子线程已经执行完后,再get。
Future还有一个FutureTask,接受Callable作为参数。

public class FutureTaskDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Callable one = new CallableExample("线程1");
        Callable two = new CallableExample("线程2");
        FutureTask<CallableExample> task = new FutureTask<>(two);
        ThreadPool.pool.submit(one);
        ThreadPool.pool.submit(task);
        System.out.println(task.get());
        ThreadPool.pool.shutdown();
    }
}

在不方便获取submit的返回值时很有效。

2. 线程池

线程创建、销毁消耗资源,线程上下文切换也消耗资源,并且不控制线程数量容易OOM,所以有了线程池。可以使用以下四个线程池:

  • Executors.newCachedThreadPool();
  • Executors.newFixedThreadPool(3);
  • Executors.newScheduledThreadPool(3);
  • Executors.newSingleThreadExecutor();
    也可以自定义:
ThreadPoolExecutor THREAD_POOL = new ThreadPoolExecutor(2, 2, 60,
            TimeUnit.SECONDS, new ArrayBlockingQueue<>(1),
            new ThreadFactoryBuilder().setNameFormat("HomePageCardFeatureQueryThreadPool-%d").setDaemon(true).build(),
            new ThreadPoolExecutor.DiscardPolicy());

其中,ThreadPoolExecutor.DiscardPolicy() 是线程池线程数达到最大且队列满了之后的处理策略,ThreadFactoryBuilder() 是产生线程的工程方法,这是google的guava包里的,我们也可以自己实现java.util.concurrent包里面的ThreadFactory。一个简单的例子:

    public class MyThreadFactory implements ThreadFactory {
        private int counter;
        private String name;
        private List<String> stats;

        public MyThreadFactory(String name) {
            counter = 1;
            this.name = name;
        }

        @Override
        public Thread newThread(Runnable runnable) {
            Thread t = new Thread(runnable, name + "-Thread_" + counter);
            counter++;
            return t;
        }
    }

3. 插一个CompletionService

CompletionService按照线程执行完的顺序返回结果,不需要遍历Future列表

public class CompletionServiceDemo {

    public static void main(String[] args) {
        try {
            int taskCount = 10;
            CompletionService<Integer> completionService = new ExecutorCompletionService<>(ThreadPool.pool);

            // 方法1:future是提交时返回的,遍历queue则按照任务提交顺序,获取结果
//          List<Future<Integer>> futureList = new ArrayList<>();
//          for (int i = 0; i < taskCount; i++) {
//              futureList.add(completionService.submit(new demo.CallableExample<>(i + 1)));
//          }
//          for (Future<Integer> future : futureList) {
//              //每个线程按顺序在这里阻塞,等待该任务执行完毕
//              Integer result = future.get();
//              System.out.printf("线程获取到执行结果{%2d}\n", result);
//          }

            // 方法2.使用CompletionService的内部阻塞队列
            for (int i = 0; i < taskCount; i++) {
                completionService.submit(new CallableExample<>(i + 1));
            }
            for (int i = 0; i < taskCount; i++) {
                //采用completionService.take(),内部维护阻塞队列,任务先完成的先获取到
                Integer result = completionService.take().get();
                System.out.printf("线程获取到执行结果{%2d}%s", result, System.lineSeparator());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //关闭线程池
            ThreadPool.pool.shutdown();
        }
    }
}

4. jdk8提供的强大的CompletableFuture

public class CompletableFutureDemo {

    public static void main(String[] args) {
        int taskCount = 10;
        CompletableFuture[] futures = new CompletableFuture[taskCount];
        List<Integer> res = new CopyOnWriteArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            int count = i;
            futures[i] = CompletableFuture.supplyAsync(
                    () -> {
                        System.out.printf("线程获取到执行结果{%2d}%s", count, System.lineSeparator());
                        return count;
                    },
                    ThreadPool.pool).thenAccept(x -> res.add(x));
        }
        CompletableFuture.allOf(futures).orTimeout(1000, TimeUnit.SECONDS).join();
        res.stream().forEach(System.out::println);
        ThreadPool.pool.shutdown();
    }

jdk1.8的CompletableFuture没有timeout怎么办(jdk9才加上这个功能)?可以使用 get()方法代替,也可以加入一个定时线程。

public class CompletableFuture8Demo {
    public static void main(String[] args) throws Exception {
        ScheduledThreadPoolExecutor delayerThreadPool = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r);
            }
        });

        // 用定时任务抛出一个时间到的异常
        CompletableFuture<Integer> timeoutFuture = new CompletableFuture<>();
        delayerThreadPool.schedule(() -> {
            timeoutFuture.completeExceptionally(new TimeoutException("时间到"));
        }, 1, TimeUnit.SECONDS);

        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(
                () -> {
                    try {
                        Thread.sleep(1000);
                        System.out.println("1s");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        throw new RuntimeException(e.getMessage());
                    }
                    return 100;
                })
                .exceptionally(e -> 50);
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(
                () -> {
                    try {
                        Thread.sleep(2000);
                        System.out.println("2s");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        throw new RuntimeException(e.getMessage());
                    }
                    return 100;
                })
                .exceptionally(e -> 50);
        CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(
                () -> {
                    try {
                        Thread.sleep(3000);
                        System.out.println("3s");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        throw new RuntimeException(e.getMessage());
                    }
                    return 100;
                })
                .exceptionally(e -> 50);
        CompletableFuture<Integer> res = future
                .thenCombineAsync(future2, (i, j) -> i + j)
                .thenCombineAsync(future3, (i, j) -> i + j)
                .applyToEither(timeoutFuture, Function.identity());
        try {
            System.out.println(res.get());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            delayerThreadPool.shutdownNow();
        }
    }
}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容