juc系列-Executor框架

什么是线程池

线程池:管理一组工作线程的资源池。

为什么使用线程池

1.避免反复创建回收线程,降低资源消耗。
2.提供线程的可管理性。
3.提高响应速度

如何创建线程池

ThreadPoolExecutor是jdk提供的线程池的服务类,基于ThreadPoolExecutor可以很容易将一个实现Runnable接口的任务放入线程池中执行,下面是ThreadPoolExecutor实现:

    //构造函数:
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

创建一个线程池需要以下几个参数:

  • corePoolSize:线程池中线程的个数。
  • maximumPoolSize:线程池最大容量。
  • keepAliveTime:线程池中空闲存活时间(单位:TimeUnit定)
  • workQueue:存放任务的工作队列。
  • threadFactory:线程工厂
  • handler:当任务数超过maximumPoolSize+corePoolSize后,任务交给handler处理。
1.corePoolSize

当客户端提交一个新任务时,如果此时线程池中线程的个数小于corePoolSize,则线程池会新建一个线程来处理该任务,即时此时有空闲线程。

2.maximumPoolSize

客户端提交新任务,此时线程池的任务队列也已经满了,那么如果maximumPoolSize < corePoolSize,就新建线程执行该任务。

如果线程池用的是无界队列,那么这个参数也就不起任何作用了。

3.keepAliveTime

线程池中超过corePoolSize的线程会在空闲keepAliveTime时间后被关闭,keepAliveTime单位由TimeUnit指定,如果allowCoreThreadTimeOut=true,即核心线程如果空闲时间超过keepAliveTime时间后同样会被关闭。

4.workQueue

当核心线程池已满,即当前worker数=corePoolSize时,新提交的任务会被放入工作队列中。此时一旦有worker完成手头的任务就会到workQueue中领取一个新任务继续执行。
工作队列可以有以下几种选择:

(1).ArrayBlockingQueue:基于数组的有界阻塞队列
(2).LinkedBlockingQueue:基于链表的阻塞队列,可以不指定队列大小,默认Integer.MAX_VALUE。性能高于ArrayBlockingQueue。
(3).SynchronousQueue
(4).PriorityBlockingQueue:具有优先级的阻塞列,基于数组实现,内部实际上实现了一个最小堆,每次offer、poll,都需要进行堆调整操作O(logn)。队列中元素需要实现Comparable接口或初始化队列时传入一个Comparator对象。虽然初始化队列时需指定队列大小,但PrioriityBlockingQueue支持动态扩容,所以可以认为是无限阻塞队列。
5.threadFactory

每当线程池需要创建一个线程时,都是通过线程工厂方法来完成。默认的线程工厂方法将创建一个新的、非守护的线程,并且不包含特殊配置信息。通过指定一个线程工厂方法,可以定制线程池的配置信息。
我们可以通过实现ThreadFactory接口,来定制线程工厂。样例如下:

public interface ThreadFactory {
    Thread newThread(Runnable r);
}
public class MyThreadFactory implements ThreadFactory {

    private final String poolName;

    public MyThreadFactory(String name){
        this.poolName = name;
    }
    @Override
    public Thread newThread(Runnable r) {
        // TODO Auto-generated method stub
        return new MyAppThread(r,poolName);
    }
}

public class MyAppThread extends Thread {
    public static final String DEFAULT_NAME = "MyAppThread";
    private static volatile boolean debugLifecycle = false;
    private static final AtomicInteger created = new AtomicInteger();

    private static final AtomicInteger alive = new AtomicInteger();

    public MyAppThread(Runnable r){
        this(r,DEFAULT_NAME);
    }
    public MyAppThread(Runnable r,String name){
        super(r,name + "-" + created.incrementAndGet());
        setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                System.out.println("UNCATGHT in Thread " + t.getName());
            }
        });
    }
    public void run(){
        try {
            alive.incrementAndGet();
            super.run();
        } finally{
            alive.decrementAndGet();
        }
    }
    public static int getThreadsCreated(){return created.get();}
    public static int getThreadAlive(){return alive.get();}
    public static boolean getDebug(){return debugLifecycle;}
}    
6.RejectedExecutionHandler

当线程池达到最大容量,饱和策略就发挥作用,ThreadPoolExecutor的饱和策略可以通过setRejectedExecutionHandler方法来修改。
JDK提供几种不同的RejectedExecutionHandler的实现:

  • AbortPolicy:终止策略是默认饱和策略,直接抛出RejectedExecutionException。调用者可以捕获这个异常,并做相应处理。
  • CallerRunsPolicy:“调用者运行”策略实现了一种调用机制,该策略不会抛弃任务,也不抛出异常,而是将任务退回调用者,从而降低新任务的流量
  • DiscardPolicy:新任务无法入队列时,直接抛弃该任务。
  • DiscardOldestPolicy:抛弃下一个将要被执行的任务,然后尝试重新提交新任务。(若工作队列是一个优先队列,那么“抛弃最旧的”策略导致抛弃优先级最高的任务。)

饱和策略实现类都需要实现RejectedExecutionHandler接口,下面是四种jdk内置的四种饱和策略的源码:

    public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() { }
        // 直接抛出RejectedExecutionException异常
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
public static class CallerRunsPolicy implements RejectedExecutionHandler {
        public CallerRunsPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }
    
public static class DiscardPolicy implements RejectedExecutionHandler     {
        public DiscardPolicy() { }
        //不做操作,直接丢弃了
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
    
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        public DiscardOldestPolicy() { }
        //抛弃队头的任务,即最早提交且未执行的任务
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }    

创建线程池

jdk为我们提供了一个工厂类Executors,其中提供了几个静态工厂方法用于新建不同特性的线程池。如下:

public class Executors {

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    
    public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }    
  • newFixedThreadPool:将创建一个固定长度的线程池,每提交一个任务时就创建一个线程,直到达到线程池的最大数量,此时线程池的规模不再变化(如果某个线程发生Exception而结束,那么线程池会补充一个新的线程)

  • newCachedThreadPool:创建一个可缓存的线程池,如果当前线程池中线程的个数超过了处理需求时,那么空闲线程将被回收,而当需要增加线程时,则可以添加新的线程,线程池中个数不受限制(使用时格外注意,防止内存溢出)

  • newSingleThreadPool:这是一个单线程的Executor,它创建单个工作线程来执行任务,如果线程异常结束,会创建一个新的线程来替代。newSingleThreadPool能确保依照任务在队列中的顺序串行执行。

  • newScheduledThreadPool:创建一个固定长度的线程池,而且可以延时或定时方式来执行任务。


注意事项

1.newFixedThreadPool和newSingleThreadExecutor都是用了LinkedBlockingQueue(),默认capacity=Integer.MAX_VALUE,线程池工作队列可以认为是无限大的,所以线程池中的线程数不会超过CorePoolSize,maximumPoolSize可以认为是一个无效参数,且饱和策略不可能执行,这几点需要注意。

2.newFixedThreadPool(1)和newSingleThreadPool区别?

newSingleThreadPool返回的是一个代理对象,屏蔽了ThreadPoolExecutor的一些set方法,即newSingleThreadPool一旦返回,就无法在重新配置线程池的参数了。

3.CachedThreadPool的corePoolSize=0,即核心线程池默认为空,maximumPoolSize=Integer.MAX_VALUE,最大线程池为无限大的。且空闲线程等待新任务超过60秒即被终止。


Executor生命周期

由于Executor以异步方式来执行任务,因此在任意时刻,之前提交的任务的状态是无法立刻得到的。有些任务可能已经完成,有些可能正在运行,而其他的任务可能在队列中等待执行。

为了解决执行任务的生命周期问题,ExecutorService扩展了Executor接口,添加了一些生命周期管理的方法。如下:

void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
//  ...还有用于提交任务的一些方法

线程池的生命周期有以下几种状态

  • RUNNING:接受新task,并且处理工作队列中的任务。
  • SHUTDOWN:不接受新task,但是继续处理工作队列中的任务。
  • STOP:不接受新task,不处理工作队列中的任务,并且中断运行中的线程。
  • TIDYING:所有任务已被终止,线程池已清空(workerCount=0),此时线程池状态变为TIDYING,并且准备执行terminated()方法。
  • TERMINATED:以完成terminated()方法。

线程池如何存储自身状态的?

线程池的状态信息是用一个AtomicInteger类型的变量ctl存储的,定义如下:

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl中除了存放状态信息,还存放了线程池当前工作线程的个数信息。下图展示这两个信息在ctl中的存储形式:


下面是状态相关信息的源码,结合上图应该就不难理解了


    private static final int COUNT_BITS = Integer.SIZE - 3;//为啥减3
    //0-28位全为1
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // RUNNING :  111。。。
    private static final int RUNNING    = -1 << COUNT_BITS;
    // SHUTDOWN : 000。。。
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // STOP     : 001。。。
    private static final int STOP       =  1 << COUNT_BITS;
    // TIDYING :  010。。。
    private static final int TIDYING    =  2 << COUNT_BITS;
    //TERMINATED :110。。。
    private static final int TERMINATED =  3 << COUNT_BITS;

    // 通过简单的位运算获取ctl中的信息
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

runState有5种状态,所以最少需要3bit来表示。这就是为什么COUNT_BITS=32-3

各种状态之间的转换时机:

  • RUNNING -> SHUTDOWN:调用shutdown()方法
  • (RUNNING or SHUTDOWN) -> STOP:调用shutdownNow()
  • SHUTDOWN -> TIDYING:线程池和工作队列都为空时
  • STOP -> TIDYING:线程池为空时
  • TIDYING -> TERMINATED:调用terminated()后

关闭线程池方法

  • shutdown():不再接受新任务,同时已提交的任务执行完成,包括那些还在队列中等待,未开始执行的任务。
  • shutdownNow():将取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务。之后再提交任务则抛出异常:java.util.concurrent.RejectedExecutionException。

线程池工作流程

线程池处理新提交任务的流程如下:


  1. 如果当前运行的线程数小于配置的corePoolSize,就新建一个线程执行该command任务,即时此时线程池中有空闲线程。

  2. 如果线程池中线程个数达到corePoolSize,新提交的任务就被放入workQueue,等待线程池任务调度。

  3. 当workQueue满后,且maximumPoolSize > corePoolSize,新提交任务会创建新线程执行任务

  4. 当workQueue满后,且线程池个数达到maximumPoolSize,则提交的任务交由RejectedExecutionHandler处理。

  5. 超过corePoolSize的线程,在空闲keepAliveTime时间后,将被关闭。

  6. 当allowCoreThreadTimeOut=true时,corePoolSize个数内的线程空闲时间达到keepAliveTime后,也将被关闭。

ThreadPoolExecutor 中execute源码:如下:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {//新建线程执行任务
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {//提交到工作队列
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))//
            reject(command);
    }

线程池监控

//线程池已完成任务数量
private long completedTaskCount;
//当前运行的线程数
public int getActiveCount()    

此外,ThreadPoolExecutor还提供了以下几个钩子函数用于扩展它的行为,我们可以在子类中实现自己的逻辑,在每个任务执行的前、后以及worker退出时进行定制处理。

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }

线程的生命周期

下面从线程池中的某个线程的角度出发,分析一下线程从被创建一直到被销毁,整个生命周期里的工作流程

1. 线程的创建时机
  • 提交任务时被创建(即客户端调用submit方法)。不过提交任务未必一定会创建线程,这在前面线程池的工作流程里已经提到。
  • 预先启动线程池中核心线程池。(如:调用prestartCoreThread、prestartAllCoreThreads()等方法),下面是prestartAllCoreThreads的源码
public int prestartAllCoreThreads() {
        int n = 0;
        while (addWorker(null, true))//firstTask为null
            ++n;
        return n;
}
2. 线程在线程池中的数据结构

线程池中的工作线程是被封装到一个Worker类中,部分源码如下:

private final class Worker  extends AbstractQueuedSynchronizer
        implements Runnable
    {

        /** worker关联的线程 */
        final Thread thread;
        /** worker的第一个任务,可能为null */
        Runnable firstTask;
        /** worker完成的任务数量 */
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** 将任务代理给runWorker方法  */
        public void run() {
            runWorker(this);
        }
        。。。。
    }
    
//下面是ThreadPoolExecutor中的runWorker方法源码:
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);//钩子函数
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);//钩子函数
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

当firstTassk为null的情况下,线程的执行流程如下图


对于Executor框架,需要明白以下两点

Executor框架基于生产者-消费者模式:提交任务的执行者是生成者,执行任务的线程是消费者。

Executor是异步执行任务,这是通过队列来实现的。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,366评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,521评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,689评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,925评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,942评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,727评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,447评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,349评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,820评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,990评论 3 337
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,127评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,812评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,471评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,017评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,142评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,388评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,066评论 2 355

推荐阅读更多精彩内容