1. 手动创建多线程
1.1 Thread和Runnable
Thread和Runnable大多数读者都知道,请跳过。
使用Thread:
public class ThreadDemo {
public static void main(String[] args) {
Thread threadOne = new ThreadExample("线程1");
Thread threadTwo = new ThreadExample("线程2");
threadOne.start();
threadTwo.start();
}
}
class ThreadExample extends Thread {
private String name;
public ThreadExample(String name) {
this.name = name;
}
@Override
public void run() {
System.out.printf("线程{%s}输出\n", name);
}
}
使用Runnable:
public class RunnableDemo {
public static void main(String[] args) {
Thread threadOne = new Thread(new RunnableExample("线程1"));
Thread threadTwo = new Thread(new RunnableExample("线程2"));
threadOne.start();
threadTwo.start();
}
}
class RunnableExample implements Runnable {
private String name;
public RunnableExample(String name) {
this.name = name;
}
@Override
public void run() {
System.out.printf("线程{%s}输出%s", name, System.lineSeparator());
}
}
一个实现接口,一个继承类,直接使用的话这两个没有太大的区别。有的地方会说一个Runnable对象可以让多个Thread复用,实际上只是可以共用一些参数设置而已。
实际比较有用的是下面的线程池等,只接受Runnable(或者Callable)类型的参数,而不能用Thread类型的参数。
1.2 Callable和Future、FutureTask
对于子线程,我们可能会有两种需求:
- 获取子线程运行结果
- 获取子线程运行状态(成功、失败、异常)
Thread不满足这两个要求,Runnable可以获取状态但不能获取结果,于是出现了Callable。Callable配合Future使用可以获得子线程执行结果。
public class FutureDemo {
public static void main(String[] args) throws InterruptedException, ExecutionException {
Runnable one = new RunnableExample("线程1");
Callable two = new CallableExample("线程2");
ThreadPool.pool.submit(one);
// 这里get方法会阻塞住
Future future = ThreadPool.pool.submit(two);
System.out.println(future.get());
ThreadPool.pool.shutdown();
}
}
class CallableExample<T> implements Callable<T> {
private T name;
public CallableExample(T name) {
this.name = name;
}
@Override
public T call() {
System.out.printf("线程{%s}输出%s", name, System.lineSeparator());
return name;
}
}
class ThreadPool {
private ThreadPool() {
}
public static final ThreadPoolExecutor pool = new ThreadPoolExecutor(
5,
10,
10,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
r -> new Thread(r),
new ThreadPoolExecutor.DiscardOldestPolicy());
}
Future.get()是一个阻塞方法。为了不阻塞,我们将多个Future放到一个List中,然后去遍历,根据isDone()方法判断子线程已经执行完后,再get。
Future还有一个FutureTask,接受Callable作为参数。
public class FutureTaskDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable one = new CallableExample("线程1");
Callable two = new CallableExample("线程2");
FutureTask<CallableExample> task = new FutureTask<>(two);
ThreadPool.pool.submit(one);
ThreadPool.pool.submit(task);
System.out.println(task.get());
ThreadPool.pool.shutdown();
}
}
在不方便获取submit的返回值时很有效。
2. 线程池
线程创建、销毁消耗资源,线程上下文切换也消耗资源,并且不控制线程数量容易OOM,所以有了线程池。可以使用以下四个线程池:
- Executors.newCachedThreadPool();
- Executors.newFixedThreadPool(3);
- Executors.newScheduledThreadPool(3);
- Executors.newSingleThreadExecutor();
也可以自定义:
ThreadPoolExecutor THREAD_POOL = new ThreadPoolExecutor(2, 2, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(1),
new ThreadFactoryBuilder().setNameFormat("HomePageCardFeatureQueryThreadPool-%d").setDaemon(true).build(),
new ThreadPoolExecutor.DiscardPolicy());
其中,ThreadPoolExecutor.DiscardPolicy() 是线程池线程数达到最大且队列满了之后的处理策略,ThreadFactoryBuilder() 是产生线程的工程方法,这是google的guava包里的,我们也可以自己实现java.util.concurrent包里面的ThreadFactory。一个简单的例子:
public class MyThreadFactory implements ThreadFactory {
private int counter;
private String name;
private List<String> stats;
public MyThreadFactory(String name) {
counter = 1;
this.name = name;
}
@Override
public Thread newThread(Runnable runnable) {
Thread t = new Thread(runnable, name + "-Thread_" + counter);
counter++;
return t;
}
}
3. 插一个CompletionService
CompletionService按照线程执行完的顺序返回结果,不需要遍历Future列表
public class CompletionServiceDemo {
public static void main(String[] args) {
try {
int taskCount = 10;
CompletionService<Integer> completionService = new ExecutorCompletionService<>(ThreadPool.pool);
// 方法1:future是提交时返回的,遍历queue则按照任务提交顺序,获取结果
// List<Future<Integer>> futureList = new ArrayList<>();
// for (int i = 0; i < taskCount; i++) {
// futureList.add(completionService.submit(new demo.CallableExample<>(i + 1)));
// }
// for (Future<Integer> future : futureList) {
// //每个线程按顺序在这里阻塞,等待该任务执行完毕
// Integer result = future.get();
// System.out.printf("线程获取到执行结果{%2d}\n", result);
// }
// 方法2.使用CompletionService的内部阻塞队列
for (int i = 0; i < taskCount; i++) {
completionService.submit(new CallableExample<>(i + 1));
}
for (int i = 0; i < taskCount; i++) {
//采用completionService.take(),内部维护阻塞队列,任务先完成的先获取到
Integer result = completionService.take().get();
System.out.printf("线程获取到执行结果{%2d}%s", result, System.lineSeparator());
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//关闭线程池
ThreadPool.pool.shutdown();
}
}
}
4. jdk8提供的强大的CompletableFuture
public class CompletableFutureDemo {
public static void main(String[] args) {
int taskCount = 10;
CompletableFuture[] futures = new CompletableFuture[taskCount];
List<Integer> res = new CopyOnWriteArrayList<>();
for (int i = 0; i < taskCount; i++) {
int count = i;
futures[i] = CompletableFuture.supplyAsync(
() -> {
System.out.printf("线程获取到执行结果{%2d}%s", count, System.lineSeparator());
return count;
},
ThreadPool.pool).thenAccept(x -> res.add(x));
}
CompletableFuture.allOf(futures).orTimeout(1000, TimeUnit.SECONDS).join();
res.stream().forEach(System.out::println);
ThreadPool.pool.shutdown();
}
jdk1.8的CompletableFuture没有timeout怎么办(jdk9才加上这个功能)?可以使用 get()方法代替,也可以加入一个定时线程。
public class CompletableFuture8Demo {
public static void main(String[] args) throws Exception {
ScheduledThreadPoolExecutor delayerThreadPool = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r);
}
});
// 用定时任务抛出一个时间到的异常
CompletableFuture<Integer> timeoutFuture = new CompletableFuture<>();
delayerThreadPool.schedule(() -> {
timeoutFuture.completeExceptionally(new TimeoutException("时间到"));
}, 1, TimeUnit.SECONDS);
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(
() -> {
try {
Thread.sleep(1000);
System.out.println("1s");
} catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
return 100;
})
.exceptionally(e -> 50);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(
() -> {
try {
Thread.sleep(2000);
System.out.println("2s");
} catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
return 100;
})
.exceptionally(e -> 50);
CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(
() -> {
try {
Thread.sleep(3000);
System.out.println("3s");
} catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
return 100;
})
.exceptionally(e -> 50);
CompletableFuture<Integer> res = future
.thenCombineAsync(future2, (i, j) -> i + j)
.thenCombineAsync(future3, (i, j) -> i + j)
.applyToEither(timeoutFuture, Function.identity());
try {
System.out.println(res.get());
} catch (Exception e) {
e.printStackTrace();
} finally {
delayerThreadPool.shutdownNow();
}
}
}