总结1:
运行过程:
线程池内有固定核心线程数量
corePoolSize
,当线程池接收到任务后,如果发现核心线程数量还没满,会创建一个核心线程来执行这一任务(不论此时是否有空闲线程,都会新创建
),如果核心线程数量满了,任务还是没有全部执行,那么线程池会将多出的任务放入阻塞队列workQueue
内,如果阻塞队列都满了,就会使用非核心线程处理这些问题,非核心线程的数量是通过ThreadPoolExecutor
构造函数配置的maximumPoolSize
属性确定的,非核心线程数量=maximumPoolSize-核心线程数量(corePoolSize)
。
补充:核心线程概念:
核心线程在空闲时是不会被回收的,且线程池只关心核心线程数量
,它不标记线程是否为核心线程,也就是说线程池只是指明有多少个核心线程,而这里面的线程假设在后续的交互中里面有个从线程1变成线程10他都不会在意核心线程
一般不会被销毁,即使是空闲的状态,但是如果通过方法allowCoreThreadTimeOut(boolean value)
设置为 true 时,超时也同样会被销毁;
总结2:
各个创建方法对比:
- 若自身对性能有很大需求,且对于机器性能、代码能力等有足够自信,使用
ThreadPoolExecutor
的构造方法是最合适的。newSingleThreadExecutor()
是构造只有一个线程的线程池,保存任务的队列是无界的,可接收所有任务,但是同时只有一个线程执行任务newFixedThreadPool()
是构造可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程,在需要时使用提供的 ThreadFactory 创建新线程。newScheduledThreadPool()
创建一个可重用线程池(最大线程数为int最大值),它可安排在给定延迟后运行命令或者定期地执行(因为使用DelayedWorkQueue()队列)。newCachedThreadPool()
是构造一个可根据需要创建新线程的线程池(最大线程数为int最大值),但是在以前构造的线程可用时将重用它们。调用 execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。
总结3
关于队列
workQueue
:
- 直接提交:直接提交队列(如
SynchronousQueue
),此种队列将任务直接提交给线程而不保存他们,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。- 无界队列。使用无界队列(如
LinkedBlockingQueue
)将导致在所有corePoolSize
线程都忙时新任务在队列中等待。这样,创建的线程就不会超过corePoolSize
(因此,maximumPoolSize
的值也就无效了)。- 有界队列。当使用有限的
maximumPoolSizes
时,有界队列(如ArrayBlockingQueue
)有助于防止资源耗尽,但是可能较难调整和控制。
补充:
ArrayBlockingQueue
:是一个用数组实现的有界阻塞队列,必须设置容量。
LinkedBlockingQueue
:是一个用链表实现的有界阻塞队列,容量可以选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE
。
- 线程池好处:
- 降低资源消耗
- 提高响应速度
- 方便线程管理
Executors 工具类的四种创建线程池的方式:
-
(特点:单一核心线程,无存放非核心线程的位置,
LinkedBlockingQueue
队列结构)newSingleThreadExecutor(翻译:Single/单一的,Executor/执行器)
只有一个线程来执行任务,适用于有顺序的任务的应用场景。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它,它可以保证任务按照指定顺序(FIFO,LIFO)执行,它还有可以指定线程工厂(ThreadFactory
)的重载方法,可以自定义线程的创建行为
说明:创建一个单线程的线程池,它只会用唯一的工作线程来执行任务,也就相当于单线程串行执行任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。使用
LinkedBlockingQueue
作为等待队列。
2.(特点:固定核心线程数量,无存放非核心线程的位置,LinkedBlockingQueue
队列结构)newFixedThreadPool(翻译:Fixed/固定的,Pool/池)
固定线程数的线程池,只有核心线程,核心线程的即为最大的线程数量,没有非核心线程。每次提交一个任务就创建一个线程,直到达到线程池的最大大小。线程池一旦达到最大值就会保持不变,如果当中的某个线程因为异常而结束,那么线程池会新建一个线程加入到线程池中。它还可以控制线程的最大并发数,超出的线程会在阻塞队列(LinkedBlockingQueue
)中等待,同样它也有可以指定线程工厂(ThreadFactory
)的重载方法,可以自定义线程的创建行为。
3.(特点:固定核心线程数量,有2^31 - 1存放非核心线程位置,有60s空闲,SynchronousQueue
队列结构)newCachedThreadPool(翻译:Cached/高速缓存)
创建一个可缓存线程池,最大的线程个数为 2^31 - 1(Integer.MAX_VALUE
),可以认为是无限大,若无可回收,则新建线程,如果线程池的大小超出了处理任务所需要的线程,那么就会回收部分空闲(60s 不执行任务)的线程。
4.(特点:固定核心线程数量,有2^31 - 1存放非核心线程位置,空闲时间为0,DelayedWorkQueue
队列结构)newScheduledThreadPool (翻译:Scheduled/预定的)
周期性执行任务的线程池,按照某种特定的计划执行线程中的任务,有核心线程,但也有非核心线程,非核心线程的大小也为无限大(Integer.MAX_VALUE
:2^31 - 1),适用于执行周期性的任务。
补充:DelayedWorkQueue
作为等待队列,这中类型的队列会保证只有到了指定的延时时间,才会执行任务。
Executors 工具类的四种创建线程池方式的具体实现
但是阿里开发手册上不建议使用这种方式,因为有太多不可控因素会导致oom异常,切无法任务进行干预。
Executors.newCachedThreadPool()
创建方式
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public ThreadPoolExecutor(int corePoolSize,//线程池中核心线程数的最大值
int maximumPoolSize,//线程池中最多能拥有的线程数
long keepAliveTime,//空闲线程存活时间
TimeUnit unit,//空闲线程存活时间的单位
BlockingQueue<Runnable> workQueue,//用于存放任务的阻塞队列
ThreadFactory threadFactory,//创建线程工厂
RejectedExecutionHandler handler//当 workQueue 已满,并且池中的线程数达到 maximumPoolSize 时,线程池继续添加新任务时采取的策略)
注:由此可看出,这种线程池,无核心线程,全部都是非核心线程
SynchronousQueue
:一个不存储元素的阻塞队列。每个插入操作必须等待另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue
,静态工厂方法Executors.newCachedThreadPool(上面列出的第三个)
使用了这个队列。
public class ThreadPool1 {
public static void main(String[] args) {
ExecutorService e = Executors.newCachedThreadPool();
Runnable r = new MyRunnable();
e.submit(r);//获取线程池中的某一个线程对象,然后调用runnable接口中的run方法
e.submit(r);
e.submit(r);
e.submit(r);//注意run方法运行完,线程池中的线程并不消耗,而是归还到池中
e.shutdown();
}
}
class MyRunnable implements Runnable{
@Override
public void run() {
System.out.println("给我一个线程:"+Thread.currentThread().getName());
try {
System.out.println("线程开始消耗资源"+Thread.currentThread().getName());
Thread.sleep(2000);
System.out.println("线程使用完毕"+Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("归还到线程池中"+Thread.currentThread().getName());
}
}
改进后创建newCachedThreadPool方式(引入guava依赖):
<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>29.0-jre</version>
</dependency>
private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("fengzheng" + "-%d").setDaemon(true).build();
ExecutorService createCacheThreadPool(){
int coreSize = 10;
int maxSize = 20;
return new ThreadPoolExecutor(coreSize, maxSize, 10L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), threadFactory, new ThreadPoolExecutor.AbortPolicy());
}
说明:创建一个可缓存线程池,应用中存在的线程数可以无限大(最大线程数为int最大值)。当提交任务速度高于线程池中任务处理速度时,缓存线程池会不断的创建线程,线程数完全取决于jvm可以创建的线程数,直到资源耗尽会抛出oom异常;如果在执行第二个任务时,第一个任务已经完成,那么会复用第一个任务的线程执行第二个任务,如果一个线程超过60秒没有被使用,就会被线程池回收。所以大家要慎用此方法,适用于提交短期的异步小程序,以及负载较轻的服务器。
Executors.newFixedThreadPool(2)
创建方式
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
注:由此可看出,本方式只创建核心线程
LinkedBlockingQueue
:链表结构的阻塞队列,尾部插入元素,头部取出元素。LinkedBlockingQueue
是我们在ThreadPoolExecutor
线程池中常用的等待队列。它可以指定容量也可以不指定容量。由于它具有“无限容量”的特性,实际上任何无限容量的队列/栈都是有容量的,这个容量就是Integer.MAX_VALUE
。LinkedBlockingQueue
的实现是基于链表结构,而不是类似ArrayBlockingQueue
那样的数组。但实际使用过程中,不需要关心它的内部实现,如果指定了LinkedBlockingQueue
的容量大小,那么它反映出来的使用特性就和ArrayBlockingQueue
类似了。
ExecutorService e =Executors.newFixedThreadPool(2);//创建一个包含两个线程的线程池
Runnable r = new MyRunnable();
e.submit(r);//获取线程池中的某一个线程对象,然后调用runnable接口中的run方法
e.submit(r);
e.submit(r);
e.submit(r);//注意run方法运行完,线程池中的线程并不消耗,而是归还到池中
e.shutdown();
改进后创建newFixedThreadPool方式(引入gauva依赖):
private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("fengzheng" + "-%d").setDaemon(true).build();
public static ExecutorService createFixedThreadPool() {
int poolSize = 5;
int queueSize = 10;
ExecutorService executorService = new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(queueSize), threadFactory, new ThreadPoolExecutor.AbortPolicy());
return executorService;
}
说明:创建一个线程数固定的线程池,可以根据系统的资源进行设置,如:Runtime.getRuntime().availableProcessors()。线程不会被回收,除非调用shutdown()方法才会回收,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。此方法可控制线程最大并发数,超出线程数据的任务会放到队列中,但是从源码中可以看到此队列为无界队列,所以如果我们一次查出的数据过多很有可能会导致oom异常,因为队列会无限扩充,真正的导致OOM的其实是LinkedBlockingQueue.offer方法。。使用于为了满足资源管理需求而需要限制当前线程数量的场合,使用于负载比较重的服务器但是要注意控制任务数量。
Executors.newScheduledThreadPool(2)
创建方式
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
}
说明:计划型线程池,可以设置固定时间的延时或者定期执行任务,多数情况下可用来替代Timer类,同样是看线程池中有没有空闲线程,如果有,直接拿来使用,如果没有,则新建线程加入池。使用的是 DelayedWorkQueue 作为等待队列,这中类型的队列会保证只有到了指定的延时时间,才会执行任务。
下方为执行定时线程池示例
public class ThreadPool2 {
public static void main(String[] args) {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
ScheduledThreadPoolExecutor e = new ScheduledThreadPoolExecutor(3);//参数表示线程容量
System.out.println(simpleDateFormat.format(new Date()));
// 但是如果执行任务时间大约2s则不会并发执行后续任务将会延迟。
e.scheduleAtFixedRate(new MyRunnable2(), 0, 2000, TimeUnit.MILLISECONDS);//第一个参数任务,第二个参数表示执行任务前等待的时间,第三个参数表示任务启动间隔时间,第四参数表示时间单位
e.scheduleAtFixedRate(new MyRunnable1(), 0, 2000, TimeUnit.MILLISECONDS);//第一个参数任务,第二个参数表示执行任务前等待的时间,第三个参数表示任务启动间隔时间,第四参数表示时间单位
}
}
class MyRunnable2 implements Runnable{
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"给我一个线程:"+simpleDateFormat.format(new Date()));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class MyRunnable1 implements Runnable{
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"给我一个线程1:"+simpleDateFormat.format(new Date()));
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Executors.newSingleThreadExecutor()
创建方式
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
说明:创建一个单线程的线程池,它只会用唯一的工作线程来执行任务,也就相当于单线程串行执行任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。使用
LinkedBlockingQueue
作为等待队列。
ExecutorService e =Executors.newSingleThreadExecutor();//创建一个单线程的线程池
Runnable r = new MyRunnable();
e.submit(r);//获取线程池中的某一个线程对象,然后调用runnable接口中的run方法
e.submit(r);
e.submit(r);
e.submit(r);//注意run方法运行完,线程池中的线程并不消耗,而是归还到池中
e.shutdown();
jdk1.8新增创建模式:Executors.newWorkStealingPool(3)
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
说明:创建一个带并行级别的线程池,并行级别决定了同一时刻最多有多少个线程在执行,如不穿如并行级别参数,将默认为当前系统的CPU个数。
ExecutorService e = Executors.newWorkStealingPool(3);
Runnable r = new MyRunnable();
for (int i = 1; i <= 50; i++) {
final int count=i;
e.submit(r);
}
while (true){
// 必须有无限循环才能看见效果
}
关于ThreadPoolExecutor类创建方式
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
成员变量 ctl
主要用于存储线程池的工作状态以及线程池正在运行的线程数。
关于线程池状态
在 ThreadPoolExecutor
定义了线程池的五种状态(注意,这里说的是线程池状态,不是池中的线程的状态)。
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
RUNNING
状态
允许提交并处理任务
SHUTDOWN
状态
不会处理新提交的任务,但会处理完已处理的任务
STOP
状态
不会处理新提交的任务,也不会处理阻塞队列中未执行的任务,并设置正在执行任务的中断标志位
TIDYING
状态
所有任务执行完毕,线程池中工作的线程数为 0,等待执行 terminated() 钩子方法
创建一个线程池时的状态为 RUNNING
。
调用线程池的 shutdown
方法,将线程池由RUNNING
状态转为 SHUTDOWN
状态。调用 shutdownNow
方法,将线程池由 RUNNING
状态转为 STOP
状态。SHUTDOWN
状态和 STOP
状态都会先变为 TIDYING
状态,最终都会变为 TERMINATED
状态。
ThreadPoolExecutor 的构造函数
public ThreadPoolExecutor(int corePoolSize,//线程池中核心线程数的最大值
int maximumPoolSize,//线程池中最多能拥有的线程数
long keepAliveTime,//空闲线程存活时间
TimeUnit unit,//空闲线程存活时间的单位
BlockingQueue<Runnable> workQueue,//用于存放任务的阻塞队列
ThreadFactory threadFactory,//创建线程工厂
RejectedExecutionHandler handler//当 workQueue 已满,并且池中的线程数达到 maximumPoolSize 时,线程池继续添加新任务时采取的策略)
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue);
//这种情况下,一旦提交的线程数超过当前可用线程数时,就会抛出java.util.concurrent.RejectedExecutionException,这是因为当前线程池使用的队列是有边界队列,队列已经满了便无法继续处理新的请求。但是异常(Exception)总比发生错误(Error)要好
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
描述:
如果没有空闲的线程执行该任务,并且池中运行的线程数小于corePoolSize
时,则创建新的线程执行该任务
如果没有空闲的线程执行该任务,并且当池中正在执行的线程数大于corePoolSize
时,新添加的任务进入workQueue
排队(如果workQueue
长度允许),等待空闲线程来执行
如果没有空闲的线程执行该任务,并且阻塞队列已满同时池中的线程数小于maximumPoolSize
,则创建新的线程执行该任务
如果没有空闲的线程执行该任务,并且阻塞队列已满同时池中的线程数等于maximumPoolSize
,则根据构造函数中的handler
指定的策略来拒绝新添加的任务
keepAliveTime 和 unit 单位
keepAliveTime 表示那些超出corePoolSize数量之外的线程的空闲时间大于keepAliveTime后就被清除了。
workQueue 任务队列
workQueue
决定了缓存任务的排队策略,对于不同的任务场景我们可以采取不同的策略,这个队列需要一个实现了BlockingQueue
接口的任务等待队列。从ThreadPoolExecutor
的文档中得知,官方一共给我们推荐了三种队列,分别是:SynchronousQueue、LinkedBlockingQueue、ArrayBlockingQueue
。其中SynchronousQueue
和ArrayBlockingQueue
属于有限队列,LinkedBlockingQueue
属于无限队列,具体作用如下:
-
SynchronousQueue
:一个不存储元素的阻塞队列。每个插入操作必须等待另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue
,静态工厂方法Executors.newCachedThreadPool(上面列出的第三个)
使用了这个队列。
2.ArrayBlockingQueue
:有界阻塞队列。一个由数组支持的有界阻塞队列。此队列按FIFO(先进先出)原则对元素进行排序。新元素插入到队列的尾部,队列获取操作则是从队列头部开始获得元素。这是一个典型的“有界缓存区”,固定大小的数组在其中保持生产者插入的元素和使用者提取的元素。一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中放入元素会导致操作受阻塞,试图从空队列中提取元素将导致类似阻塞。
-
LinkedBlockingQueue
:链表结构的阻塞队列,尾部插入元素,头部取出元素。LinkedBlockingQueue
是我们在ThreadPoolExecutor
线程池中常用的等待队列。它可以指定容量也可以不指定容量。由于它具有“无限容量”的特性,实际上任何无限容量的队列/栈都是有容量的,这个容量就是Integer.MAX_VALUE
。LinkedBlockingQueue
的实现是基于链表结构,而不是类似ArrayBlockingQueue
那样的数组。但实际使用过程中,不需要关心它的内部实现,如果指定了LinkedBlockingQueue
的容量大小,那么它反映出来的使用特性就和ArrayBlockingQueue
类似了。
threadFactory 创建线程的工厂
ThreadPoolExecutor
有的没有threadFactory
参数的构造方法中使用的创建线程的工厂就是默认的工厂
handler 拒绝策略
表示当workQueue
已满,池中的线程数达到maximumPoolSize
时,线程池拒绝添加新任务时采取的策略。
有如下4种:
ThreadPoolExecutor.AbortPolicy
:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy
:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy
:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy
:由调用线程处理该任务
线程提交给线程池的方法对比:
void execute(Runnable command);//不关心返回结果
Future<T> submit(Callable<T> task);//关心返回结果, 源码函数
Future<T> submit(Runnable task, T result);//这种调用和第二种类似,但是它的返回值是传进来的,而第二种方式是call返回的
Future<?> submit(Runnable task);//不关心返回结果,虽然返回Future,但是其get()方法总是返回null
ScheduledFuture scheduledFuture = executor3.schedule(myThreadImplements, 100L, TimeUnit.SECONDS);//具有返回值Future ,当内部线程使用实现Callable方式是现实时(具有返回值),可以接收返回值。能实现定时任务和延迟任务。上述线程池中只有newScheduledThreadPool创建的线程池具有此方法。
获取多个线程返回结果
一般使用ExecutorCompletionService
如下:
void solve(Executor executor, Collection<Callable<Result>> solvers)
throws InterruptedException, ExecutionException {
CompletionService<Result> ecs = new ExecutorCompletionService<Result>(executor);// 构造器
for (Callable<Result> s : solvers){// 提交所有任务
ecs.submit(s);
}
int n = solvers.size();
for (int i = 0; i < n; ++i) {// 获取每一个完成的任务
Result r = ecs.take().get();
if (r != null)
use(r);
}
}
参考文章:https://www.cnblogs.com/htyj/p/10849020.html
参考文章:https://juejin.im/post/6844904003491332109#heading-7