1. 概述
本文首先阐述Java线程池的运行机制、参数配置,随后指出其存在何种“缺陷”,最后基于现有机制做策略优化,定制一种更合理、有效的线程池。
2. Java线程池机制
Java可以通过Executors提供的工厂方法创建线程池,也可以通过ThreadPoolExecutor直接创建,本质上二者并无差别。
ThreadPoolExecutor最完整的构造函数如下:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory, RejectedExecutionHandler handler)
下面逐一来看各参数含义。
2.1 corePoolSize\maximumPoolSize
- corePoolSize:线程池中核心线程数量。
- maximumPoolSize:线程池同时允许存在的最大线程数量。
内部处理逻辑如下:
- 当线程池中工作线程数小于corePoolSize,创建新的工作线程来执行该任务,不管线程池中是否存在空闲线程。
- 如果线程池中工作线程数达到corePoolSize,新任务尝试放入队列,入队成功的任务将等待工作线程空闲时调度。
2.1 如果队列满并且线程数小于maximumPoolSize,创建新的线程执行该任务(注意:队列中的任务继续排序)。
2.2 如果队列满且线程数超过maximumPoolSize,拒绝该任务。
2.2 keepAliveTime
当线程池中工作线程数大于corePoolSize,并且线程空闲时间超过keepAliveTime,则这些线程将被终止。同样,可以将这种策略应用到核心线程,通过调用allowCoreThreadTimeout来实现。
2.3 BlockingQueue
任务等待队列,用于缓存暂时无法执行的任务。分为如下三种堵塞队列:
- 直接递交。如SynchronousQueue,该策略直接将任务直接交给工作线程。如果当前没有空闲工作线程,创建新线程。这种策略最好是配合unbounded线程数来使用,从而避免任务被拒绝。但当任务生产速度大于消费速度,将导致线程数不断的增加。
- 无界队列。如LinkedBlockingQueue,当工作的线程数达到核心线程数时,新的任务被放在队列上。因此,永远不会有大于corePoolSize的线程被创建,maximumPoolSize参数失效。这种策略比较适合所有的任务都不相互依赖,独立执行。但是当任务处理速度小于任务进入速度的时候会引起队列的无限膨胀。
- 有界队列。如ArrayBlockingQueue,按前面描述的corePoolSize、maximumPoolSize、BlockingQueue处理逻辑处理。队列长度和maximumPoolSize两个值会相互影响:
- 长队列 + 小maximumPoolSize。会减少CPU的使用、操作系统资源、上下文切换的消耗,但是会降低吞吐量,如果任务被频繁的阻塞如IO线程,系统其实可以调度更多的线程。
- 短队列 + 大maximumPoolSize。CPU更忙,但会增加线程调度的消耗.
总结一下,IO密集型可以考虑多些线程来平衡CPU的使用,CPU密集型可以考虑少些线程减少线程调度的消耗。
2.4 ThreadFactory
线程池中的工作线程通过ThreadFactory来创建的,如果没有指定,默认为Executors#defaultThreadFactory。这个时候创建的线程将都属于同一个线程组,拥有同样的优先级和daemon状态。采用自定义的ThreadFactory,可以配置线程的名字、线程组合daemon状态。如果调用ThreadFactory#createThread的时候失败,将返回null,executor将不会执行任何任务。
2.5 RejectedExecutionHandler
当新的任务无法进入等待队列且线程数已达maximumPoolSize上线时,需要定制拒绝策略,拒绝该任务。ThreadPoolExecutor提供了如下可选策略:
- ThreadPoolExecutor#AbortPolicy:这个策略直接抛出RejectedExecutionException异常。
- ThreadPoolExecutor#CallerRunsPolicy:这个策略将会使用Caller线程来执行这个任务,这是一种feedback策略,可以降低任务提交的速度。
- ThreadPoolExecutor#DiscardPolicy:这个策略将会直接丢弃任务。
- ThreadPoolExecutor#DiscardOldestPolicy:这个策略将会把任务队列头部的任务丢弃,然后重新尝试执行,如果还是失败则继续实施策略。
除了上述策略,可以通过实现RejectedExecutionHandler来实现自己的策略。
3. Java线程池缺陷
3.1 非核心线程创建时机
非核心线程在队列满时触发创建,在瞬时冲高情况下,队列被占满,但新创建的线程来不及消费等待队列中的任务,新任务被拒绝。
举个例子,假设有一个线程池corePoolSize=1,maximumPoolSize=2,队列长度为10。在绝大多数情况下,添加任务的速度为每5秒1条,单任务处理时间为1秒,此时线程池中只有一个核心线程。但某个时间段内,任务生产速度增加,每秒钟任务添加速度增长为每500ms处理1条,单任务处理时间不变,持续时间为10分钟。基于当前的策略,行为如下:
- 等待队列中任务开始累积,10秒后任务队列满。
- 第11秒,创建一个新线程处理新任务,第11.5秒新任务到来,此时2个线程均处于busy状态,此时任务丢弃。
- 之后队列始终处于满状态,由于调度时差任务可能进一步丢失。
- 10分钟后开始队列逐步被清空。
这里举了最简单的场景,此场景下始终处于满队列状态,现实中瞬时冲高要比示例中的更复杂,包括任务添加频率不固定,每个任务处理时间的随机性等。
3.2 排队任务调度策略
为阐述方便,本节先定义一个最简配置线程池配置如下:
- 核心线程数:1。
- 队列大小:10。
- 最大线程数:2。
非核心线程在队列满时触发创建,并执行当前的任务,但队列中的任务依旧处于排队状态。举例说明:
- 当前核心线程已满,队列(队列大小为10)中处于排队的任务编号分别为任务20-29。
- 当任务30到来时,队列插入失败,创建新线程,此时新线程处理任务30,而非任务20。
换句话说,任务的执行顺序和任务的添加顺序是不一致的,这可能导致务堵塞。想象队列中依次添加两个任务A、B,并且B执行过程中需要等待任务A的执行结果。此时,如果任务B先于任务A执行,任务B被堵塞,线程池调度效率降低。
4. 策略优化
4.1 非核心线程创建时机
“我们不希望一直等待,等待一切都来不及”。
Java线程池调度策略一直等到任务队列满才开始创建新线程,这个不是我希望看到的。我们希望的是:可以给等待队列设置一个阈值,一旦触及这个阈值马上创建新的线程,防止任务出现过度堆积。
换句话说,未达到阈值时,我们认定核心线程有能力消化全部任务;超过阈值时,我们认定核心线程已经无法满足当前的任务请求现状,必须立即创建新的线程消化当前的任务。另外,这个阈值应当是可配置的。
4.1.1 实现策略
Java线程池创建接口如下:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory, RejectedExecutionHandler handler)
其中,workQueue就是前面提到的等待队列,类型为BlockingQueue。查看Java源码,非核心线程创建点逻辑如下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
//未达到核心线程池,创建核心线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//等待队列添加成功,double check,一般不创建线程。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//等待队列添加失败,尝试创建线程
else if (!addWorker(command, false))
reject(command);
}
也就是说,当workQueue.offer失败时,触发创建新线程。现在需要改成达到指定阈值时添加失败,同时触发创建线程。但添加失败的任务不是被拒绝,而是添加到附加队列中,等待调度。
直观上,存在两个队列,基本等待队列和附加队列,基本队列大小为queueSize*threshold_value,附加队列大小为queueSize*(1-threshold_value)。但实现上,我们不能采用双队列方式,因为附加队列是基本队列的补充,基本队列中一旦存在空闲,附加队列中的任务需要原子性的移到基本队列中。通过分析BlockingQueue的全部接口,该方案不可行。
最终,我们采用LinkedBlockingQueue做为基本队列类型,定制一个双生产者、单消费者队列。将生产者分为普通生产者、超限生产者两类。
- 普通生产者。使用BlockingQueue原生的offer等接口添加任务,可用队列大小为queueSize*threshold_value。Java原生线程池做为普通生产者调用该组接口。
- 超限生产者。使用自定义的additionOffer接口添加任务,可用队列大小为queueSize*(1-threshold_value)。我们复写的线程池框架负责执行次动作。
线程池定制队列代码如下:
/**
* 定制双生产者、单消费者队列。通过阈值将队列分为普通生产者队列和超限生产者队列两部分。
* 普通生产者队列供Java原生线程池使用
*
* @param <T>
*/
public class DwThreadBlockingQueue<T> extends LinkedBlockingQueue<T> {
private int capacity;
private int additionalCapacity;
//处理原则:capacity,additionalCapacity生产的时候分开生产、消费的时候统一消费
public HwThreadBlockingQueue(int queueSize, float threshold) {
super(queueSize);
this.capacity = queueSize * threshold;
this.additionalCapacity = queueSize - capacity;
}
@Override
public boolean addAll(Collection<? extends T> c) {
throw new UnsupportedOperationException();
}
//普通生产者接口
@Override
public boolean add(T e) {
if (super.size() >= capacity) {
throw new IllegalStateException();
}
//并发下不足够精确
return super.add(e);
}
//普通生产者接口
@Override
public boolean offer(T e) {
//并发下不足够精确
if (super.size() >= capacity) {
return false;
}
return super.offer(e);
}
//普通生产者接口
@Override
public boolean offer(T e, long timeout, TimeUnit unit) throws InterruptedException {
//并发下不足够精确
if (super.size() >= capacity) {
return false;
}
return super.offer(e, timeout, unit);
}
//普通生产者接口
@Override
public void put(T e) throws InterruptedException {
throw new UnsupportedOperationException();
}
//符合LVS替换原则,弱化为BlockingQueue时为标准的单生产者、消费者队列
@Override
public int remainingCapacity() {
int remain = super.remainingCapacity() - additionalCapacity;
return remain >= 0 ? remain : 0;
}
//超限生产者接口
public boolean additionalOffer(T e) {
return super.offer(e);
}
}
前面提到,Java原生线程池做为普通生产者添加任务,但如果普通生产者添加任务失败,需要有一个类做为超限生产者添加到超限队列,这个类复写自ThreadPoolExecutor。
public class DwThreadPoolExecutor extends ThreadPoolExecutor {
public HwThreadPoolExecutor(int poolSize, int maxSize, int keepAliveTime, int queueSize, float threshold, RejectedExecutionHandler handler) {
//调用基类,传入定制的双生产者队列
super(poolSize, maxSize, keepAliveTime, TimeUnit.SECONDS,
new HwThreadBlockingQueue<Runnable>(queueSize, threshold));
super.setRejectedExecutionHandler((runnable, executor) -> {
//做为超限生产者,添加超限任务
if (!queue.additionalOffer(runnable)) {
//超限任务添加失败,转由外部处理
handler.rejectedExecution(runnable, executor);
}
});
}
}
4.2 任务调度策略
任务调度策略上,我们希望调度是保序的。
基本想法如下:
- 采用任务代理类,将任务绑定时机延迟到任务执行时,而非任务添加时。
- 增加新的任务队列,按添加顺序保存真正的执行任务
- 运行时,动态从新增任务队列中获取头部任务,做到FIFO。
代理类实现如下:
import java.util.concurrent.BlockingQueue;
public class DwRunnableAgent implements Runnable{
//保序任务队列
private BlockingQueue<Runnable> realTasks = null;
public DwRunnableAgent(BlockingQueue<Runnable> realTasks) {
this.realTasks = realTasks;
}
@Override
public void run() {
//运行时绑定,头部获取任务,保证FIFO
Runnable runnable = realTasks.remove();
runnable.run();
}
}
定制线程池框架增加如下代码:
public class DwThreadPoolExecutor extends ThreadPoolExecutor {
//真正任务保序队列
private LinkedBlockingQueue<Runnable> realRunnables = new LinkedBlockingQueue<>();
public HwThreadPoolExecutor(int poolSize, int maxSize, int keepAliveTime, int baseQueueSize,
int additionalQueueSize, RejectedExecutionHandler handler) {
......
// 内部消化一次
super.setRejectedExecutionHandler((runnable, executor) -> {
//做为超限生产者,添加超限任务
if (!queue.additionalOffer(runnable)) {
//移除任务,任务真正拒绝
realRunnables.remove(runnable);
//超限任务添加失败,转由外部处理
handler.rejectedExecution(runnable, executor);
}
});
}
@Override
public void execute(Runnable command) {
// 任务记录
realRunnables.add(command);
// 封装代理对象
super.execute(new RunnableAgent(realRunnables));
}
}
5. 总结
本人曾尝试了解为何Java原生线程池存在上述缺陷,但大多都在阐述如何基于当前的特性开发,关于机制并没有找到合理的解释。
本文解决的两个缺陷是通用的、机制类的缺陷。
新增抽象层也给工程实践上带来了诸多可能,如任务执行时间统计、死循环检测、架构管控等等。
转载请注明:[随安居士]//www.greatytc.com/p/896b8e18501b