线程池任务执行过程

前言

JAVA通过多线程的方式实现并发,为了方便线程池的管理,JAVA采用线程池的方式对线线程的整个生命周期进行管理。1.5后引入的Executor框架的最大优点是把任务的提交和执行解耦。

要执行任务的人只需把Task描述清楚,然后提交即可。这个Task是怎么被执行的,被谁执行的,什么时候执行的,提交的人就不用关心了。

具体点讲,提交一个Callable对象给ExecutorService(如最常用的线程池ThreadPoolExecutor),将得到一个Future对象,调用Future对象的get方法等待执行结果就好了。

一个简单的例子

// 一个有5个作业线程的线程池,
//老大的老大找到一个管5个人的小团队的老大
   ExecutorService executorService = Executors.newFixedThreadPool(5);
     //提交作业给老大,作业内容封装在Callable中,约定好了输出的类型是String。
        String outputs = executorService.submit(
                 new Callable<String>() {
                     public String call() throws Exception {
                         return "I am a task";
                     }
                     //提交后就等着结果吧,到底是手下5个作业中谁领到任务了,老大是不关心的。
                 }).get();

        System.out.println(outputs);

使用上非常简单,通过一个工场类Executors创建了一个工作类,工场类返回一个ExecutorService对象。

执行过程

任务提交

ExecutorService是一个接口,没有具体实现,最后的具体实现应该由ThreadPoolExecutor实现的。

Executor 定义了一个execute接口,ExecutorService继承了Executor,并定义了管理线程生命周期的接口,可以接受提交任务、执行任务、关闭服务。

抽象类AbstractExecutorService 实现了ExecutorService接口,也实现了接口定义的默认行为;ThreadPoolExecutor继承了AbstractExecutorService。

AbstractExecutorService任务提交的submit方法有三个实现。

  1. 接收一个Runnable的Task,没有执行结果;

     public Future<?> submit(Runnable task) {
         if (task == null) throw new NullPointerException();
         RunnableFuture<Void> ftask = newTaskFor(task, null);
         execute(ftask);
         return ftask;
     }
    
  2. 接收两个参数:一个任务,一个执行结果;

     public <T> Future<T> submit(Runnable task, T result) {
         if (task == null) throw new NullPointerException();
         RunnableFuture<T> ftask = newTaskFor(task, result);
         execute(ftask);
         return ftask;
     }
    
  3. 接收一个Callable,本身就包含执任务内容和执行结果。

     public <T> Future<T> submit(Callable<T> task) {
         if (task == null) throw new NullPointerException();
         RunnableFuture<T> ftask = newTaskFor(task);
         execute(ftask);
         return ftask;
     }
    

submit方法的返回结果是Future类型,调用该接口定义的get方法即可获得执行结果。 V get() 方法的返回值类型V是在提交任务时就约定好了的。

分析
  1. 看AbstractExecutorService中submit(Callable<T> task),构造好一个FutureTask对象后,调用execute()方法执行任务。

    public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
    }

  2. Submit传入的参数都被封装成了FutureTask类型来execute的,对应前面三个不同的参数类型都会封装成FutureTask。

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
    }

  3. Executor接口中定义的void execute(Runnable command)方法的作用就是执行提交的任务,该方法在抽象类AbstractExecutorService的子类ThreadPoolExecutor中实现。

一个任务的执行过程

execute

先补充下ThreadPoolExecutor有两个最重要的集合属性,分别是存储接收任务的任务队列和用来干活的作业集合。

//任务队列
private final BlockingQueue<Runnable> workQueue;
//作业线程集合
private final HashSet<Worker> workers = new HashSet<Worker>();

execute源码分析

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. 如果当前正在执行的Worker数量比corePoolSize(核心线程大小)要小。
     * 直接创建一个新的Worker执行任务,会调用addWorker方法
     *
     * 2. 如果当前正在执行的Worker数量大于等于corePoolSize(核心线程大小)。
     * 将任务放到阻塞队列里,如果阻塞队列没满并且状态是RUNNING的话,直接丢到阻塞队列,否则执行第3步。
     * 丢到阻塞队列之后,还需要再做一次验证(丢到阻塞队列之后可能另外一个线程关闭了线程池或者刚刚加入到队列的线程死了)。
     * 如果这个时候线程池不在RUNNING状态,把刚刚丢入队列的任务remove掉,调用reject方法,
     * 否则查看Worker数量,如果Worker数量为0,起一个新的Worker去阻塞队列里拿任务执行
     *
     * 3. 丢到阻塞失败的话,会调用addWorker方法尝试起一个新的Worker去阻塞队列拿任务并执行任务,
     * 如果这个新的Worker创建失败,调用reject方法
     */
    int c = ctl.get();

    // 第一个步骤,满足线程池中的线程大小比核心线程大小要小
    if (workerCountOf(c) < corePoolSize) {

        // addWorker方法第二个参数true表示使用基本大小
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }

    // 第二个步骤,线程池的线程大小比核心线程大小要大,
    // 并且线程池还在RUNNING状态,阻塞队列也没满的情况,加阻塞队列里
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();

    // 虽然满足了第二个步骤,但是这个时候可能突然线程池关闭了,所以再做一层判断
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }

    // 第三个步骤,直接使用线程池最大大小。addWorker方法第二个参数false表示使用最大大小
    else if (!addWorker(command, false))
        reject(command);
}

在前面方法中都会调用addWorker(Runnable firstTask, boolean core)方法创建一个工作线程,差别是创建的有些工作线程上面关联接收到的任务firstTask,有些没有。该方法为当前接收到的任务firstTask创建Worker,并将Worker添加到作业集合HashSet<Worker> workers中,并启动作业。

addWorker源码分析

// 返回值是boolean类型,true表示新任务被接收了,并且执行了。否则是false

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c); // 线程池当前状态

        // 这个判断转换成 rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty)。 
        // 概括为3个条件:
        // 1. 线程池不在RUNNING状态并且状态是STOP、TIDYING或TERMINATED中的任意一种状态

        // 2. 线程池不在RUNNING状态,线程池接受了新的任务 

        // 3. 线程池不在RUNNING状态,阻塞队列为空。  满足这3个条件中的任意一个的话,拒绝执行任务

        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
                firstTask == null &&
                ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c); // 线程池线程个数
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize)) // 如果线程池线程数量超过线程池最大容量或者线程数量超过了基本大小(core参数为true,core参数为false的话判断超过最大大小)
                return false; // 超过直接返回false
            if (compareAndIncrementWorkerCount(c)) // 没有超过各种大小的话,cas操作线程池线程数量+1,cas成功的话跳出循环
                break retry;
            c = ctl.get();  // 重新检查状态
            if (runStateOf(c) != rs) // 如果状态改变了,重新循环操作
            continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    // 参数 firstTask != null, core = true
    // 验证可以满足可新增线程的条件
    boolean workerStarted = false; // 任务是否成功启动标识
    boolean workerAdded = false; // 任务是否添加成功标识
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock; // 得到线程池的可重入锁
        w = new Worker(firstTask); // 基于任务firstTask构造worker
        final Thread t = w.thread; 
        if (t != null) { 
            mainLock.lock(); // 锁住,防止并发
            try {
                // 在锁住之后再重新检测一下状态
                int c = ctl.get();
                int rs = runStateOf(c);

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) { // 如果线程池在RUNNING状态或者线程池在SHUTDOWN状态并且任务是个null
                    if (t.isAlive()) 
                        throw new IllegalThreadStateException(); 
                        workers.add(w); 
                    int s = workers.size(); 
                    if (s > largestPoolSize) 
                        largestPoolSize = s;
                    workerAdded = true; // 标识一下任务已经添加成功
                }
            } finally {
                mainLock.unlock(); // 解锁
            }
            if (workerAdded) { 
                t.start(); // 线程启动,调用worker.run
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted) // 任务启动失败
            addWorkerFailed(w);
    }
    return workerStarted;
}

任务执行

private boolean addWorker(Runnable firstTask, boolean core) {
            ...
            ...
    
            if (workerAdded) { 
                t.start(); // 线程启动,调用worker.run
                workerStarted = true;
            }
            ...
    return workerStarted;
}

Worker中的线程start的时候,调用Worker本身run方法

run()内部调用runWorker(Worker w),就是在一直调用getTask()方法获取任务,然后调用 runTask(task)方法执行任务。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    ...
    ...
        //循环从线程池的任务队列获取任务
        while (task != null || (task = getTask()) != null) {
                    ...
                    ...
                try {
                    //执行任务
                    task.run();
                } catch (RuntimeException x) {
                            ...
                            ...
                } finally {
                    //执行正常完成
                    afterExecute(task, thrown);
                }
        }finally {
            //调用processWorkerExit方法进行回收
            processWorkerExit(w, completedAbruptly);
        }
           
}

getTask()方法是ThreadPoolExecutor提供给其内部类Worker的的方法。作用就是一个,从任务队列中取任务,源源不断地输出任务。

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
       ...
    for (;;) {
       ...

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                //从任务队列的头部取任务
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

如果发生了以下四件事中的任意一件,那么Worker需要被回收:

  1. Worker个数比线程池最大大小要大

  2. 线程池处于STOP状态

  3. 线程池处于SHUTDOWN状态并且阻塞队列为空

  4. 使用超时时间从阻塞队列里拿数据,并且超时之后没有拿到数据(allowCoreThreadTimeOut || workerCount > corePoolSize)

如果getTask返回的是null,那说明阻塞队列已经没有任务了,那么会调用processWorkerExit方法进行回收:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        // 集合移除掉需要回收的Worker
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // 尝试结束线程池
    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        //如果Worker不是异常而死亡结束流程
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        //异常结束,新开启一个Worker
        //开启条件:由于用户任务异常退出; Worker任务少于corePoolSize或者工作正在运行;阻塞队列不为空,但没有workers。
        addWorker(null, false);
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,126评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,254评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,445评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,185评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,178评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,970评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,276评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,927评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,400评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,883评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,997评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,646评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,213评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,204评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,423评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,423评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,722评论 2 345

推荐阅读更多精彩内容