ThreadPoolExecutor线程池参数
1. 背景
公司一个业务需要循环多次(超过2000次),后将拿到的数据写入数据库中。
2. 解决方法
基于业务的特点,想到了两个方法
1. 对数据库大批量的写进行分批次提交,基于JdbcTemplates,或者Batch提交
2. 创建多个线程池,使用CountDownLatch计数。
3. 应用场景
应用场景:线程的创建和销毁是一个耗时的操作。如果在程序中频繁的创建和销毁线程,会对程序的反应速度造成严重的影响。有时候可能导致crash掉。如果在频繁的使用线程就可以使用线程池代替。
5.参数说明
-
corePoolSize:线程池基本大小 核心线程数
1. 核心线程 数一定会存活,即使没有任务需要执行 2. tasks(任务数) <coorePooSize 当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,tasks(任务数) > coorePooSize,会放入之后的队列 3. 设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭 4. 如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。
-
queueCapacity(任务队列容量)
1. ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序 2. LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列 3. SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。 4. PriorityBlockingQueue:一个具有优先级得无限阻塞队列
-
maxPoolSize 最大线程数
1. 当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务 2. 当线程数=maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常 3. 如果使用了无界的任务队列这个参数就没什么效果
-
keepAliveTime:线程空闲时间
1. 当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize 2. 如果allowCoreThreadTimeout=true,则会直到线程数量=0
-
RejectedExecutionHandler: 拒绝策略
当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。以下是JDK1.5提供的四种策略。
1. ThreadPoolExecutor.AbortPolicy: 当线程池中的数量等于最大线程数时抛 java.util.concurrent.RejectedExecutionException 异 常,涉及到该异常的任务也不会被执行,线程池默认的拒绝策略就是该策略。 2. ThreadPoolExecutor.DiscardPolicy(): 当线程池中的数量等于最大线程数时,默默丢弃不能执行的新加任务,不报任何异常。 3. ThreadPoolExecutor.CallerRunsPolicy(): 当线程池中的数量等于最大线程数时,重试添加当前的任务;它会自动重复调用execute()方法。 4. ThreadPoolExecutor.DiscardOldestPolicy(): 当线程池中的数量等于最大线程数时,抛弃线程池中工作队列头部的任务(即等待时间最久的任务),并执行当前任务。
5. 线程池工作顺序
1.当线程池中线程数量小于 corePoolSize 则创建线程,并处理请求。
2. 当线程池中线程数量大于等于 corePoolSize 时,则把请求放入 workQueue 中,随着线程池中的核心线程们不断执行任务,只要线程池中有空闲的核心线程,线程池就从 workQueue 中取任务并处理。
3. 当 taskQueue 已存满,放不下新任务时则新建非核心线程入池,并处理请求直到线程数目达到 maximumPoolSize(最大线程数量设置值)。
4. 如果线程池中线程数大于 maximumPoolSize 则使用 RejectedExecutionHandler 来进行任务拒绝处理。
6. 如何设置参数
参考了这篇:ThreadPoolExecutor线程池参数设置技巧
- 默认值
- corePoolSize=1
- queueCapacity=Integer.MAX_VALUE
- maxPoolSize=Integer.MAX_VALUE
- keepAliveTime=60s
- allowCoreThreadTimeout=false
- rejectedExecutionHandler=AbortPolicy()
- 如何来设置
- 需要根据几个值来决定
- tasks :每秒的任务数,假设为500~1000
- taskcost:每个任务花费时间,假设为0.1s
- responsetime:系统允许容忍的最大响应时间,假设为1s
- 做几个计算
- corePoolSize = 每秒需要多少个线程处理?
- threadcount = tasks/(1/taskcost) =taskstaskcout = (500~1000)0.1 = 50~100 个线程。corePoolSize设置应该大于50
- 根据8020原则,如果80%的每秒任务数小于800,那么corePoolSize设置为80即可
- queueCapacity = (coreSizePool/taskcost)*responsetime
- 计算可得 queueCapacity = 80/0.1*1 = 80。意思是队列里的线程可以等待1s,超过了的需要新开线程来执行
- 切记不能设置为Integer.MAX_VALUE,这样队列会很大,线程数只会保持在corePoolSize大小,当任务陡增时,不能新开线程来执行,响应时间会随之陡增。
- maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost)
- 计算可得 maxPoolSize = (1000-80)/10 = 92
- (最大任务数-队列容量)/每个线程每秒处理能力 = 最大线程数
- rejectedExecutionHandler:根据具体情况来决定,任务不重要可丢弃,任务重要则要利用一些缓冲机制来处理
- keepAliveTime和allowCoreThreadTimeout采用默认通常能满足
7. 代码
//参数初始化
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
//核心线程数量大小
private static final int corePoolSize = Math.max(2, Math.min(CPU_COUNT - 1, 4));
//线程池最大容纳线程数
private static final int maximumPoolSize = CPU_COUNT * 2 + 1;
//线程空闲后的存活时长
private static final int keepAliveTime = 30;
//任务过多后,存储任务的一个阻塞队列
BlockingQueue<Runnable> workQueue = new SynchronousQueue<>();
//线程的创建工厂
ThreadFactory threadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "AdvacnedAsyncTask #" + mCount.getAndIncrement());
}
};
//线程池任务满载后采取的任务拒绝策略
RejectedExecutionHandler rejectHandler = new ThreadPoolExecutor.DiscardOldestPolicy();
//线程池对象,创建线程
ThreadPoolExecutor mExecute = new ThreadPoolExecutor(
corePoolSize, //线程池的核心线程数
maximumPoolSize,//线程池所能容纳的最大线程数
keepAliveTime,//线程的空闲时间
TimeUnit.SECONDS,//keepAliveTime对应的单位
workQueue,//线程池中的任务队列
threadFactory, //线程工厂
rejectHandler//当任务无法被执行时的拒绝策略
);