简述
为了彻底了解线程池的时候,我们需要弄清楚线程池创建的几个参数
- corepollsize : 核心池的大小,默认情况下,在创建线程池后,每当有新的任务来的时候,如果此时线程池中的线程数小于核心线程数,就会去创建一个线程执行(就算有空线程也不复用),当创建的线程数达到核心线程数之后,再有任务进来就会放入任务缓存队列中。当任务缓存队列也满了的时候,就会继续创建线程,知道达到最大线程数。如果达到最大线程数之后再有任务过来,那么就会采取拒绝服务策略。
- Maximumpoolsize : 线程池中最多可以创建的线程数
- keeplivetime : 线程空闲状态时,最多保持多久的时间会终止。默认情况下,当线程池中的线程数大于corepollsize 时,才会起作用 ,直到线程数不大于 corepollsize 。
- workQuque: 阻塞队列,用来存放等待的任务
- rejectedExecutionHandler :任务拒绝处理器(这个注意一下),有四种
(1)abortpolicy丢弃任务,抛出异常
(2)discardpolicy拒绝执行,不抛异常
(3)discardoldestpolicy 丢弃任务缓存队列中最老的任务
(4)CallerRunsPolicy 线程池不执行这个任务,主线程自己执行。
1、newFixedThreadPool 定长线程池
一个有指定的线程数的线程池,有核心的线程,里面有固定的线程数量,响应的速度快。正规的并发线程,多用于服务器。固定的线程数由系统资源设置。核心线程是没有超时机制的,队列大小没有限制,除非线程池关闭了核心线程才会被回收。
2、newCachedThreadPool 可缓冲线程池
只有非核心线程,最大线程数很大,每新来一个任务,当没有空余线程的时候就会重新创建一个线程,这边有一个超时机制,当空闲的线程超过60s内没有用到的话,就会被回收,它可以一定程序减少频繁创建/销毁线程,减少系统开销,适用于执行时间短并且数量多的任务场景。
3、ScheduledThreadPool 周期线程池
创建一个定长线程池,支持定时及周期性任务执行,通过过schedule方法可以设置任务的周期执行
4、newSingleThreadExecutor 单任务线程池
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行,每次任务到来后都会进入阻塞队列,然后按指定顺序执行。
关键源码解读
1、execute方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//判断当前线程个数是否小于corePoolSize(核心线程数),如果小于的话,就再创建一个线程,每个线程都被封装成一个Worker,
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果大于核心线程的话就尝试把任务加入缓存队列,这里增加了状态出现异常的确认判断,
//如果状态出现异常会继续remove操作,如果执行true,则按照拒绝处理策略驳回任务
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果队列放不了,只能采用默认的拒绝服务策略了,
else if (!addWorker(command, false))
reject(command);
}
源码中出现ctl的次数比较多,那么这是个什么呢?
我们可以看看它的定义
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
线程池状态
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
方法
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
它实质上就是一个线程安全的32位的Integer,用前三位表示线程池的状态,后29位来表示线程的个数,所以在计算个数的时候用到了workerCountOf,忽略前三位带来的影响。
2、Worker中的run方法
Worker就是对线程的封装,线程池中维护了一个HashSet<Worker>的一个集合来存储工作线程,每次addWork的时候就往这个里面加,因为HashSet是不安全的,所以加了ReentrantLock来做同步
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
其实这个操作很简单,就是一个while不断的去getTask,获得任务之后,就依次执行
beforeExecute(wt, task);
task.run();
afterExecute(task, thrown);
(发现每次执行任务的时候都加了锁,有点奇怪,这里我还要看一下)
那么假设任务队列中没有了呢?那这里就用到了我们定义的keeplivetime ,在getTask中有这样一段代码,
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
也就是当超过keeplivetime 没有拿到就会返回null,这个时候循环就会截止,这个线程Wo也就会结束,所以说keepAliveTime指的是最长的poll时间