这段时间对业务系统做了个性能测试,其中使用了较多线程池的技术,故此做一个技术总结。
这次总结的内容比较多,主要是四个:
ExecutorService
CompletionService
Runnable
Callable
前两个是线程池相关接口,后两个是多线程相关接口。在最后,我会说明什么情况下使用哪个接口,这两类接口如何搭配使用。
Tips:个人拙见,如有不对,请多多指正。
一、ExecutorService
ExecutorService是一个接口,继承自Executor。ExecutorService提供了一些常用操作和方法,但是ExecutorService是一个接口,无法实例化。
不过,Java提供了一个帮助类Executors,可以快速获取一个ExecutorService对象,并使用ExecutorService接口的一些方法。
Executors帮助类提供了多个构造线程池的方法,常用的分为两类:
直接执行的
newCachedThreadPool
newFixedThreadPool
newSingleThreadExecutor
延迟或定时执行的
newScheduledThreadPool
newSingleThreadScheduledExecutor
Executors为每种方法提供了一个线程工厂重载。
(一)newCachedThreadPool
创建一个默认的线程池对象,里面的线程和重用,且在第一次使用的时候才创建。可以理解为线程优先模式,来一个创一个线程,直到线程处理完成后,再处理其他的任务。
Code:
package com.macro.boot.javaBuiltThreadPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
public class MyExecutorService {
public static void main(String[] args) {
// 1. 使用帮助类
// ExecutorService executorService = Executors.newCachedThreadPool();
// 2. 提交任务
/* for (int i = 0; i < 20; i++) {
executorService.submit(new MyRunnable(i));
}*/
// 3. 重载方法测试
test();
}
private static void test() {
// 1. 使用帮助类
ExecutorService executorService = Executors.newCachedThreadPool(
new ThreadFactory() {
int n = 1;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "线程正在执行 --->" + n++);
}
}
);
// 2. 提交任务
for (int i = 0; i < 20; i++) {
executorService.submit(new MyRunnable(i));
}
}
}
/**
* 1. 线程类
*/
class MyRunnable implements Runnable {
private int id;
public MyRunnable(int id) {
this.id = id;
}
@Override
public void run() {
String name = Thread.currentThread().getName();
System.out.println(name + "正在执行..." + "--->" + id);
}
}
输出:几乎是一下子就执行了,newCachedThreadPool会创建和任务数同等匹配的线程,直到处理完成任务的线程可以处理新增的任务。
(二)newFixedThreadPool
Code:创建一个可重用固定线程数量的线程池
package com.macro.boot.javaBuiltThreadPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
/**
* 创建一个可固定重用次数的线程池
*/
public class MyNewFixedThreadPool {
public static void main(String[] args) {
/* // nThreads:线程数量
ExecutorService es = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
es.submit(new MyRunnable(i));
}*/
test();
}
private static void test() {
ExecutorService es = Executors.newFixedThreadPool(5, new ThreadFactory() {
int n = 1;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "线程" + n++);
}
});
// 提交任务
for (int i = 0; i < 10; i++) {
es.submit(new MyRunnable(i));
}
}
}
(三)newSingleThreadExecutor
只有一个线程(线程安全)
package com.macro.boot.javaBuiltThreadPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
public class MyNewSingleThreadExecutor {
public static void main(String[] args) throws InterruptedException {
/* ExecutorService es = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
es.submit(new MyRunnable(i));
}*/
test();
}
private static void test() throws InterruptedException {
ExecutorService es = Executors.newSingleThreadExecutor(new ThreadFactory() {
int n = 1;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "线程" + n++);
}
});
for (int i = 0; i < 10; i++) {
Thread.sleep(100);
es.submit(new MyRunnable(i));
}
}
}
(四)newScheduledThreadPool
怎么理解这个线程池的延迟时间?很简单,第一次执行的开始时间,加上延迟的时间,就是第二次执行的时间。
package com.macro.boot.ScheduledExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class MyScheduledExecutor {
public static void main(String[] args) {
ScheduledExecutorService sec = Executors.newScheduledThreadPool(4);
for (int i = 0; i < 10; i++) {
sec.schedule(new MyRunnable(i), 1, TimeUnit.SECONDS);
}
System.out.println("开始执行。。。");
sec.shutdown();
}
}
class MyRunnable implements Runnable {
private int id;
@Override
public String toString() {
return "MyRunnable{" +
"id=" + id +
'}';
}
public MyRunnable(int id) {
this.id = id;
}
@Override
public void run() {
String name = Thread.currentThread().getName();
System.out.println(name + "执行了任务" + id);
}
}
(五)newSingleThreadScheduledExecutor
newSingleThreadScheduledExecutor和newScheduledThreadPool的区别是,newSingleThreadScheduledExecutor的第二次执行时间,等于第一次开始执行的时间,加上执行线程所耗费的时间,再加上延迟时间,即等于第二次执行的时间。
二、CompletionService
CompletionService是一个接口。
当我们使用ExecutorService启动多个Callable时,每个Callable返回一个Future,而当我们执行Future的get方法获取结果时,会阻塞线程直到获取结果。
而CompletionService正是为了解决这个问题,它是Java8的新增接口,它的实现类是ExecutorCompletionService。CompletionService会根据线程池中Task的执行结果按执行完成的先后顺序排序,任务先完成的可优先获取到。
Code:
package com.macro.boot.completions;
import java.util.concurrent.*;
public class CompletionBoot {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 实例化线程池
ExecutorService es = Executors.newCachedThreadPool();
ExecutorCompletionService<Integer> ecs = new ExecutorCompletionService<>(es);
for (int i = 0, j = 3; i < 20; i++) {
ecs.submit(new CallableExample(i, j));
}
for (int i = 0; i < 20; i++) {
// take:阻塞方法,从结果队列中获取并移除一个已经执行完成的任务的结果,如果没有就会阻塞,直到有任务完成返回结果。
Integer integer = ecs.take().get();
// 从结果队列中获取并移除一个已经执行完成的任务的结果,如果没有就会返回null,该方法不会阻塞。
// Integer integer = ecs.poll().get();
System.out.println(integer);
}
// 不要忘记关闭线程池
es.shutdown();
}
}
class CallableExample implements Callable<Integer> {
/**
* 使用构造方法获取变量
* */
private int a;
private int b;
public CallableExample(int a, int b) {
this.a = a;
this.b = b;
}
@Override
public Integer call() throws Exception {
return a + b;
}
@Override
public String toString() {
return "CallableExample{" +
"a=" + a +
", b=" + b +
'}';
}
}
三、Runnable
Runnable和Callable两者都是接口,但是也有区别:
实现Callable接口的任务线程能返回执行结果;而实现Runnable接口的任务线程不能返回结果;(重点)
Callable接口的call()方法允许抛出异常;而Runnable接口的run()方法的异常只能在内部消化,不能继续上抛;
Code:
class MyRunnable02 implements Runnable {
private int i;
public MyRunnable02(int i) {
this.i = i;
}
@Override
public void run() {
String name = Thread.currentThread().getName();
System.out.println(name + "执行了... ---> " + i);
}
@Override
public String toString() {
return "MyRunnable{" +
"i=" + i +
'}';
}
}
四、Callable
Code:
class CallableExample implements Callable<Integer> {
/**
* 使用构造方法获取变量
* */
private int a;
private int b;
public CallableExample(int a, int b) {
this.a = a;
this.b = b;
}
@Override
public Integer call() throws Exception {
return a + b;
}
@Override
public String toString() {
return "CallableExample{" +
"a=" + a +
", b=" + b +
'}';
}
}
五、Example
本次Demo:使用线程池,循环查询数据库500次。
在最开始的时候,是使用ExecutorServer + Future.get(因为查询数据库肯定需要获取结果,所以必须要用Callable,并且get到结果集)。但是get的阻塞操作,实在是太影响速度了,虽然考虑了两种手段去解决,但是都不了了之。
Code:(只贴线程池的代码,线程类和获取连接的类就不放了)
private void executorServerStart() throws SQLException, ClassNotFoundException, ExecutionException, InterruptedException {
// get con
TDConUtils tdConUtils = new TDConUtils();
Connection con = tdConUtils.getCon();
Statement statement = con.createStatement();
// SQL
String sql = "select last_row(value_double) from db1.tb1;";
// ThreadPool
ExecutorService es = Executors.newCachedThreadPool();
// for each
int count = 500;
for (int i = 0; i < count; i++) {
Future<ResultSet> submit = es.submit(new MyThread(i, con, sql));
ResultSet resultSet = submit.get();
// print
while (resultSet.next()) {
System.out.printf("输出:时间:%s,值:%f \n", resultSet.getTimestamp(1)
, resultSet.getDouble(2));
}
}
es.shutdown();
// close resources
tdConUtils.close(con, statement);
}
运行时间:8000ms +
改CompletionService:
Code:
private void completionServerStart() throws SQLException, ClassNotFoundException, InterruptedException, ExecutionException {
// get con
TDConUtils tdConUtils = new TDConUtils();
Connection con = tdConUtils.getCon();
Statement statement = con.createStatement();
// SQL
String sql = "select last_row(value_double) from db1.tb1;";
// ThreadPool
ExecutorService es = Executors.newCachedThreadPool();
//构建ExecutorCompletionService,与线程池关联
ExecutorCompletionService<ResultSet> ecs = new ExecutorCompletionService<ResultSet>(es);
// for each
int count = 500;
for (int i = 0; i < count; i++) {
ecs.submit(new MyThread(i, con, sql));
}
for (int i = 0; i < count; i++) {
// 通过take获取Future结果,此方法会阻塞
ResultSet resultSet = ecs.take().get();
while (resultSet.next()) {
System.out.printf("输出:时间:%s,值:%f \n", resultSet.getTimestamp(1)
, resultSet.getDouble(2));
}
}
es.shutdown();
tdConUtils.close(con, statement);
}
运行时间:300+ms
六、使用小结
分情况。
如果需要获取结果:线程使用Callable;
如果需要异步获取结果:线程池使用CompletionService。
如果不需要获取结果:线程使用Runnable;
如果需要阻塞获取结果:线程池使用ExecutorService。