Java并发学习笔记——第九章 Java中的线程池
Java中的线程池运用是并发开发中的总重之中。
Java中线程池时运用场景最多的并发框架,几乎所有异步或并发执行任务的程序都可以使用线程池。合理使用线程池可以带来三点好处:
- 降低资源消耗:重复利用线程可降低因创建、销毁线程带来的损耗。
- 提高响应速度:任务到达时不需要等待线程创建就能执行。
- 提高线程的可管理性:线程是稀缺资源,若无限制地创建,会消耗系统资源、降低系统稳定性。使用线程池可以进行统一分配、调优和监控。
线程池实现原理
当一个新任务提交到线程池时,线程池处理流程如下:
- 判断核心线程池的线程是否都在执行任务。若不是,则创建一个新的工作线程来执行任务;否则进入下个流程。
- 判断工作队列是否已满。若没满,则将新提交的任务存储在这个工作队列中;否则进入下个流程。
- 判断线程池的线程是否都处于工作状态。若不是,则创建一个新的工作线程来执行任务;否则交给饱和策略处理这个任务。
如图,执行ThreadPoolExecutor.execute()
会产生四种情况:
- 若当前运行的线程少于
corePoolSize
,则创建新线程来执行任务(执行创建新线程需要获取全局锁)。即使此时有线程空闲。 - 若运行的线程等于或多于
corePoolSize
,则将任务加入BlockingQueue
(工作队列)。 - 若无法将任务加入
BlockingQueue
(队列已满),则创建新的线程来处理任务(执行创建新线程需要获取全局锁)。 - 若创建新线程将使当前运行的线程超出
maximumPoolSize
,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()
方法。
ThreadPoolExecutor
设计成上面的思路,主要是为了避免获取全局锁。在ThreadPoolExecutor
完成预热后(当前运行的线程数大于等于corePoolSize
),几乎所有的execute()
方法调用都是执行步骤2 。
工作线程:线程池创建线程时,将线程封装为工作线程Worker
,Worker
在执行完任务后不会销毁,会循环从BlockingQueue
获取任务执行。
饱和策略:默认使用AbortPolicy
。
-
AbortPolicy
:中止策略。抛出RejectedExecutionException
,调用者可以捕获该异常。 -
DiscardPolicy
:抛弃策略。当新提交的任务无法保存到队列中等待执行时,该策略会悄悄抛弃该任务。 -
DiscardOldestPolicy
:抛弃旧的策略。将抛弃第一个在队列中等待的任务。 -
CallerRunsPolicy
:调用者运行策略。该策略实现了一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者(调用线程池执行任务的主线程),从而降低新任务的流程。它不会在线程池的某个线程中执行新提交的任务,而是在一个调用了execute的线程中执行该任务。当线程池的所有线程都被占用,并且工作队列被填满后,下一个任务会在调用execute时在主线程中执行(调用线程池执行任务的主线程)。由于执行任务需要一定时间,因此主线程至少在一段时间内不能提交任务,从而使得工作者线程有时间来处理完正在执行的任务。在这期间,主线程不会调用accept,因此到达的请求将被保存在TCP层的队列中。如果持续过载,那么TCP层将最终发现它的请求队列被填满,因此同样会开始抛弃请求。当服务器过载后,这种过载情况会逐渐向外蔓延开来——从线程池到工作队列到应用程序再到TCP层,最终达到客户端,导致服务器在高负载下实现一种平缓的性能降低。
线程池使用
创建线程池
new ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);
创建线程池需要输入几个参数:
-
corePoolSize
(线程池基本大小):当一个任务提交到线程池,若线程池的线程数量没有达到corePoolSize
,即使有空闲的基本线程,也会创建一个新线程来执行任务(为了尽快预热线程池)。如果调用prestartAllCoreThreads()
方法则线程池会提前创建并启动所有线程。 -
maximumPoolSize
(线程池最大数量):线程池允许创建的最大线程数。若工作队列满了,且线程数没有达到maximumPoolSize
,则线程池会创建新的线程执行任务。 -
keepAliveTime
(线程活动保持时间):工作线程空闲后保持存活的时间。若任务很多且每个任务执行时间短,可以调大该值提高线程利用率。若设置了ThreadPoolExecutor.allowCoreThreadTimeOut(false)
,则会维持corePoolSize
个线程不回收。 -
unit
(线程活动保持时间的单位):可选单位有天、小时、分钟、毫秒、微妙、纳秒。 -
workQueue
(工作队列):在线程池预热后,新任务提交进线程池会被放进工作队列,任务调度时再从中取出来放入工作线程,提供了四种工作队列:-
ArrayBlockingQueue
:基于数组的有界阻塞队列,按FIFO排序。 -
LinkedBlockingQueue
:基于链表的无界阻塞队列(最大容量为Integer.MAX
),按照FIFO排序。因为是无界阻塞队列,所以maximumPoolSize
不起作用。 -
SynchronousQueue
:不缓存任务的阻塞队列,生产者放入一个任务必须等消费者取出这个任务。因此任务进入该队列后会直接进行调度处理,若没有可用工作线程,则直接创建新线程。 -
PriorityBlockingQueue
:具有优先级的无界阻塞队列,优先级由参数Comparator
实现。
-
-
threadFactory
:用于设置创建线程的工厂,可以通过线程工厂为每个创建出来的线程设置更有意义的名字。开源框架的guava
提供的ThreadFactoryBuilder
可以快速给线程池里的线程设置有意义的名字。 -
handler
(饱和策略):上文已概述过。
向线程池提交任务
有两种向线程池提交任务的方法,execute()
和submit()
:
-
execute()
方法用于提交不需要返回值的任务,因此无法判断任务是否被执行成功。threadPool.execute(new Runnable() { @Override public void run() { //TODO Auto-generated method stub } })
submit()
方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,可以通过该对象判断任务是否执行成功,且调用future.get()
获取返回值。get()
会阻塞当前线程直到任务完成,使用get(long timeout, TimeUnit unit)
可以设定超时,若任务没执行完也依然返回(但不中断任务)。
关闭线程池
有两种关闭线程池的方法,他们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt()
中断线程,所以无法响应中断的任务可能永远无法中止。
-
shutdown()
:将线程池状态设置为SHUTDOWN状态,然后中断所有没有正在执行任务的线程。 -
shutdownNow()
:先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程。
因此根据线程池特性,一般使用shutdown()
,若任务不一定要执行完,可以用shutdownNow()
。
合理配置线程池
配置线程池需要分析的角度:
- 任务的性质:CPU密集型任务、IO密集型任务和混合型任务。
- 任务的优先级:高、中、低。
- 任务的执行时间:长、中、短。
- 任务的依赖性:是否依赖其他系统资源,如数据库连接。
- 性质不同的任务可以用不同规模的线程池分开处理。
- CPU密集型任务应配置尽可能小的线程,如配置Ncpu+1个线程的线程池。
- IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如2*Ncpu 。
- 混合型任务可以拆分为CPU密集型任务和IO密集型任务,分解后吞吐量将高于串行。若两个任务执行时间相差太大,则没有必要。
- 优先级不同的任务可以使用优先级队列
PriorityBlockingQueue
处理。 - 执行时间不同的任务可以交给不同规模的线程池处理,也可以使用优先级队列。
- 依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,等待时间越长,则CPU空闲时间越长,因此线程数应该设置得越大,才能更好利用cpu。
- 建议使用有界队列。有界队列能增加系统得稳定性和预警能力。如sql任务堆积的越多,队列会越长,会导致sql非常缓慢。
监控线程池
可以使用一些属性监控线程池:
-
taskCount
:线程池需要执行的任务数量。 -
completedTaskCount
:线程池在运行过程中已完成的任务数量,小于或等于taskCount
。 -
largestPoolSize
:线程池里曾经创建过的最大线程数量。该数据可以判断线程池是否曾经满过。 -
getPoolSize
:线程池线程数量。 -
getActiveCount
:活动的线程数。
可以重写线程池的beforeExecute
、afterExecute
、terminated
方法,在任务执行前、执行后和线程池关闭前执行一些代码进行监控。这些方法在线程池里是空方法。