Java并发编程中由浅入深理解线程池

线程池的提出

在一个应用程序中,我们需要多次使用线程,也就意味着,我们需要多次创建并销毁线程。而创建并销毁线程的过程势必会消耗内存。因为频繁创建线程和销毁线程需要时间。线程本身也要占用内存空间,大量的线程会占用内存资源并且可能会导致
Out of Memory。大量的线程回收也会给GC带来很大的压力。
在Java中内存资源是及其宝贵的,所以就提出了线程池的概念。

线程池简介

Java中开辟出了一种管理线程的概念,线程池的好处就是可以方便的管理线程,也可以减少内存的消耗。为了避免重复的创建线程,线程池的出现可以让线程进行复用。通俗点讲,当有工作来,就会向线程池拿一个线程,当工作完成后,并不是直接关闭线程,而是将这个线程归还给线程池供其他任务使用。

多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。

创建线程

之前也说过创建线程的三种方式

1.继承Thread类,重写run()

public class Th1 extends Thread{
    @Override
    public void run() {
        System.out.println("Th1");
    }
}

2.实现Runable接口,重写run()

public class Th2 implements Runnable{
    @Override
    public void run() {
        System.out.println("Th2");
    }
}

3.实现Callable接口,重写call()

public class Th3 implements Callable {
    @Override
    public Object call() throws Exception {
        System.out.println("th3");
        String th="Th3";
        return th;
    }
}

传统的启动线程的方法

实现Callable接口的线程需要配合FutureTask来使用,RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。

public class thMain {
    public static void main(String[] arg) {
        Th1 th1=new Th1();
        Th2 th2=new Th2();
        Th3 th3=new Th3();
        Thread thread1=new Thread(th1);
        Thread thread2=new Thread(th2);
        FutureTask futureTask=new FutureTask(th3);
        Thread thread3=new Thread(futureTask);
        thread1.start();
        thread2.start();
        thread3.start();
    }
}
这里可能会有个问题,为什么不直接调用run(),而是start()

就是为了实现多线程的优点,没这个start()不行。直接调用run()就直接安排好了顺序,调用start()后,线程会被放到等待队列,等待CPU调度,并不一定要马上开始执行,只是将这个线程置于就绪状态。然后通过JVM,线程Thread会调用run(),执行本线程的线程体。

线程的生命周期图解

引入线程池

应用场景:

一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。
T1 + T3 远大于T2,则可以采用线程池,以提高服务器性能。

使用线程池的好处

减少线程创建和销毁的时间。以上例来说,线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。T1T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1T3的开销了。
显著减少了创建线程的数目。一个服务器一天要处理n个请求,并且每个请求需要一个单独的线程完成。在线程池中,线程数一般是固定的,所以产生线程总数不会超过线程池中线程的数目,而如果服务器不利用线程池来处理这些请求则线程总数为n。利用线程池的服务器程序不会为了创建n个线程而在处理请求时浪费时间,从而提高效率。(n>>>线程池的数目)

首先来了解下线程池的流程

image.png

任务进来时,wonrkerCountOf()能够取得当前线程池中的线程的总数,取得当前线程数与核心池大小比较,
1.如果小于,将通过addWorker()调度执行。
2.如果大于核心池大小,那么就提交到等待队列,等待执行
3.如果进入等待队列失败,则会将任务直接提交给线程池。
4.如果线程数达到最大线程数,那么就提交失败,就调用handler实现拒绝策略。

线程池的类图

image.png

Executor只是一个接口,它是Java线程池框架的基础,它将任务的提交与任务的执行分离开来。
ExecutorService继承自Executor,有两个关键类实现了ExecutorService接口:
ThreadPoolExecutorScheduledThreadPoolExecutor
ThreadPoolExecutor 是线程池的核心实现类,用来执行被提交的任务。
ScheduledThreadPoolExecutor 也是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。它比Timer更灵活,功能更强大(继承了ThreadPoolExecutor类实现了
ScheduledExecutorService接口)

Executors提供的线程服务,都是通过参数设置来实现不同的线程池机制。
ThreadPoolExecutor构造方法参数讲解

参数名 作用 类型
corePoolSize 线程池的基本大小,包括空闲线程 int
maximumPoolSize 池中允许的最大线程数 int
keepAliveTime 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间 long
TimeUnit keepAliveTime时间单位 TimeUnit
workQueue 执行前用于保持任务的队列。此队列仅保持由 execute方法提交的 Runnable任务 BlockingQueue<Runnable>
threadFactory 执行程序创建新线程时使用的工厂 ThreadFactory
handler 当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理,由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序 RejectedExecutionHandler

poolSize:线程池中当前线程的数量。
当新提交一个任务时:
(1)如果poolSize<corePoolSize,新增加一个线程处理新的任务。
(2)如果poolSize=corePoolSize,新任务会被放入阻塞队列等待。
(3)如果阻塞队列的容量达到上限,且这时poolSize<maximumPoolSize,新增线程来处理任务。
(4)如果阻塞队列满了,且poolSize=maximumPoolSize,那么线程池已经达到极限,会根据饱和拒绝策略RejectedExecutionHandler拒绝新的任务。

创建一个线程池(使用默认的线程工厂)

ThreadPoolExecutor threadPoolExecutor=new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS,new SynchronousQueue<>(),
                new ThreadPoolExecutor.DiscardOldestPolicy());

相关说明

TimeUnitjava.util.concurrent包下面的一个类,表示给定单元粒度的时间段

TimeUnit.DAYS          //天  
TimeUnit.HOURS         //小时  
TimeUnit.MINUTES       //分钟  
TimeUnit.SECONDS       //秒  
TimeUnit.MILLISECONDS  //毫秒 
TimeUnit.NANOSECONDS   //毫微秒
TimeUnit.MICROSECONDS  //微秒

ThreadFactory默认使用Executors.defaultThreadFactory()有需要的可以实现ThreadFactory接口进行重写,DefaultThreadFactoryExecutors类中的一个内部类

static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

拒绝策略RejectedExecutionHandler,是ThreadPoolExecutor的内部类,都实现了
RejectedExecutionHandler接口,源码如下

public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

    /**
     * A handler for rejected tasks that throws a
     * {@code RejectedExecutionException}.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

    /**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

    /**
     * A handler for rejected tasks that discards the oldest unhandled
     * request and then retries {@code execute}, unless the executor
     * is shut down, in which case the task is discarded.
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

线程池的组成

线程池包括以下四个基本组成部分:
1.ThreadPool(线程池管理器):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务
2.PoolWorker(工作线程):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务
3.Task(任务接口):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等
4.TaskQueue(任务队列):用于存放没有处理的任务。提供一种缓冲机制

常见的线程池

1.newCachedThreadPool(推荐使用):可缓存线程池,无界线程池,可以进行自动线程回收。该线程池中没有核心线程,非核心线程的数量为无限大Integer.max_value,当线程池大小超过了处理任务所需的线程,那么就会回收部分空闲(一般是60秒无执行)的线程,当有任务来时,又智能的添加新线程来执行。适用于耗时少,任务量大的情况。
2.newScheduledThreadPool:周期性执行任务的线程池,按照某种特定的计划执行线程中的任务,有核心线程,但也有非核心线程,非核心线程的大小也为无限大。适用于执行周期性的任务。
3.newSingleThreadPool:单个线程的线程池,即线程池中每次只有一个线程工作,单线程串行执行任务
4.newFixedThreadPool:固定数量的线程池,每提交一个任务就是一个线程,直到达到线程池的最大数量,然后后面进入等待队列直到前面的任务完成才继续执行

任务缓存队列及排队策略

任务缓存队列workQueue,用来存放等待执行的任务。
workQueue的类型为BlockingQueue<Runnable>
1.ArrayBlockingQueue(有界队列):用数组实现的有界阻塞队列,其内部按先进先出的原则对元素进行排序,通过重入锁ReenterLockCondition条件队列实现的,此队列创建时必须指定大小,最大的特点便是可以防止资源耗尽的情况发生。
2.LinkedBlockingQueue(无界队列):基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE,在正常情况下,链接队列的吞吐量要高于基于数组的队列ArrayBlockingQueue,因为其内部实现添加和删除操作使用的两个ReenterLock来控制并发执行,而ArrayBlockingQueue内部只是使用一个ReenterLock控制并发
3.SynchronousQueue(直接提交):一个不存储元素的队列,这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务,通常要求maximumPoolSize是无界的。SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合传递性场景。SynchronousQueue的吞吐量高于ArrayBlockingQueueLinkedBlockingQueue

Handler的拒绝策略:

1.AbortPolicy:不执行新任务,直接抛出RejectedExecutionException异常,提示线程池已满
2.DisCardPolicy:不执行新任务,也不抛出异常
3.DisCardOldSetPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
4.CallerRunsPolicy:直接调用execute()来执行当前任务

代码实现

使用newCachedThreadPool线程池
public class thMain {
    public static void main(String[] arg) throws Exception{
        Th1 th1=new Th1();
        Th2 th2=new Th2();
        Th3 th3=new Th3();
        Thread thread1=new Thread(th1);
        Thread thread2=new Thread(th2);
        FutureTask futureTask=new FutureTask(th3);
        Thread thread3=new Thread(futureTask);
        ExecutorService executorService= Executors.newCachedThreadPool();
        executorService.execute(thread1);
        executorService.execute(thread2);
        executorService.execute(thread3);
        executorService.shutdown();
    }
}

使用newFixedThreadPool线程池
ExecutorService executorService= Executors.newFixedThreadPool(20);

使用newSingleThreadPool线程池(单线程的,不推荐)
ExecutorService executorService= Executors.newSingleThreadExecutor();

使用newScheduledThreadPool线程池
ExecutorService executorService= Executors.newScheduledThreadPool(20);

线程池的关闭

ThreadPoolExecutor提供了两个方法用于线程池的关闭
shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务

关于合理设置线程池大小

公式:最佳线程数目 = (线程等待时间与线程CPU时间之比 + 1)* CPU数目
线程等待时间所占比例越高,需要越多线程。线程CPU时间所占比例越高,需要越少线程。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 210,914评论 6 490
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 89,935评论 2 383
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,531评论 0 345
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,309评论 1 282
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,381评论 5 384
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,730评论 1 289
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,882评论 3 404
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,643评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,095评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,448评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,566评论 1 339
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,253评论 4 328
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,829评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,715评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,945评论 1 264
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,248评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,440评论 2 348

推荐阅读更多精彩内容

  • 最近我参加了一次面试,面试官要求用python实现某个api,一部分代码如下 面试官说:“ data = []这一...
    大蟒传奇阅读 19,351评论 2 26
  • 平时我在单位写总结材料,被领导批评最多的一句话就是“站位,站位一定要高,你要站在全局的角度来写,而不能只局限在自身...
    刘金玉阅读 150评论 0 3