Java的线程既是工作单元,也是执行机制。JDK 5开始,把工作单元与执行机制分离开来。工作单元包括Runnable和Callable,而执行机制由Executor框架提供。
1.Executor框架简介
①Executor框架的两级调度模型
java.lang.Thread被一对一映射为本地操作系统线程。Java线程启动时会创建一个本地操作系统线程;当该Java线程终止时,这个操作系统线程也会被回收。操作系统会调度所有线程并将它们分配给可用的CPU。
在上层,Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。
②Executor框架的结构与成员
1)Executor框架的结构
- 任务。包括被执行任务需要实现的接口Runnable和Callable接口。
- 任务的执行。包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。Executor框架有两个关键类实现了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)。
- 一步计算的结果。包括接口Future和实现Future接口的FutureTask类。
- Executor:是一个接口,是Executor框架的基础。
- ThreadPoolExecutor:是线程池的核心实现类,用来执行被提交的任务。
- ScheduledThreadPoolExecutor:是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。比Timer更灵活,功能更强大。
- Future接口和实现接口的FutureTask类:代表异步计算的结果。
- Runnable接口和Callable接口的实现类:都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor执行。
2)Executor框架的成员
-
ThreadPoolExecutor
newFixedThreadPool:适用于为了满足资源管理的需求,而需要限制当前线程数量的应用场景,它适用于负载比较重的服务器。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
newSingleThreadExecutor适用于需要保证顺序地执行各个任务;并且在任意时间点,不会有多个线程是活动的应用场景。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
newCachedThreadPool是大小无界的线程池,适用于执行很多的短期异步任务的小程序,或者是负载较轻的服务器。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
-
ScheduleThreadPoolExecutor
newScheduledThreadPool:包含若干个线程的ScheduledThreadPoolExecutor。适用于需要多个后台线程执行周期任务,同时为了满足资源管理的需求而需要限制后台线程的数量的应用场景。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); }
newSingleThreadScheduledExecutor:只包含一个线程的ScheduledThreadPoolExecutor。适用于需要单个后台线程执行周期任务,同时需要保证顺序地执行各个任务的应用场景。
public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); } public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1, threadFactory)); }
-
Future
<T> Future<T> submit(Callable<T> task) <T> Future<T> submit(Runnable task, T result) Future<?> submit(Runnable task)
-
Runnable和Callable
public static Callable<Object> callable(Runnable task) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<Object>(task, null); } public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); } static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
2.ScheduleThreadPoolExecutor详解
继承自ThreadPoolExecutor。主要用来在给定的延迟后执行运行任务,或者定期执行任务。功能与Timer类似,但功能更强大、更灵活。Timer对应的是单个后台线程,而ScheduleThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数。
①ScheduleThreadPoolExecutor的运行机制
当调用scheduleAtFixedRate方法或者scheduleWithFixedDelay方法时,会向ScheduleThreadPoolExecutor的DelayedWorkQueue添加了一个实现了RunnableScheduledFuture接口的ScheduledFutureTask。
线程池中的线程从DelayedWorkQueue中获取ScheduledFutureTask,然后执行任务。
ScheduleThreadPoolExecutor为了实现周期性的执行任务,对ThreadPoolExecutor做了如下的修改。
- 使用DelayedWorkQueue作为任务队列。
- 获取任务的方式不同。
- 执行周期任务后,增加了额外的处理。
②ScheduleThreadPoolExecutor的实现
ScheduledFutureTask主要包含3个成员变量:
- long time;//表示这个任务将要被执行的具体时间。
- long sequenceNumber;//表示这个任务被添加到ScheduleThreadPoolExecutor中的序号。
- long period;//表示任务执行的间隔周期。
- 从DelayWorkQueue中获取已到期的ScheduleFutureTask(DelayWorkQueue.take())。
- 线程1执行这个ScheduleFutureTask。
- 线程1修改ScheduleFutureTask的time变量为下次将要执行的时间。
- 线程1把这个修改time之后的ScheduleFutureTask放回DelayWorkQueue中(DelayWorkQueue.add())。
public RunnableScheduledFuture<?> take() throws InterruptedException {
//获取锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
//获取周期任务
RunnableScheduledFuture<?> first = queue[0];
//任务数组为空
if (first == null)
available.await();//等待
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);//获取头部元素,将最后一个元素放到第一位,并自上而下添加到其堆排序点。
//头部元素的time时间比当前时间大
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
//到condition中等待到time时间
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();//释放锁
}
}
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();//1 获取锁
try {
int i = size;
if (i >= queue.length)
grow();//调整堆数组大小
size = i + 1;
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e);//2.1 在底部添加到堆排序点
}
if (queue[0] == e) {
leader = null;
available.signal();//2.2 通知
}
} finally {
lock.unlock();//3 释放锁
}
return true;
}
3.FutureTask详解
①FutureTask简介
实现了Future接口和Runnable接口,可以交给Executor执行,也可以由调用线程直接执行(FutureTask.run())。
FutureTask.get():未启动和已启动状态时调用,会导致线程阻塞。已完成状态时,调用线程立即返回结果或抛出异常。
FutureTask.cancel():未启动状态时,调用将导致此任务永远不会被执行。已启动状态时,cancel(true)将以中断执行此任务线程的方式来视图停止任务;cancel(false)将不会对正在执行此任务的线程产生任何影响(让正在执行的任务运行完成)。已完成状态时,执行cancel返回false。
②FutureTask的使用
当一个线程需要等待另一个线程把某个任务执行完成后才能继续执行,此时可以使用FutureTask。
假设有多个线程执行若干个任务,每个任务最多只能被执行一次,当多个线程视图同时执行同一个任务时,只允许一个线程执行任务,其他线程需要等待这个任务执行完成后才能继续执行。示例代码:
private final ConcurrentHashMap<Object, Future<String>> taskCache = new ConcurrentHashMap<>();//每个任务只执行一次,多个线程可以获取到执行的结果
private String executionTask(final String taskName) throws ExecutionException, InterruptedException {
while (true) {
Future<String> future = taskCache.get(taskName); //1.1 执行完1.3后执行 2.1
if (future == null) {
FutureTask<String> futureTask = new FutureTask<>(() -> taskName);//1.2
//如果存在taskName,不更改value,返回旧值,如果不存在,put
future = taskCache.putIfAbsent(taskName, futureTask);//1.3
if (future == null) {//put成功了
future = futureTask;
futureTask.run();//1.4执行任务
}
}
try {
return future.get();//1.5 2.2
} catch (CancellationException e) {
taskCache.remove(taskName, future);//当taskName和future有映射关系的时候,才移除
}
}
}
当两个线程试图同时执行同一个任务时,如果Thread1执行了1.3后Thread2执行2.1,那么接下来Thread2将在2.2等待,知道Thread1执行完成1.4后Thread2才能从2.2返回。