ExecutorService|CompletionService的区别与选择

这段时间对业务系统做了个性能测试,其中使用了较多线程池的技术,故此做一个技术总结。

这次总结的内容比较多,主要是四个:

ExecutorService
CompletionService
Runnable
Callable
前两个是线程池相关接口,后两个是多线程相关接口。在最后,我会说明什么情况下使用哪个接口,这两类接口如何搭配使用。

Tips:个人拙见,如有不对,请多多指正。

一、ExecutorService
ExecutorService是一个接口,继承自Executor。ExecutorService提供了一些常用操作和方法,但是ExecutorService是一个接口,无法实例化。
不过,Java提供了一个帮助类Executors,可以快速获取一个ExecutorService对象,并使用ExecutorService接口的一些方法。

Executors帮助类提供了多个构造线程池的方法,常用的分为两类:

直接执行的
newCachedThreadPool
newFixedThreadPool
newSingleThreadExecutor
延迟或定时执行的
newScheduledThreadPool
newSingleThreadScheduledExecutor
Executors为每种方法提供了一个线程工厂重载。

(一)newCachedThreadPool
创建一个默认的线程池对象,里面的线程和重用,且在第一次使用的时候才创建。可以理解为线程优先模式,来一个创一个线程,直到线程处理完成后,再处理其他的任务。
Code:

package com.macro.boot.javaBuiltThreadPool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class MyExecutorService {
    public static void main(String[] args) {
        // 1. 使用帮助类
//        ExecutorService executorService = Executors.newCachedThreadPool();

        // 2. 提交任务
/*        for (int i = 0; i < 20; i++) {
            executorService.submit(new MyRunnable(i));
        }*/

        // 3. 重载方法测试
        test();
    }

    private static void test() {
        // 1. 使用帮助类
        ExecutorService executorService = Executors.newCachedThreadPool(
                new ThreadFactory() {
                    int n = 1;

                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "线程正在执行 --->" + n++);
                    }
                }
        );

        // 2. 提交任务
        for (int i = 0; i < 20; i++) {
            executorService.submit(new MyRunnable(i));
        }
    }
}

/**
 * 1. 线程类
 */
class MyRunnable implements Runnable {
    private int id;

    public MyRunnable(int id) {
        this.id = id;
    }

    @Override
    public void run() {
        String name = Thread.currentThread().getName();
        System.out.println(name + "正在执行..." + "--->" + id);
    }
}

输出:几乎是一下子就执行了,newCachedThreadPool会创建和任务数同等匹配的线程,直到处理完成任务的线程可以处理新增的任务。

(二)newFixedThreadPool
Code:创建一个可重用固定线程数量的线程池

package com.macro.boot.javaBuiltThreadPool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
 * 创建一个可固定重用次数的线程池
 */
public class MyNewFixedThreadPool {
    public static void main(String[] args) {
/*        // nThreads:线程数量
        ExecutorService es = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
            es.submit(new MyRunnable(i));
        }*/
        test();
    }

    private static void test() {
        ExecutorService es = Executors.newFixedThreadPool(5, new ThreadFactory() {
            int n = 1;

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "线程" + n++);
            }
        });
        // 提交任务
        for (int i = 0; i < 10; i++) {
            es.submit(new MyRunnable(i));
        }
    }
}

(三)newSingleThreadExecutor
只有一个线程(线程安全)

package com.macro.boot.javaBuiltThreadPool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class MyNewSingleThreadExecutor {
    public static void main(String[] args) throws InterruptedException {
/*        ExecutorService es = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            es.submit(new MyRunnable(i));
        }*/
        test();
    }

    private static void test() throws InterruptedException {
        ExecutorService es = Executors.newSingleThreadExecutor(new ThreadFactory() {
            int n = 1;

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "线程" + n++);
            }
        });
        for (int i = 0; i < 10; i++) {
            Thread.sleep(100);
            es.submit(new MyRunnable(i));
        }
    }
}

(四)newScheduledThreadPool
怎么理解这个线程池的延迟时间?很简单,第一次执行的开始时间,加上延迟的时间,就是第二次执行的时间。

package com.macro.boot.ScheduledExecutorService;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class MyScheduledExecutor {
    public static void main(String[] args) {
        ScheduledExecutorService sec = Executors.newScheduledThreadPool(4);
        for (int i = 0; i < 10; i++) {
            sec.schedule(new MyRunnable(i), 1, TimeUnit.SECONDS);
        }
        System.out.println("开始执行。。。");
        sec.shutdown();
    }
}

class MyRunnable implements Runnable {
    private int id;

    @Override
    public String toString() {
        return "MyRunnable{" +
                "id=" + id +
                '}';
    }

    public MyRunnable(int id) {
        this.id = id;
    }

    @Override
    public void run() {
        String name = Thread.currentThread().getName();
        System.out.println(name + "执行了任务" + id);
    }
}

(五)newSingleThreadScheduledExecutor
newSingleThreadScheduledExecutor和newScheduledThreadPool的区别是,newSingleThreadScheduledExecutor的第二次执行时间,等于第一次开始执行的时间,加上执行线程所耗费的时间,再加上延迟时间,即等于第二次执行的时间。

二、CompletionService
CompletionService是一个接口。
当我们使用ExecutorService启动多个Callable时,每个Callable返回一个Future,而当我们执行Future的get方法获取结果时,会阻塞线程直到获取结果。
而CompletionService正是为了解决这个问题,它是Java8的新增接口,它的实现类是ExecutorCompletionService。CompletionService会根据线程池中Task的执行结果按执行完成的先后顺序排序,任务先完成的可优先获取到。
Code:

package com.macro.boot.completions;

import java.util.concurrent.*;

public class CompletionBoot {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 实例化线程池
        ExecutorService es = Executors.newCachedThreadPool();
        ExecutorCompletionService<Integer> ecs = new ExecutorCompletionService<>(es);

        for (int i = 0, j = 3; i < 20; i++) {
            ecs.submit(new CallableExample(i, j));
        }
        for (int i = 0; i < 20; i++) {
            // take:阻塞方法,从结果队列中获取并移除一个已经执行完成的任务的结果,如果没有就会阻塞,直到有任务完成返回结果。
            Integer integer = ecs.take().get();
            // 从结果队列中获取并移除一个已经执行完成的任务的结果,如果没有就会返回null,该方法不会阻塞。
            // Integer integer = ecs.poll().get();
            System.out.println(integer);
        }
        // 不要忘记关闭线程池
        es.shutdown();
    }
}
class CallableExample implements Callable<Integer> {
    /**
     * 使用构造方法获取变量
     * */
    private int a;
    private int b;

    public CallableExample(int a, int b) {
        this.a = a;
        this.b = b;
    }

    @Override
    public Integer call() throws Exception {
        return a + b;
    }

    @Override
    public String toString() {
        return "CallableExample{" +
                "a=" + a +
                ", b=" + b +
                '}';
    }
}

三、Runnable
Runnable和Callable两者都是接口,但是也有区别:

实现Callable接口的任务线程能返回执行结果;而实现Runnable接口的任务线程不能返回结果;(重点)
Callable接口的call()方法允许抛出异常;而Runnable接口的run()方法的异常只能在内部消化,不能继续上抛;
Code:

class MyRunnable02 implements Runnable {
    private int i;

    public MyRunnable02(int i) {
        this.i = i;
    }

    @Override
    public void run() {
        String name = Thread.currentThread().getName();
        System.out.println(name + "执行了... ---> " + i);
    }

    @Override
    public String toString() {
        return "MyRunnable{" +
                "i=" + i +
                '}';
    }
}

四、Callable
Code:

class CallableExample implements Callable<Integer> {
    /**
     * 使用构造方法获取变量
     * */
    private int a;
    private int b;

    public CallableExample(int a, int b) {
        this.a = a;
        this.b = b;
    }

    @Override
    public Integer call() throws Exception {
        return a + b;
    }

    @Override
    public String toString() {
        return "CallableExample{" +
                "a=" + a +
                ", b=" + b +
                '}';
    }
}

五、Example
本次Demo:使用线程池,循环查询数据库500次。
在最开始的时候,是使用ExecutorServer + Future.get(因为查询数据库肯定需要获取结果,所以必须要用Callable,并且get到结果集)。但是get的阻塞操作,实在是太影响速度了,虽然考虑了两种手段去解决,但是都不了了之。
Code:(只贴线程池的代码,线程类和获取连接的类就不放了)

private void executorServerStart() throws SQLException, ClassNotFoundException, ExecutionException, InterruptedException {
        // get con
        TDConUtils tdConUtils = new TDConUtils();
        Connection con = tdConUtils.getCon();
        Statement statement = con.createStatement();

        // SQL
        String sql = "select last_row(value_double) from db1.tb1;";

        // ThreadPool
        ExecutorService es = Executors.newCachedThreadPool();

        // for each
        int count = 500;
        for (int i = 0; i < count; i++) {
            Future<ResultSet> submit = es.submit(new MyThread(i, con, sql));
            ResultSet resultSet = submit.get();
            // print
            while (resultSet.next()) {
                System.out.printf("输出:时间:%s,值:%f \n", resultSet.getTimestamp(1)
                        , resultSet.getDouble(2));
            }
        }
        es.shutdown();

        // close resources
        tdConUtils.close(con, statement);
    }

运行时间:8000ms +
改CompletionService:
Code:

private void completionServerStart() throws SQLException, ClassNotFoundException, InterruptedException, ExecutionException {
        // get con
        TDConUtils tdConUtils = new TDConUtils();
        Connection con = tdConUtils.getCon();
        Statement statement = con.createStatement();

        // SQL
        String sql = "select last_row(value_double) from db1.tb1;";

        // ThreadPool
        ExecutorService es = Executors.newCachedThreadPool();

        //构建ExecutorCompletionService,与线程池关联
        ExecutorCompletionService<ResultSet> ecs = new ExecutorCompletionService<ResultSet>(es);
        // for each
        int count = 500;

        for (int i = 0; i < count; i++) {
            ecs.submit(new MyThread(i, con, sql));
        }
        for (int i = 0; i < count; i++) {
            // 通过take获取Future结果,此方法会阻塞
            ResultSet resultSet = ecs.take().get();
            while (resultSet.next()) {
                System.out.printf("输出:时间:%s,值:%f \n", resultSet.getTimestamp(1)
                        , resultSet.getDouble(2));
            }
        }

        es.shutdown();
        tdConUtils.close(con, statement);
    }

运行时间:300+ms

六、使用小结
分情况。
如果需要获取结果:线程使用Callable;
如果需要异步获取结果:线程池使用CompletionService。
如果不需要获取结果:线程使用Runnable;
如果需要阻塞获取结果:线程池使用ExecutorService。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,839评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,543评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,116评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,371评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,384评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,111评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,416评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,053评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,558评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,007评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,117评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,756评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,324评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,315评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,539评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,578评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,877评论 2 345

推荐阅读更多精彩内容