线程池 ExecutorService


一、类型

并发,启动大量任务线程时,频繁的线程创建/销毁会造成浪费。(创建、运行、销毁都需要时间),采用线程池,线程复用技术,提高性能。

线程池实现类,ThreadPoolExecutor 类。

ThreadPoolExecutor 继承关系

ThreadPoolExecutor 构造方法,实现不同类型线程池。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

corePoolSize,核心线程数。
maximumPoolSize,允许的最大线程,超过报异常。
keepAliveTime,非核心线程活跃时间。
TimeUnit,时间度量。
BlockingQueue<Runnable>,任务队列,(无限、有限队列、栈)。
ThreadFactory,线程工厂。

不推荐直接使用 ThreadPoolExecutor 构造方法创建。

Executors 类,静态方法创建。

newCachedThreadPool()
newFixedThreadPool()
newSingleThreadExecutor()
newScheduledThreadPool()

index 核心线程 最大线程 活跃时间 队列
缓存 0 MAX_VALUE 60s SynchronousQueue
固定数量 自定义 自定义 0L LinkedBlockingQueue
单线程 1 1 0L LinkedBlockingQueue
定时任务 自定义 MAX_VALUE 10L DelayedWorkQueue

1,缓存

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

核心线程是0,最大线程 MAX_VALUE ,任务队列 SynchronousQueue 是一个管道,不存储,线程活跃时间60秒。
适用短期大量任务的场景。
某一时间段高并发任务,创建大量线程,任务结束后,线程变空闲,60s以内重用,处理新任务,空闲到达60s,销毁。

2,固定数量

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory 
                    threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
}

核心线程、最大线程,自定义,LinkedBlockingQueue 任务队列,不设置空闲时间,无界队列。
适用长期任务场景。
任务队列无界,仅核心线程工作,keepAlieveTime 不需要设置。

3,单线程

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
}

核心线程、最大线程,数量1,LinkedBlockingQueue 任务队列,不设置空闲时间,无界队列。
适用单个任务顺序执行场景。
只有一个核心线程,新任务加入队列。

4,定时任务

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
}

ScheduledThreadPoolExecutor 实例,ThreadPoolExecutor 子类。
核心线程自定义,最大线程 MAX_VALUE,DelayedWorkQueue 任务队列,空闲时间10s。
适用指定有延迟的任务或周期性重复任务场景。
队列不是按照 submit 时间排序,以延时时间为优先级升序排序,延时时间短的,优先级高,先执行。

任务控制流程

  • 线程数量 < corePoolSize,新建线程,处理任务。
  • 线程数量 >= corePoolSize,加入任务队列,有核心线程空闲时,从队列获取任务,处理。
  • 队列有限且已满,再次新建工作线程处理新增任务,不超过最大线程数量
  • 工作线程,空闲时时间 keepAliveTime 。

二、工作原理

线程池 ThreadPoolExecutor 创建,将任务派发给线程池,execute() 方法,自动分配线程执行。

线程池 工作原理
public void execute(Runnable command) {
    if (command == null)//任务不能空
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {//小于核心线程
        if (addWorker(command, true))//创建核心线程
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {//加入队列
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))//队列满了,创建非核心临时线程
        reject(command);
}

workerCountOf(c) 方法,判断工作线程数量,当 <核心线程数量,addWorker() 方法,创建线程,设标志位参数代表核心线程,任务将由新建核心线程处理。
当 >= 核心线程,isRunning(c) 方法,表示 c<0,offer()方法,将任务加入队列。
offer() 是非阻塞方法,如果队列已满,返回失败,此时,addWorker() 方法,创建线程,不设标志位参数代表非核心线程,任务将由新建的非核心线程处理。

新线程创建
如果工作线程 >CAPACITY 容量,或 >= 允许的最大值(创建核心线程 >= 核心线程数量),失败返回。

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        ...
        for (;;) {
            int wc = workerCountOf(c);//工作线程数量
            if (wc >= CAPACITY ||// 已经等于最大值。
                    wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //创建线程,绑定任务。
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();//上锁
            try {
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    workers.add(w);//任务放入集合
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true; //后续根据此标志启动线程。
                }
            } finally {
                mainLock.unlock();//解锁
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

创建一个 Runnable 类型 Worker 对象,ThreadPoolExecutor 内部静态类,用户任务封装,newThread() 方法,创建新线程,将 Worker(this) 作为新线程任务主体。

Worker(Runnable firstTask) {
    setState(-1);
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}
Worker 任务

将 Worker 任务加入 HashSet 集合,设置 workerAdded 标志,启动新线程( start方法),设置 workerStarted 启动标志,代表线程启动,执行 Worker 任务的 run() 方法,调用 runWorker() 方法(外部类方法)。

final void runWorker(Worker w) {
    //处理Worker内的派发任务,在循环中进一步访问任务队列。
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    ...
    try {
        while (task != null || (task = getTask()) != null) {
            ....
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();//执行任务
                } 
                ...
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

新线程执行 Worker 任务的 run() 方法,借助 Worker 开始为线程池工作,从 Worker 获取内部 Runnable(即 execute 派送的用户任务),并执行 run() 方法。
用户任务处理完成,线程不会马上结束,while 循环,继续 getTask() 方法,从任务队列中获取任务,该方法可能会导致阻塞,队列空则线程结束。

BlockingQueue 阻塞队列。

private Runnable getTask() {
    boolean timedOut = false; 
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        ...//队列空时,返回null。
        int wc = workerCountOf(c);
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ://阻塞一段时间
                    workQueue.take();//一直阻塞
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

1,工作线程 wc > 核心线程
设置 timed 标志,队列采用阻塞等待,(poll + timeout方式),timeout 设置线程 keepAliveTime 时间 。
因此,即使队列没有任务,线程仍然存活,(任务进队列后可立即唤醒展开工作)。

2,工作线程 wc < 核心线程
(仅有核心线程),队列采用一直阻塞,( take 方式),队列是空时,线程一直阻塞,核心线程不会结束。

3,队列空,poll 超时
设置 timeOut 已超时标志,下次循环时,如果再发生工作线程 wc > 核心线程 ( timed 和 timedOut 标志并存),线程数量自减,退出 while 返回空,线程结束。

4,设置核心线程 allowCoreThreadTimeOut
不管工作线程 wc,采用 poll + timeout 方式,keepAliveTime 队列无任务,所有线程都会 timedOut 超时标志,下次循环自动结束。
即使 wc < 核心线程,线程也会结束退出,允许核心线程超时结束。

该方法对每一个线程的算法相同,根据当前工作线程数量和超时标志,决定从队列获取任务时阻塞状态。

三、总结

线程池本质,设置一定数量的线程集合,任务结束,但线程不结束,根据设定值,请求任务队列,继续任务,复用线程。

线程等待,利用任务队列的阻塞特性。阻塞任务队列,访问空队列时,线程等待,timeout 超时时间,实现线程 keepAliveTime 活动或一直存活。

任务队列
poll() 方法,取队列,非阻塞,poll + timeout,阻塞 timeout 时间。
take() 方法,取队列,阻塞等待。
put() 方法,存队列,阻塞等待,
offer() 方法,存队列,非阻塞,offer + timeout,阻塞 timeout 时间。

核心线程,在线程集合中,未具体标明,若 All 线程都完成任务,空闲,且队列空,在 getTask() 方法,根据超时时间,逐一唤醒,结束,剩下的数量 = 核心,它们即核心。


任重而道远

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,265评论 6 490
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,078评论 2 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,852评论 0 347
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,408评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,445评论 5 384
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,772评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,921评论 3 406
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,688评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,130评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,467评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,617评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,276评论 4 329
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,882评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,740评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,967评论 1 265
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,315评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,486评论 2 348