Executor框架

前言

最近在看并发编程艺术这本书,对看书的一些笔记及个人工作中的总结。

hashiqiqi.jpeg

Java的线程既是工作单元,也是执行机制。从JDK 5开始,把工作单元与执行机制分离开来。工作单元包括Runnable和Callable,而执行机制由Executor框架提供。

看一个最简单的demo:

public class ExecutorTest {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        for (int i = 0;i<100;i++){
            executorService.submit(() -> System.out.println("run start"));
        }
    }
}

Executor框架的结构

Executor框架主要由3大部分组成如下:
任务 :包括被执行任务需要实现的接口:Runnable接口或Callable接口。
任务的执行 :包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。Executor框架有两个关键类实现了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)。
异步计算的结果 :包括接口Future和实现Future接口的FutureTask类。

图片.png

Executor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来。
ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。
ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor比Timer更灵活,功能更强大。
Future接口和实现Future接口的FutureTask类,代表异步计算的结果。
Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或Scheduled-ThreadPoolExecutor执行。

Executor框架的成员

ThreadPoolExecutor

ThreadPoolExecutor通常使用工厂类Executors来创建。Executors可以创建3种类型的ThreadPoolExecutor:SingleThreadExecutor、FixedThreadPool和CachedThreadPool。

FixedThreadPool:该方法返回一个固定数量的线程池,该方法的线程数始终不变,当一个任务提交时,若线程池中空闲,则立即执行,若没有,则会被暂缓在一个队列
中等待有空闲的线程去执行。

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
}

原理说明:
第一个参数是corePoolSize,第二个参数是maximumPoolSize,第三个参数是多余的空闲线程等待新任务的最大时间,第四个即时间单位,第五个是多余任务加入的队列,这里使用的是无界队列(LinkedBlockingQueue)
1)当运行的线程小于corePoolSize,则创建新线程来执行任务。
2)当运行线程达到corePoolSize,则加入到阻塞队列中,当第一步执行完任务后会从队列中反复取出任务来执行。
上篇博客自定义线程池 //www.greatytc.com/p/60fbe6d6ca5c
我们知道如果构造函数的队列是无界队列,所以多余的任务会加入到队列中,使得maximumPoolSize参数没有作用,拒绝策略也不起作用。

SingleThreadExecutor:创建一个线程的线程池,若空闲则执行,若没有空闲线程则暂缓在任务队列中

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
}

构造函数与FixedThreadPool差不多,差别在于corePoolSize和maximumPoolSize都为1,原理类似,这边不过多解释了。

CachedThreadPool:返回一个可根据实际情况调整线程个数的线程池,不限制最大线程数量,若用空闲的线程则执行任务,若无任务则不创建线程,并且每一个空闲线程会在
60s后自动回收。

底层使用的是SynchronousQueue,之前的博客讲过SynchronousQueue是不存储元素的阻塞队列

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
 }

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor通常使用工厂类Executors来创建

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
      return new DelegatedScheduledExecutorService
          (new ScheduledThreadPoolExecutor(1));
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
}
public class ScheduledJob {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
        //在指定的时间后执行任务
        //scheduledExecutorService.schedule(() -> System.out.println("定时任务"),1, TimeUnit.SECONDS);
        //在1s后每隔1s执行一次定时任务
        scheduledExecutorService.scheduleWithFixedDelay(() -> System.out.println("定时任务"),2,1,TimeUnit.SECONDS);
    }
}

源码分析:

   /**
     * Creates a thread pool that can schedule commands to run after a
     * given delay, or to execute periodically.
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle
     * @return a newly created scheduled thread pool
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
//从这边我们就知道底层是维护的DelayedWorkQueue(带有延迟时间的queue)
 public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

FutureTask,Runnable,Callable

Future接口和实现Future接口的FutureTask类用来表示异步计算的结果。

FutureTask类继承Future,也继承Runnable继承。

当FutureTask处于未启动或已启动状态时,执行FutureTask.get()方法将导致调用线程阻塞;当FutureTask处于已完成状态时,执行FutureTask.get()方法将导致调用线程立即返回结果或抛出异常。

当FutureTask处于未启动状态时,执行FutureTask.cancel()方法将导致此任务永远不会被执行;当FutureTask处于已启动状态时,执行FutureTask.cancel(true)方法将以中断执行此任务线程的方式来试图停止任务;当FutureTask处于已启动状态时,执行FutureTask.cancel(false)方法将不会对正在执行此任务的线程产生影响(让正在执行的任务运行完成);当FutureTask处于已完成状态时,执行FutureTask.cancel(…)方法将返回false。

当一个线程需要等待另一个线程把某个任务执行完后它才能继续执行,此时可以使用FutureTask。假设有多个线程执行若干任务,每个任务最多只能被执行一次。当多个线程试图同时执行同一个任务时,只允许一个线程执行任务,其他线程需要等待这个任务执行完后才能继续执行。

看一个demo吧:

//调用get方法,主线程阻塞,等task线程执行完毕后才能往下之后

public class FutureTaskDemo {
    public static void main(String[] args) {
        try {
            fun();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

public static void fun() throws Exception{
        ExecutorService executor = Executors.newSingleThreadExecutor();
        FutureTask<String> task = new FutureTask<>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                long b = new Date().getTime();
                System.out.println("call begin " + b);
                StringBuilder sb = new StringBuilder();
                for (int i = 0; i < 100000; i++) {
                    sb.append(i).append(",");
                }
                System.out.println("call end " + (new Date().getTime() - b));
                //System.out.println(sb.toString());
                return sb.toString();
            }
 });
        executor.execute(task);
        long begin = new Date().getTime();
        System.out.println("begin:" + begin);//用来计算执行时间的                                                                            try {
        task.get();
        System.out.println("end:" + (new Date().getTime() - begin));
        executor.shutdown();
    }
}

Runnable:不返回结果
Callable:返回结果,但是只能使用ExecutorService来调用。
FutureTask:实现了Runnable和Future,所以兼顾两者优点,既可以使用ExecutorService,也可以使用Thread

再看一个demo:

public class FutureTaskTest {

    public static void main(String[] args) throws Exception{
        FutureTask<String> task = new FutureTask<>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(5000);
                String str ="aaaa";
                System.out.println(str);
                return str;
            }
        });

        new Thread(task).start();
        //task.get();
        task.cancel(true);
        System.out.println("bbbb");
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,542评论 6 504
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,822评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,912评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,449评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,500评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,370评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,193评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,074评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,505评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,722评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,841评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,569评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,168评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,783评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,918评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,962评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,781评论 2 354

推荐阅读更多精彩内容