ThreadPoolExecutor怎么工作

我比较常用的是定时任务线程池:
<pre>
ScheduledExecutorService ses = Executors.newScheduledThreadPool(1);
ses.scheduleAtFixedRate(XXX, XXX, XXX, XXX)
</pre>
ScheduledThreadPoolExecutor类继承自ThreadPoolExecutor以及实现了ScheduledExecutorService接口的schedule类接口,由此我觉得我有必要深究ThreadPoolExecutor。
<h4>1、ThreadPoolExecutor的构造参数:</h4>
<pre>
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
</pre>
这7个参数如下:
1、corePoolSize核心线程数大小,当线程数<corePoolSize ,会创建线程执行runnable </br>
2、workQueue 保存任务的阻塞队列,有Array类型、Linked类型的队列 FIFO,如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
3、maximumPoolSize 最大线程数, 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新线程来处理被添加的任务。
4、keepAliveTime 保持存活时间,当线程数大于corePoolSize,空闲线程能保持的最大时间。
如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。
5、unit 时间单位,枚举类型有天、时分秒等
6、threadFactory 创建线程的工厂,默认是DefaultThreadFactory其创建的线程非守护、无优先级的,当然自己也可以实现ThreadFactory,如:
<pre>
private static class TestThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, THREAD_NAME); //自定义线程名称
t.setDaemon(true); //守护线程
return t;
}
}
</pre>
7、handler 拒绝策略:如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的遗弃策略来处理此任务。也就是:处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
<pre>


RejectExceptionHandler.png

</pre>
AbortPolicy:抛RejectedExecutionException异常
DiscardPolicy:抛弃不做任何处理,其实就是rejectedExecution方法里面空实现
DiscardOldestPolicy:将位于workQueue 工作队列头部的任务将被删除。但这样有个不好的地方,当某个任务还没执行完成时有可能就被取消掉了,这样完全无法预期线程的执行到了何种地步因为随时会被结束
CallerRunsPolicy:翻看其rejectedExecution接口实现 if (!e.isShutdown()) { r.run(); } 其实就是类似于在<b>client的主线程中执行了一下线程对象的run方法</b>,注意不是执行start方法来调度新的线程,举个例子
<pre>
public class Test {
public static class TestHH implements Runnable {
public void run() {
System.out.println(Thread.currentThread().getName());
}
}
public static void main(String[] args) throws Exception {
ThreadPoolExecutor tpe = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1),new ThreadPoolExecutor.CallerRunsPolicy());
for(int i = 0;i < 10;i++){
Runnable testRun = new TestHH();
tpe.execute(testRun);
}
}
}
输出:
main
main
main
main
main
main
main
main
pool-1-thread-1
pool-1-thread-1
</pre>
这边会发现是main这个client线程去执行run方法,这样其实也有个不好的地方,client线程会阻塞起来去执行被抛弃的线程的run方法。当然自己也可以自定义实现拒绝接口,如打印一些日志信息等
<b>最后还是上个图描述ThreadPoolExecutor构造流程更清晰一些:</b>
<pre>


ThreadPoolExecutor构造流程图.png

</pre>
<h4>2、源代码分析:</h4>
<b>线程池状态分析</b>
RUNNING:接受新的任务和队列任务
SHUTDOWN:不接受新的任务但是正常处理队列任务
STOP:不接受新的任务不处理队列任务且中断正在处理的任务
TIDYING:所有任务被中止,正在工作的线程数为0
TERMINATED:线程池被彻底终止
线程池状态变化过程:
线程池状态.png

<pre>
//ctl一个变量同时存储runState和workerCount,其中runState占用高3位,workCount占用低29位,其实ReentrantLock的读写锁也是用16位的高位和16位的低位来表示
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//workerCount使用的位数:32-3=29位
private static final int COUNT_BITS = Integer.SIZE - 3;

//workerCount最大值:536870911,即0b00011111_11111111_11111111_11111111
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState存储在高位,占用3位
//0b11100000_00000000_00000000_00000000
private static final int RUNNING    = -1 << COUNT_BITS;

//0b00000000_00000000_00000000_00000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;

//0b00100000_00000000_00000000_00000000
private static final int STOP       =  1 << COUNT_BITS;

//0b01000000_00000000_00000000_00000000
private static final int TIDYING    =  2 << COUNT_BITS;

//0b01100000_00000000_00000000_00000000
private static final int TERMINATED =  3 << COUNT_BITS;

// 获取runState,即保�留ctl的高3位,后29位置0
private static int runStateOf(int c)     {
  return c & ~CAPACITY;
}

//获取workerCount,即保留ctl的低29位,高3位置0
private static int workerCountOf(int c)  { 
  return c & CAPACITY;
}

//设置ctl,或操作
private static int ctlOf(int rs, int wc) {
  return rs | wc;
}

</pre>
<b>execute的实现:</b>
<pre>
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { //工作线程数小于核心线程数时
if (addWorker(command, true)) //将线程放进线程池中,如果线程池是SHUTDOWN或者STOP或者异常则返回false
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))//double check,如果队列中已满则删除该线程任务且按reject策略处理
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false)) //如果队列已满直接尝试创建线程处理,如果工作线程数未超过最大线程数则成功,否则按reject策略处理
reject(command);
}
</pre>
<b>addWorker实现:</b>
<pre>
private boolean addWorker(Runnable firstTask, boolean core) {
retry: //java中标记的跳转来实现goto功能
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&   //线程状态中只有running的状态值小于SHUTDOWN
            ! (rs == SHUTDOWN &&  
               firstTask == null && //线程为空值
               ! workQueue.isEmpty())) //队列是空的
            return false; //直接返回false

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;  // 工作线程数大于上限则失败
            if (compareAndIncrementWorkerCount(c)) //原子更新ctl值其实状态没变,工作线程数加1
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock;
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            mainLock.lock(); //通过ReentrantLock来实现同步
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();  //recheck
                int rs = runStateOf(c); //recheck

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) { // running状态或者(SHUTDOWN且firstTask为空)
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w); //添加保存工作线程
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s; //更新largestPoolSize值,getLargestPoolSize会引用
                    workerAdded = true; 
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) { 
                t.start();
                workerStarted = true;  //正常启动
            }
        }
    } finally {
        if (! workerStarted) //没有正常启动的话
            addWorkerFailed(w); //里面workers会remove(w)且原子性修改ctl - 1的工作线程数量
    }
    return workerStarted;
}

</pre>
<b>shutdown and shutdownNow:</b>
shutDown()
当线程池调用该方法时,线程池的状态则立刻变成SHUTDOWN状态。此时则不能再往线程池中添加任何任务,线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。
shutdownNow()
根据JDK文档描述,执行该方法,线程池的状态立刻变成STOP状态,并试图停止所有正在执行的线程,不再处理还在池队列中等待的任务,同时它会返回那些未执行(还在队列中等待执行)的任务。 试图终止线程的方法是通过调用Thread.interrupt()方法来实现的,但是大家知道,这种方法的作用有限,如果线程中没有使用sleep 、wait、Condition、定时锁等, interrupt()方法是无法中断当前的线程的。
所以,ShutdownNow()并不代表线程池就一定立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出。
两者区别:
a、是一个要将线程池推到SHUTDOWN状态,一个将推到STOP状态
b、并且对运行的线程处理方式不同,shutdown()只中断空闲线程,而shutdownNow()会尝试中断所有活动线程
c、还有就是对队列中的任务处理,shutdown()队列中已有任务会继续执行,而shutdownNow()会直接取出不被执行
d、相同的是都在最后尝试将线程池推到TERMINATED状态。
<pre>
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess(); //check 是否有workers中的线程shutdown权限
advanceRunState(SHUTDOWN); //将线程池状态变成SHUTDOWN,同时不受理新的工作线程
interruptIdleWorkers();//中断状态不为isInterrupted()且空闲的线程
onShutdown(); // hook for ScheduledThreadPoolExecutor,只有ScheduledThreadPoolExecutor的实现中cancel delayed tasks
} finally {
mainLock.unlock();
}
tryTerminate();
}
</pre>
<pre>
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers(); //中断所有正在工作的线程
tasks = drainQueue(); //取出所有workQueue中等待的任务
} finally {
mainLock.unlock();
}
tryTerminate(); //最后检查线程池工作线程数是否为0,并尝试切换到TERMINATED状态。
return tasks;
}
drainQueue()中drainTo方法我从没用过:
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
List<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
翻了一下源代码:其意思为加了个ReentrantLock锁线程安全的把workQueue中的线程copy到taskList
</pre>
<h4>3、预留扩展</h4>
<pre>
预留扩展:
beforeExecute() 在每个任务执行前做的处理
protected void beforeExecute(Thread t, Runnable r) { }
afterExecute() 在每个任务执行后做的处理
protected void afterExecute(Runnable r, Throwable t) { }
terminated() 在ThreadPoolExecutor到达TERMINATED状态前所做的处理
protected void terminated() { }
finalize() 有默认实现,直接调用shutdown(),以保证线程池对象回收
protected void finalize() {
shutdown();
}
onShutdown() 在shutdown()方法执行到最后时调用,在java.util.concurrent.ScheduledThreadPoolExecutor类实现中用到了这个扩展点,做一些任务队列的清理操作。
void onShutdown() {
}
</pre>

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,922评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,591评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,546评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,467评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,553评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,580评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,588评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,334评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,780评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,092评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,270评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,925评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,573评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,194评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,437评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,154评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352

推荐阅读更多精彩内容

  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 172,008评论 25 707
  • 前段时间遇到这样一个问题,有人问微信朋友圈的上传图片的功能怎么做才能让用户的等待时间较短,比如说一下上传9张图片,...
    加油码农阅读 1,197评论 0 2
  • 文/向光的小虫 下班后在宿舍加了一会班,脖子僵硬到呻吟,才发现已经坐了太久,在泰国的时候我每周去一次泰式按摩,听说...
    向光的小虫阅读 435评论 0 2
  • 曾送你生命里最用心又最笨拙的礼物;曾做过生命里最可笑又最心酸的举止;曾有过生命里最绝望又最坚毅的委屈。 我注视着你...
    郁南枫阅读 151评论 0 0
  • 陆林颖阅读 150评论 0 0