多线程与并发(九):线程池相关

为什么说频繁的创建和销毁线程会浪费大量的系统资源?

线程的创建需要开辟虚拟机栈、本地方法栈、程序计数器等线程私有的内存空间。在线程销毁时需要回收这些系统资源。频繁的创建和销毁线程会浪费大量的系统资源

线程池的作用

  • 利用线程池管理并复用线程、控制最大并发数等
  • 实现任务线程队列缓存策略和拒绝机制
  • 实现某些与时间相关的功能,如定时执行、周期执行等
  • 隔离线程环境。通过配置独立的线程池,将一些服务隔开,避免个服务相互影响

合理的使用线程池能够带来三个好处

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁的消耗
  • 提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即执行
  • 提高线程的可管理性。

1. 自定义线程池-阻塞队列

自定义线程池.jpg
public class ThreadPool {
    public static void main(String[] args) {
        Pool pool = new Pool(1, 1000, TimeUnit.MILLISECONDS, 1, (queue, task) -> {
            //拒绝策略
            //queue.put(task); //死等
            //queue.offer(task,500,TimeUnit.MILLISECONDS); //带超时等待
            //System.out.println("放弃");
        });

        for (int i = 0; i < 3; i++) {
            int j = i;
            pool.execute(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " execute " + j);

            });
        }
    }
}

@FunctionalInterface
interface RejectPolicy<T> {
    void reject(BlockingQueue<T> queue, T task);
}

class Pool {
    private BlockingQueue<Runnable> taskQueue;

    //线程集合
    private HashSet<Worker> workers = new HashSet<>();

    //线程数
    private int coreSize;

    //获取任务的超时时间
    private long timeout;

    private TimeUnit unit;

    private RejectPolicy<Runnable> rejectPolicy;

    public Pool(int coreSize, long timeout, TimeUnit unit, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.unit = unit;
        taskQueue = new BlockingQueue<>(queueCapacity);
        this.rejectPolicy = rejectPolicy;
    }

    //执行的方法
    public void execute(Runnable task) {
        synchronized (workers) {
            if (workers.size() < coreSize) {
                Worker worker = new Worker(task);
                System.out.println(Thread.currentThread().getName() + " add workers" + task);
                workers.add(worker);
                worker.start();
            } else {
                //taskQueue.put(task);

                //策略模式
                //1 死等 2. 带超时等待 3. 放弃任务执行 4. 抛出异常 5. 让调用者自己执行任务
                taskQueue.tryPut(rejectPolicy, task);
            }
        }
    }

    class Worker extends Thread {

        private Runnable task;

        public Worker(Runnable task) {

            this.task = task;
        }

        @Override
        public void run() {
            //while (task!= null || (task = taskQueue.take()) != null){
            while (task != null || (task = taskQueue.poll(timeout, unit)) != null) {
                try {
                    System.out.println(Thread.currentThread().getName() + " 执行 " + task );
                    task.run();
                } catch (Exception e) {
                } finally {
                    task = null;
                }
            }
            synchronized (workers) {
                System.out.println(Thread.currentThread().getName() + " 移除");
                workers.remove(this);
            }
        }
    }

}

class BlockingQueue<T> {
    Deque<T> queue = new ArrayDeque<>();

    int capacity;

    Lock lock = new ReentrantLock();

    Condition emptyWaitSet = lock.newCondition();
    Condition fullWaitSet = lock.newCondition();

    public BlockingQueue(int queueCapacity) {
        this.capacity = queueCapacity;
    }

    //添加带超时的等待
    public T poll(long timeout, TimeUnit unit) {
        lock.lock();
        try {
            long nanos = unit.toNanos(timeout);
            while (queue.isEmpty()) {
                try {
                    //返回的是剩余的时间
                    if (nanos <= 0) {
                        System.out.println(Thread.currentThread().getName() + " 没等到,返回");
                        return null; //没等到
                    }
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            T task = queue.removeFirst();
            System.out.println(Thread.currentThread().getName() + " 出队" + task);
            fullWaitSet.signal();
            return task;
        } finally {
            lock.unlock();
        }
    }

    //取任务
    public T take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                try {
                    System.out.println(Thread.currentThread().getName() + " 队列为空,等待");
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            T task = queue.removeFirst();
            System.out.println(Thread.currentThread().getName() + " 出队 " + task);
            fullWaitSet.signal();
            return task;
        } finally {
            lock.unlock();
        }
    }

    //添加任务
    public void put(T task) {

        lock.lock();
        try {
            while (queue.size() == capacity) {
                try {
                    System.out.println(Thread.currentThread().getName() + " 队列已满,等待进入队列");
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            queue.addLast(task);
            System.out.println(Thread.currentThread().getName() + " 入队 " + task);
            emptyWaitSet.signal();
        } finally {
            lock.unlock();
        }
    }

    //带超时的阻塞添加
    public boolean offer(T task, long timeout, TimeUnit unit) {
        lock.lock();
        try {
            long nanos = unit.toNanos(timeout);
            while (queue.size() == capacity) {
                try {
                    if (nanos <= 0) {
                        return false;
                    }
                    System.out.println(Thread.currentThread().getName() + " 队列已满,等待进入队列");
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.addLast(task);
            System.out.println(Thread.currentThread().getName() + " 入队 " + task);
            emptyWaitSet.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }


    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            //判断队列是否已满
            if (queue.size() == capacity) {
                rejectPolicy.reject(this, task);
            } else {
                queue.addLast(task);
                emptyWaitSet.signal();
                System.out.println(Thread.currentThread().getName() + " 入队 " + task);
            }
        } finally {
            lock.unlock();
        }
    }
}

执行结果:
(放弃任务)


线程池.png

  • 自定义拒绝策略接口
@FunctionalInterface
interface RejectPolicy{
    void reject(BlockingQueue<T> queue,  T task);
}

2. ThreadPoolExecutor

2.1线程池状态

ThreadPoolExecutor 使用int 高三位来表示线程池状态,低29位表示线程池数量

状态名 高3位 接受新任务 处理阻塞队列任务 说明
RUNNING 111 Y Y
SHUTDOWN 000 N Y 不会接受新任务,会处理阻塞队列剩余任务
STOP 001 N N 会中断正在执行的任务,并抛弃阻塞队列任务
TIDYING 010 - - 任务全执行完毕,活动线程为0即将进入终结
TERMINATED 011 - - 终结状态

这些信息存储在一个原子变量ctl中,目的是将线程池状态与线程个数合二为一,这样就可以用一次CAS原子操作进行赋值

//c为旧值,ctlOf返回的结果为新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
//rs为高三位代表线程池状态,wc为低29位代表线程个数,ctl是合并它们
private static int ctlOf(int rs, int wc) { return rs | wc; }

2.2 构造方法

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
  • corePoolSize 核心线程数目
  • maximumPoolSize 最大线程数目
  • keepAliveTime 生存时间--针对救急线程
  • unit 时间单位--针对救急线程
  • workQueue 阻塞队列
  • threadFactory 线程工厂--可以为线程创建时起名字
  • handler 拒绝策略

最大线程数= 核心线程数+ 救急线程数

  • 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务

  • 当线程数达到corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue队列排队,直到有空闲的线程

  • 如果队列选择了有界队列,那么任务超过了队列大小时,会创建maximunPoolSize-corePoolSize数目的线程来救急

  • 如果线程达到了maximunPoolSize 仍然有新任务这时会执行拒绝策略。拒绝策略jdk提供了四种实现,其实著名框架也提供了实现

    • AbortPolicy 让调用者抛出RejectExecutionException异常,这是默认策略
    • CallerRunsPolicy 让调用者运行任务
    • DiscardPolicy 放弃本次任务
    • DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之
    • Dubbo 的实现,在抛出 RejectExecutionException 异常之前会记录日志,并dump线程栈信息,方便定位问题
    • Netty 的实现,是创建一个新线程来执行任务
    • ActiveMQ的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略
    • PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中的每种拒绝策略
  • 当高峰过去后,超过coreSize的救急线程如果一段时间没有任务做,需要结束资源,这个时间由keepAliveTime 和 unit来控制。

根据这个构造方法,JDK Executors类中提供了众多工厂方法来创建各种用途的线程池

2.3 newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

特点:

  • 核心线程数== 最大线程数(没有救急线程被创建),因此无需超时时间
  • 阻塞队列是无界的,可以放任意数量的任务

评价:

  • 适合于任务量已知,相对耗时的任务

2.4 newCacheThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

特点:

  • 核心线程数是0,最大线程数是Integer.MAX_VALUE,救急线程的空闲生存时间是60s,意味着
    • 全部是救急线程(60s后可以回收)
    • 救急线程可以无限创建
  • 队列采用了SynchronousQueue实现的特点是,它没有容量,没有线程来取是放不进去的(一手交钱一手交货)

评价:

  • 整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲1分钟后释放线程。
  • 适合任务数比较密集,但每个任务执行时间短的情况

2.5 newSingleThreadPool

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
}

使用场景:
希望多个任务排队执行。线程数固定为1,任务数多于1时,会放入无界队列排队。任务执行完成,这唯一的线程不会被释放

区别:

  • 自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作
  • 线程个数始终为1,不能修改
    • FinalizableDelegateExecutorService应用的是装饰器模式,只对外暴露了ExecutorService接口,因此不能调用ThreadPoolExecutor 中特有的方法
  • Executors.newFixedThreadPool(1) 初始为1,后面还可以修改
    • 对外暴露的是ThreadPoolExecutor对象,可以强转后调用setCorePoolSize等方法进行修改

2.6 提交任务

//执行任务
void execute(Runnable command);
//提交任务task, 用返回值Future 获得任务执行结果
<T> Future<T> submit(Callable<T> task);
//提交tasks中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
//提交tasks中所有任务,带超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
//提交tasks中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其他任务取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
//提交tasks中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其他任务取消,带超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;

2.7 关闭线程池

shutdown

/**
 * 线程池状态变为SHUTDOWN
 * 不会接收新任务
 * 但已提交任务会执行完
 * 此方法不会阻塞调用线程的执行
 */
void shutdown();
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        //修改线程池状态
        advanceRunState(SHUTDOWN);
        //仅会打断空闲线程
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    //尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等)
    tryTerminate();
}

shutdownNow

/**
 * 线程池状态变为STOP
 * 不会接收新任务
 * 会将队列中的任务返回
 * 并用 interrupt 的方式中断正在执行的任务
 */
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        //修改线程池状态
        advanceRunState(STOP);
        //打断所有线程
        interruptWorkers();
        //获取队列中剩余任务
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

其他方法

//不在RUNNING 状态的线程池,此方法返回true
boolean isShutDown();

//线程池状态是否是TERMINATED
boolean isTerminated();

//调用shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池TERMINATED 后做一些事情,可以用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
public class ShutDownTest {

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        Future<String> future1 = pool.submit(()->{
            System.out.println("running 1");
            Thread.sleep(1000);
            System.out.println("finish 1");
            return "1";

        });
        Future<String> future2 = pool.submit(()->{
            System.out.println("running 2");
            Thread.sleep(1000);
            System.out.println("finish 2");
            return "2";

        });

        Future<String> future3 = pool.submit(()->{
            System.out.println("running 3");
            Thread.sleep(1000);
            System.out.println("finish 3");
            return "3";

        });

        System.out.println("shutdown");

        pool.shutdown();

    }
}

执行结果:
shutdown
running 1
running 2
finish 1
finish 2
running 3
finish 3


public class ShutDownTest {

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        Future<String> future1 = pool.submit(()->{
            System.out.println("running 1");
            Thread.sleep(1000);
            System.out.println("finish 1");
            return "1";

        });
        Future<String> future2 = pool.submit(()->{
            System.out.println("running 2");
            Thread.sleep(1000);
            System.out.println("finish 2");
            return "2";

        });

        Future<String> future3 = pool.submit(()->{
            System.out.println("running 3");
            Thread.sleep(1000);
            System.out.println("finish 3");
            return "3";

        });

        System.out.println("shutdown");

        try {
            pool.awaitTermination(3, TimeUnit.SECONDS); //不用,因为不知道等多久合适
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("other...");

    }
}

执行结果:
shutdown
running 1
running 2
finish 1
finish 2
running 3
finish 3
other...


public class ShutDownTest {

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        Future<String> future1 = pool.submit(()->{
            System.out.println("running 1");
            Thread.sleep(1000);
            System.out.println("finish 1");
            return "1";

        });
        Future<String> future2 = pool.submit(()->{
            System.out.println("running 2");
            Thread.sleep(1000);
            System.out.println("finish 2");
            return "2";

        });

        Future<String> future3 = pool.submit(()->{
            System.out.println("running 3");
            Thread.sleep(1000);
            System.out.println("finish 3");
            return "3";

        });

        System.out.println("shutdownNow");

        List<Runnable> task = pool.shutdownNow();//返回队列中任务

    }
}

执行结果:
shutdownNow
running 1
running 2


2.8. 池大小

  • 过小会导致程序不能充分利用系统资源、容易导致饥饿
  • 过大会导致更多的线程上下文切换,占用更多的资源

2.8.1 CPU密集型

通常采用 CPU核数 + 1 能够实现最优的CPU利用率,+1 是保证当线程由于页缺失故障(操作系统)或其他原因导致暂停时,额外的这个线程就能顶上去,保证CPU时钟周期不被浪费

2.8.2 I/O密集型

CPU不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用CPU资源,但当你执行I/O操作时,远程RPC调用时,包括进行数据库操作时,这时候CPU就闲下来了,你可以利用多线程提高他的利用率

经验公式如下
线程数 = 核数 * CPU利用率 * 总时间(CPU计算时间 + 等待时间)/ CPU计算时间

例如:4核CPU计算时间是50%,其他等待时间是50%,期望CPU被100%利用,套用公式

4*100% *100% / 50% = 8

2.9 任务调度线程池

在任务调度线程池功能加入之前,可以使用java.util.Timer来实现定时功能,Timer 的优点在于简单易用,但由于所有的任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务。

public class TimerTest {

    public static void main(String[] args){
        Timer timer = new Timer();

        TimerTask timerTask1 = new TimerTask() {
            @Override
            public void run() {
                System.out.println(" task1");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        TimerTask timerTask2 = new TimerTask() {
            @Override
            public void run() {
                System.out.println(" task2");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        //使用Timer添加两个任务,希望他们都在 1s 后执行
        //Timer 内只有一个线程来顺序执行队列中的任务

        timer.schedule(timerTask1, 1000);
        timer.schedule(timerTask2, 1000);
    }
}

2.9.1 使用任务调度线程池改进

public class TimerTest {

    public static void main(String[] args){
        
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);

        pool.schedule(()->{
            System.out.println(" task1");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, 1, TimeUnit.SECONDS);

        pool.schedule(()->{
            System.out.println(" task2");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, 1, TimeUnit.SECONDS);

    }
}

3. Fork/Join

Fork/Join 是JDK 1.7 新加入的线程池实现,

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,294评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,780评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,001评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,593评论 1 289
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,687评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,679评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,667评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,426评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,872评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,180评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,346评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,019评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,658评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,268评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,495评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,275评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,207评论 2 352

推荐阅读更多精彩内容