读《Java并发编程》小结

戳我的笔记链接地址
本文是对《Java并发编程》专栏的读后小结,跟大家分享。

目录

1、bug的源头-三个属性
2、Java内存模型
3、死锁的解决方案
死锁发生的条件
死锁的预防
4、等待-通知机制
wait的使用范式
wait和sleep的区别
5、线程的生命周期
通用的线程生命周期(五态模型)
Java中线程的生命周期
状态转换
6、创建合理的线程数量
CPU密集型应用
I/O密集型应用
7、Lock与synchronized的不同
用两个条件变量实现阻塞队列
异步转同步
8、用Semaphore实现一个限流器
9、读写锁 ReadWriteLock
读写锁升级问题
9、StampedLock 比读写锁更快的锁
10、CountDownLatch 和 CyclicBarrier 让多线程步调一致
11、Java并发容器
注意事项
12、原子类
ABA问题
原子类组成概览
13、Java中的线程池
// todo 线程的运行过程
ThreadPoolExecutor 线程池参数
拒绝策略
使用线程池的注意事项
14、Future 获取异步执行结果
如何获取异步任务执行结果
FutureTask工具类
14、CompletableFuture 异步编程
15、CompletionService 批量执行异步任务
16、Fork/Join 并行计算框架
模拟MapReduce统计单词数量

1、bug的源头-三个属性

可见性、有序性、原子性。
我们的 CPU、内存、I/O 设备都在不断迭代,不断朝着更快的方向努力。但是,在这个快速发展的过程中,有一个核心矛盾一直存在,就是这三者的速度差异。
cpu寄存器缓存导致的可见性问题。
线程切换带来原子性问题。
编译优化带来有序性问题。
其中,在 Java 领域一个经典的案例就是利用双重检查创建单例对象:

public class Singleton {
  static Singleton instance;
  static Singleton getInstance(){
    if (instance == null) {
      synchronized(Singleton.class) {
        if (instance == null)
          instance = new Singleton(); //在这一步如果编译优化
        }
    }
    return instance;
  }
}

第八行,如果发生编译优化:我们以为的 new 操作应该是:
分配一块内存 M;
在内存 M 上初始化 Singleton 对象;
然后 M 的地址赋值给 instance 变量。
但是实际上优化后的执行路径却是这样的:
分配一块内存 M;
将 M 的地址赋值给 instance 变量;
最后在内存 M 上初始化 Singleton 对象。
如果在第2步发生线程a到线程b的切换,线程b直接返回instance,这个时候调用没有初始化过的instance对象,会产生空指针异常。

解决办法:对instance对象加volatile语义申明。

2、Java内存模型

Java 内存模型规范了 JVM 如何提供按需禁用缓存和编译优化的方法。具体来说,这些方法包括 volatile、synchronized 和 final 三个关键字,以及七项 Happens-Before 规则。

3、死锁的解决方案

使用细粒度锁可以提高并行度,是性能优化的一个重要手段。但是有时细粒度的锁容易导致死锁。死锁的一个比较专业的定义是:一组互相竞争资源的线程因互相等待,导致“永久”阻塞的现象。
死锁发生的条件
互斥,共享资源 X 和 Y 只能被一个线程占用;
占有且等待,线程 T1 已经取得共享资源 X,在等待共享资源 Y 的时候,不释放共享资源 X;
不可抢占,其他线程不能强行抢占线程 T1 占有的资源;
循环等待,线程 T1 等待线程 T2 占有的资源,线程 T2 等待线程 T1 占有的资源,就是循环等待。
死锁的预防
反过来分析,也就是说只要我们破坏其中一个,就可以成功避免死锁的发生。互斥就是锁的目的,所以无法预防。
破坏占有且等待,可以一次性申请所有资源。要么全部获取成功,要么全部获取失败。
破坏不可抢占,核心是要能够主动释放它占有的资源,这一点 synchronized 是做不到的。java.util.concurrent 这个包下面提供的 Lock 是可以轻松解决这个问题的。提供tryLock(long, TimeUnit) 方法,在一段时间后放弃获取锁。
破坏循环等待条件,破坏这个条件,需要对资源进行排序,然后按序申请资源。

4、等待-通知机制

用 synchronized 实现等待 - 通知机制在 Java 语言里,等待 - 通知机制可以有多种实现方式,比如 Java 语言内置的 synchronized 配合 wait()、notify()、notifyAll() 这三个方法就能轻松实现。

如何用 synchronized 实现互斥锁,你应该已经很熟悉了。在下面这个图里,左边有一个等待队列,同一时刻,只允许一个线程进入 synchronized 保护的临界区(这个临界区可以看作大夫的诊室),当有一个线程进入临界区后,其他线程就只能进入图中左边的等待队列里等待(相当于患者分诊等待)。这个等待队列和互斥锁是一对一的关系,每个互斥锁都有自己独立的等待队列。
wait()工作原理图

notify()工作原理图

上面我们一直强调 wait()、notify()、notifyAll() 方法操作的等待队列是互斥锁的等待队列,所以如果 synchronized 锁定的是 this,那么对应的一定是 this.wait()、this.notify()、this.notifyAll();如果 synchronized 锁定的是 target,那么对应的一定是 target.wait()、target.notify()、target.notifyAll() 。
而且 wait()、notify()、notifyAll() 这三个方法能够被调用的前提是已经获取了相应的互斥锁,所以我们会发现 wait()、notify()、notifyAll() 都是在 synchronized{}内部被调用的。如果在 synchronized{}外部调用,或者锁定的 this,而用 target.wait() 调用的话,JVM 会抛出一个运行时异常:java.lang.IllegalMonitorStateException。

// wait的使用范式
  while(条件不满足) {
    wait();
  }

除非经过深思熟虑,否则尽量使用 notifyAll(),只用notify()可能会导致有线程永远得不到执行。

wait和sleep的区别
wait与sleep区别在于: 1. wait会释放所有锁而sleep不会释放锁资源. 2. wait只能在同步方法和同步块中使用,而sleep任何地方都可以. 3. wait无需捕捉异常,而sleep需要. 两者相同点:都会让渡CPU执行时间,等待再次调度! wait()方法与sleep()方法的不同之处在于,wait()方法会释放对象的“锁标志”。当调用某一对象的wait()方法后,会使当前线程暂停执行,并将当前线程放入对象等待池中,直到调用了notify()方法后,将从对象等待池中移出任意一个线程并放入锁标志等待池中,只有锁标志等待池中的线程可以获取锁标志,它们随时准备争夺锁的拥有权。当调用了某个对象的notifyAll()方法,会将对象等待池中的所有线程都移动到该对象的锁标志等待池。 sleep()方法需要指定等待的时间,它可以让当前正在执行的线程在指定的时间内暂停执行,进入阻塞状态,该方法既可以让其他同优先级或者高优先级的线程得到执行的机会,也可以让低优先级的线程得到执行机会。但是sleep()方法不会释放“锁标志”,也就是说如果有synchronized同步块,其他线程仍然不能访问共享数据。

5、线程的生命周期

通用的线程生命周期(五态模型)

初始状态,指的是线程已经被创建,但是还不允许分配 CPU 执行。这个状态属于编程语言特有的,在操作系统层面,真正的线程还没有创建。
可运行状态,指的是线程可以分配 CPU 执行。在这种状态下,真正的操作系统线程已经被成功创建了,所以可以分配 CPU 执行。
当有空闲的 CPU 时,操作系统会将其分配给一个处于可运行状态的线程,被分配到 CPU 的线程的状态就转换成了运行状态。
运行状态的线程如果调用一个阻塞的 API(例如以阻塞方式读文件)或者等待某个事件(例如条件变量),那么线程的状态就会转换到休眠状态,同时释放 CPU 使用权,休眠状态的线程永远没有机会获得 CPU 使用权。当等待的事件出现了,线程就会从休眠状态转换到可运行状态。
线程执行完或者出现异常就会进入终止状态,终止状态的线程不会切换到其他任何状态,进入终止状态也就意味着线程的生命周期结束了。

Java中线程的生命周期

这五种状态在不同编程语言里会有简化合并。
Java 语言里则把可运行状态和运行状态合并了(变成了运行状态),这两个状态在操作系统调度层面有用,而 JVM 层面不关心这两个状态,因为 JVM 把线程调度交给操作系统处理了。除了简化合并,这五种状态也有可能被细化,比如,Java 语言里就细化了休眠状态(Blocked、Waiting、Timed_Waiting).

Java 语言中线程共有六种状态,分别是:
NEW(初始化状态)
RUNNABLE(可运行 / 运行状态)
BLOCKED(阻塞状态)
WAITING(无时限等待)
TIMED_WAITING(有时限等待)
TERMINATED(终止状态)

状态转换

  1. RUNNABLE 与 BLOCKED 的状态转换
    只有一种场景会触发这种转换,就是线程等待 synchronized 的隐式锁。

  2. RUNNABLE 与 WAITING 的状态转换
    第一种场景,获得 synchronized 隐式锁的线程,调用无参数的 Object.wait() 方法。
    第二种场景,调用无参数的 Thread.join() 方法。其中的 join() 是一种线程同步方法,例如有一个线程对象 thread A,当调用 A.join() 的时候,执行这条语句的线程会等待 thread A 执行完,而等待中的这个线程,其状态会从 RUNNABLE 转换到 WAITING。当线程 thread A 执行完,原来等待它的线程又会从 WAITING 状态转换到 RUNNABLE。
    第三种场景,调用 LockSupport.park() 方法。其中的 LockSupport 对象,也许你有点陌生,其实 Java 并发包中的锁,都是基于它实现的。调用 LockSupport.park() 方法,当前线程会阻塞,线程的状态会从 RUNNABLE 转换到 WAITING。调用 LockSupport.unpark(Thread thread) 可唤醒目标线程,目标线程的状态又会从 WAITING 状态转换到 RUNNABLE。

  3. RUNNABLE 与 TIMED_WAITING 的状态转换
    有五种场景会触发这种转换:
    调用带超时参数的 Thread.sleep(long millis) 方法;
    获得 synchronized 隐式锁的线程,调用带超时参数的 Object.wait(long timeout) 方法;
    调用带超时参数的 Thread.join(long millis) 方法;
    调用带超时参数的 LockSupport.parkNanos(Object blocker, long deadline) 方法;
    调用带超时参数的 LockSupport.parkUntil(long deadline) 方法。
    这里你会发现 TIMED_WAITING 和 WAITING 状态的区别,仅仅是触发条件多了超时参数。

  4. 从 NEW 到 RUNNABLE 状态
    从 NEW 状态转换到 RUNNABLE 状态很简单,只要调用线程对象的 start() 方法就可以了

  5. 从 RUNNABLE 到 TERMINATED 状态
    线程执行完 run() 方法后,会自动转换到 TERMINATED 状态,当然如果执行 run() 方法的时候异常抛出,也会导致线程终止。有时候我们需要强制中断 run() 方法的执行,例如 run() 方法访问一个很慢的网络,我们等不下去了,想终止怎么办呢?Java 的 Thread 类里面倒是有个 stop() 方法,不过已经标记为 @Deprecated,所以不建议使用了。正确的姿势其实是调用 interrupt() 方法。

stop() 方法会真的杀死线程,不给线程喘息的机会,如果线程持有 ReentrantLock 锁,被 stop() 的线程并不会自动调用 ReentrantLock 的 unlock() 去释放锁,那其他线程就再也没机会获得 ReentrantLock 锁。
interrupt() 方法仅仅是通知线程,线程有机会执行一些后续操作,同时也可以无视这个通知。被 interrupt 的线程,是怎么收到通知的呢?一种是异常,另一种是主动检测。
当线程 A 处于 WAITING、TIMED_WAITING 状态时,如果其他线程调用线程 A 的 interrupt() 方法,会使线程 A 返回到 RUNNABLE 状态,同时线程 A 的代码会触发 InterruptedException 异常。上面我们提到转换到 WAITING、TIMED_WAITING 状态的触发条件,都是调用了类似 wait()、join()、sleep() 这样的方法,我们看这些方法的签名,发现都会 throws InterruptedException 这个异常。这个异常的触发条件就是:其他线程调用了该线程的 interrupt() 方法。

6、创建合理的线程数量

创建合理的线程数量,目的是将硬件的性能发挥到极致。根据不同的应用场景,我们将应用分为两种场景论述,I/O密集型应用和CPU密集型应用。
CPU密集型应用
对于 CPU 密集型的计算场景,理论上“线程的数量 =CPU 核数”就是最合适的。不过在工程上,线程的数量一般会设置为“CPU 核数 +1”,这样的话,当线程因为偶尔的内存页失效或其他原因导致阻塞时,这个额外的线程可以顶上,从而保证 CPU 的利用率。
I/O密集型应用
对于 I/O 密集型计算场景,最佳的线程数是与程序中 CPU 计算和 I/O 操作的耗时比相关的,我们可以总结出这样一个公式:
最佳线程数 =1 +(I/O 耗时 / CPU 耗时)
不过上面这个公式是针对单核 CPU 的,至于多核 CPU,也很简单,只需要等比扩大就可以了,计算公式如下:
最佳线程数 =CPU 核数 * [ 1 +(I/O 耗时 / CPU 耗时)]

Q: 有些同学对于最佳线程数的设置积累了一些经验值,认为对于 I/O 密集型应用,最佳线程数应该为:2 * CPU 的核数 + 1,你觉得这个经验值合理吗?
A: 工作中都是按照逻辑核数来的,理论值和经验值只是提供个指导,实际上还是要靠压测!!!

7、Lock与synchronized的不同

synchronized存在,Javasdk还造一个Lock的原因主要是为了弥补synchronized,会阻塞线程且不会释放已经占有的资源的问题,lock的三个方法如下:

// 支持响应中断
void lockInterruptibly() 
  throws InterruptedException;
// 支持超时
boolean tryLock(long time, TimeUnit unit) 
  throws InterruptedException;
// 支持非阻塞获取锁
boolean tryLock();

并且,Lock支持多个条件变量。Lock用来实现“互斥”,condition用来实现“同步”。

用两个条件变量实现阻塞队列

public class BlockedQueue<T>{
  final Lock lock =
    new ReentrantLock();
  // 条件变量:队列不满  
  final Condition notFull =
    lock.newCondition();
  // 条件变量:队列不空  
  final Condition notEmpty =
    lock.newCondition();

  // 入队
  void enq(T x) {
    lock.lock();
    try {
      while (队列已满){
        // 等待队列不满
        notFull.await();
      }  
      // 省略入队操作...
      //入队后,通知可出队
      notEmpty.signal();
    }finally {
      lock.unlock();
    }
  }
  // 出队
  void deq(){
    lock.lock();
    try {
      while (队列已空){
        // 等待队列不空
        notEmpty.await();
      }  
      // 省略出队操作...
      //出队后,通知可入队
      notFull.signal();
    }finally {
      lock.unlock();
    }  
  }
}

不过,这里你需要注意,Lock 和 Condition 实现的管程,线程等待和通知需要调用 await()、signal()、signalAll(),它们的语义和 wait()、notify()、notifyAll() 是相同的。但是不一样的是,Lock&Condition 实现的管程里只能使用前面的 await()、signal()、signalAll(),而后面的 wait()、notify()、notifyAll() 只有在 synchronized 实现的管程里才能使用。如果一不小心在 Lock&Condition 实现的管程里调用了 wait()、notify()、notifyAll(),那程序可就彻底玩儿完了。

异步转同步
远程调用rpc请求时,面临异步转同步的问题,因为tcp层面上rpc请求就是异步的,它不会等待请求返回结果,所以类似于dubbo这种rpc框架也是做了异步转同步的工作,具体实现类似于上面“用两个条件变量实现阻塞队列”的代码。

8、用Semaphore实现一个限流器

极客专栏链接 https://time.geekbang.org/column/article/88499
Semaphore是信号量的意思,可以允许多个线程访问一个临界区。可以用来实现比较常见的需求就是我们工作中遇到的各种池化资源,例如连接池、对象池、线程池等等。

9、读写锁 ReadWriteLock

极客专栏链接(质量很高,也很实用) https://time.geekbang.org/column/article/88909
这个链接里文章实现了一个简易的完备缓存的示例。
读写锁升级问题
读锁不能升级为写锁。
写锁可以降级为读锁。

如果进行读写锁升级,读锁还没有释放,此时获取写锁,会导致写锁永久等待,最终导致相关线程都被阻塞,永远也没有机会被唤醒。锁的升级是不允许的,这个你一定要注意。

9、StampedLock 比读写锁更快的锁

极客专栏链接 使用stampedLock有几个比较重要需要注意的点,所以谨慎使用。
支持三种模式
写锁
悲观读锁(类似与读写锁的读锁)
乐观读(无锁,检测到有锁的时候需要转成悲观读锁)

10、CountDownLatch 和 CyclicBarrier 让多线程步调一致

极客时间专栏链接 https://time.geekbang.org/column/article/89461
CountDownLatch 和 CyclicBarrier 是 Java 并发包提供的两个非常易用的线程同步工具类,这两个工具类用法的区别在这里还是有必要再强调一下:
CountDownLatch 主要用来解决一个线程等待多个线程的场景,可以类比旅游团团长要等待所有的游客到齐才能去下一个景点;
而 CyclicBarrier 是一组线程之间互相等待,更像是几个驴友之间不离不弃。
除此之外 CountDownLatch 的计数器是不能循环利用的,也就是说一旦计数器减到 0,再有线程调用 await(),该线程会直接通过。但 CyclicBarrier 的计数器是可以循环利用的,而且具备自动重置的功能,一旦计数器减到 0 会自动重置到你设置的初始值。除此之外,CyclicBarrier 还可以设置回调函数,可以说是功能丰富。

11、Java并发容器

通过对操作方法加synchronized关键字,可以使得一个容器变成线程安全的容器。这类容器称为同步容器,针对同步容器的性能问题,Java在1.5版本之后出来了并发容器,性能更高。
同步容器:
常见的有Vector、Stack 和 Hashtable等,通过对操作方法加synchronized关键字实现。
并发容器:
数量较多,主要有以下四大类:Set、Map、Set、Queue
并发容器关系图:
比较熟悉的有ConcurrentHashMap、BlockingQueue....

注意事项
1、在容器领域一个容易被忽视的“坑”是用迭代器遍历容器。

// 有问题的写法
List list = Collections.
  synchronizedList(new ArrayList());
Iterator i = list.iterator(); 
while (i.hasNext())
  foo(i.next());
  
// 正确的写法
// 因为是对list的操作,如果list变化了,会使得迭代器报错 
// 所以先把list锁住,保证迭代器运行期间list不会变化 
// 这也是在迭代器里对当前元素删除会报错的原因

List list = Collections.
  synchronizedList(new ArrayList());
Iterator i = list.iterator(); 
while (i.hasNext())
  foo(i.next());

2、另外,使用队列时,需要格外注意队列是否支持有界(所谓有界指的是内部的队列是否有容量限制)。实际工作中,一般都不建议使用无界的队列,因为数据量大了之后很容易导致 OOM。上面我们提到的这些 Queue 中,只有 ArrayBlockingQueue 和 LinkedBlockingQueue 是支持有界的,所以在使用其他无界队列时,一定要充分考虑是否存在导致 OOM 的隐患。

12、原子类

极客时间专栏 https://time.geekbang.org/column/article/90515
原子类高性能的秘密就是硬件支持,基于CPU提供的cas(compare and swap)指令。
ABA问题
aba问题,指的是一个共享变量经历了A--》B--》A的一个过程,虽然最终值一样,但是已经被更新过了。使用原子化的更新对象很可能就需要关心 ABA 问题,因为两个 A 虽然相等,但是第二个 A 的属性可能已经发生变化了。
相关实现有 AtomicReference、AtomicStampedReference 和 AtomicMarkableReference,利用它们可以实现对象引用的原子化更新。AtomicReference 提供的方法和原子化的基本数据类型差不多,这里不再赘述。不过需要注意的是,对象引用的更新需要重点关注 ABA 问题,AtomicStampedReference 和 AtomicMarkableReference 这两个原子类可以解决 ABA 问题。

原子类组成概览

13、Java中的线程池

美团技术文章Java线程池
https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html

使用线程池的目的是为了避免线程的频繁创建和销毁。线程是一个重量级的对象,应该避免频繁创建和销毁。
线程池是一种生产者 - 消费者模式。线程池的使用方是生产者,线程池本身是消费者。在下面的示例代码中,我们创建了一个非常简单的线程池 MyThreadPool,你可以通过它来理解线程池的工作原理。

//简化的线程池,仅用来说明工作原理
class MyThreadPool{
  //利用阻塞队列实现生产者-消费者模式
  BlockingQueue<Runnable> workQueue;
  //保存内部工作线程
  List<WorkerThread> threads 
    = new ArrayList<>();
  // 构造方法
  MyThreadPool(int poolSize, 
    BlockingQueue<Runnable> workQueue){
    this.workQueue = workQueue;
    // 创建工作线程
    for(int idx=0; idx<poolSize; idx++){
      WorkerThread work = new WorkerThread();
      work.start();
      threads.add(work);
    }
  }
  // 提交任务
  void execute(Runnable command){
    workQueue.put(command);
  }
  // 工作线程负责消费任务,并执行任务
  class WorkerThread extends Thread{
    public void run() {
      //循环取任务并执行
      while(true){ ①
        Runnable task = workQueue.take();
        task.run();
      } 
    }
  }  
}

/** 下面是使用示例 **/
// 创建有界阻塞队列
BlockingQueue<Runnable> workQueue = 
  new LinkedBlockingQueue<>(2);
// 创建线程池  
MyThreadPool pool = new MyThreadPool(
  10, workQueue);
// 提交任务  
pool.execute(()->{
    System.out.println("hello");
});

在 MyThreadPool 的内部,我们维护了一个阻塞队列 workQueue 和一组工作线程,工作线程的个数由构造函数中的 poolSize 来指定。用户通过调用 execute() 方法来提交 Runnable 任务,execute() 方法的内部实现仅仅是将任务加入到 workQueue 中。MyThreadPool 内部维护的工作线程会消费 workQueue 中的任务并执行任务,相关的代码就是代码①处的 while 循环。线程池主要的工作原理就这些,是不是还挺简单的?

// todo 线程的运行过程
什么情况下增加新线程、添加到任务队列等等

ThreadPoolExecutor 线程池参数

ThreadPoolExecutor(
  int corePoolSize, // 表示线程池保有的最小线程数。
  int maximumPoolSize, // 表示线程池创建的最大线程数。当项目很忙时,就需要加人,最多加到 maximumPoolSize 个人。
  long keepAliveTime, // 线程可以空闲的存活时间
  TimeUnit unit,
  BlockingQueue<Runnable> workQueue, //工作队列,用来保存生产的资源,也是线程要消费的资源
  ThreadFactory threadFactory, // 通过这个参数自定义如何创建线程,如给线程指定一个名字
  RejectedExecutionHandler handler) // 拒绝策略
拒绝策略
ThreadPoolExecutor 已经提供了以下 4 种策略。
CallerRunsPolicy:提交任务的线程自己去执行该任务。
AbortPolicy:默认的拒绝策略,会 throws RejectedExecutionException。
DiscardPolicy:直接丢弃任务,没有任何异常抛出。
DiscardOldestPolicy:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列。

使用线程池的注意事项
1、尽量使用有界队列
Java 并发包里提供了一个线程池的静态工厂类 Executors,利用 Executors 你可以快速创建线程池。不过目前大厂的编码规范中基本上都不建议使用 Executors 了。
不建议使用 Executors 的最重要的原因是:Executors 提供的很多方法默认使用的都是无界的 LinkedBlockingQueue,高负载情境下,无界队列很容易导致 OOM,而 OOM 会导致所有请求都无法处理,这是致命问题。所以强烈建议使用有界队列。

2、默认的拒绝策略要慎用
使用有界队列,当任务过多时,线程池会触发执行拒绝策略,线程池默认的拒绝策略会 throw RejectedExecutionException 这是个运行时异常,对于运行时异常编译器并不强制 catch 它,所以开发人员很容易忽略。因此默认拒绝策略要慎重使用。

14、Future 获取异步执行结果
极客时间专栏 https://time.geekbang.org/column/article/91292
如何获取异步任务执行结果
Java 通过 ThreadPoolExecutor 提供的 3 个 submit() 方法和 1 个 FutureTask 工具类来支持获得任务执行结果的需求。下面我们先来介绍这 3 个 submit() 方法,这 3 个方法的方法签名如下。

// 提交Runnable任务
Future<?> 
  submit(Runnable task);
// 提交Callable任务
<T> Future<T> 
  submit(Callable<T> task);
// 提交Runnable任务及结果引用  
<T> Future<T> 
  submit(Runnable task, T result);

你会发现它们的返回值都是 Future 接口,Future 接口有 5 个方法,我都列在下面了,它们分别是取消任务的方法 cancel()、判断任务是否已取消的方法 isCancelled()、判断任务是否已结束的方法 isDone()以及2 个获得任务执行结果的 get() 和 get(timeout, unit),其中最后一个 get(timeout, unit) 支持超时机制。通过 Future 接口的这 5 个方法你会发现,我们提交的任务不但能够获取任务执行结果,还可以取消任务。不过需要注意的是:这两个 get() 方法都是阻塞式的,如果被调用的时候,任务还没有执行完,那么调用 get() 方法的线程会阻塞,直到任务执行完才会被唤醒。

// 取消任务
boolean cancel(
  boolean mayInterruptIfRunning);
// 判断任务是否已取消  
boolean isCancelled();
// 判断任务是否已结束
boolean isDone();
// 获得任务执行结果
get();
// 获得任务执行结果,支持超时
get(long timeout, TimeUnit unit);
FutureTask工具类
前面我们提到的 Future 是一个接口,而 FutureTask 是一个实实在在的工具类,这个工具类有两个构造函数,它们的参数和前面介绍的 submit() 方法类似,所以这里我就不再赘述了。
FutureTask(Callable<V> callable);
FutureTask(Runnable runnable, V result);
那如何使用 FutureTask 呢?其实很简单,FutureTask 实现了 Runnable 和 Future 接口,由于实现了 Runnable 接口,所以可以将 FutureTask 对象作为任务提交给 ThreadPoolExecutor 去执行,也可以直接被 Thread 执行;又因为实现了 Future 接口,所以也能用来获得任务的执行结果。下面的示例代码是将 FutureTask 对象提交给 ThreadPoolExecutor 去执行。
// 创建FutureTask
FutureTask<Integer> futureTask
  = new FutureTask<>(()-> 1+2);
// 创建线程池
ExecutorService es = 
  Executors.newCachedThreadPool();
// 提交FutureTask 
es.submit(futureTask);
// 获取计算结果
Integer result = futureTask.get();
FutureTask 对象直接被 Thread 执行的示例代码如下所示。相信你已经发现了,利用 FutureTask 对象可以很容易获取子线程的执行结果。
// 创建FutureTask
FutureTask<Integer> futureTask
  = new FutureTask<>(()-> 1+2);
// 创建并启动线程
Thread T1 = new Thread(futureTask);
T1.start();
// 获取计算结果
Integer result = futureTask.get();
// 以上两种方式还可以组合起来使用。

下面的示例代码就是用这一章提到的 Future 特性来实现的。首先,我们创建了两个 FutureTask——ft1 和 ft2,ft1 完成洗水壶、烧开水、泡茶的任务,ft2 完成洗茶壶、洗茶杯、拿茶叶的任务;这里需要注意的是 ft1 这个任务在执行泡茶任务前,需要等待 ft2 把茶叶拿来,所以 ft1 内部需要引用 ft2,并在执行泡茶之前,调用 ft2 的 get() 方法实现等待。

// 创建任务T2的FutureTask
FutureTask<String> ft2
  = new FutureTask<>(new T2Task());
// 创建任务T1的FutureTask
FutureTask<String> ft1
  = new FutureTask<>(new T1Task(ft2));
  
// 线程T1执行任务ft1
Thread T1 = new Thread(ft1);
T1.start();
// 线程T2执行任务ft2
Thread T2 = new Thread(ft2);
T2.start();
// 等待线程T1执行结果
System.out.println(ft1.get());

// T1Task需要执行的任务:
// 洗水壶、烧开水、泡茶
class T1Task implements Callable<String>{
  FutureTask<String> ft2;
  // T1任务需要T2任务的FutureTask
  T1Task(FutureTask<String> ft2){
    this.ft2 = ft2;
  }
  @Override
  String call() throws Exception {
    System.out.println("T1:洗水壶...");
    TimeUnit.SECONDS.sleep(1);
    
    System.out.println("T1:烧开水...");
    TimeUnit.SECONDS.sleep(15);
    // 获取T2线程的茶叶  
    String tf = ft2.get();
    System.out.println("T1:拿到茶叶:"+tf);

    System.out.println("T1:泡茶...");
    return "上茶:" + tf;
  }
}
// T2Task需要执行的任务:
// 洗茶壶、洗茶杯、拿茶叶
class T2Task implements Callable<String> {
  @Override
  String call() throws Exception {
    System.out.println("T2:洗茶壶...");
    TimeUnit.SECONDS.sleep(1);

    System.out.println("T2:洗茶杯...");
    TimeUnit.SECONDS.sleep(2);

    System.out.println("T2:拿茶叶...");
    TimeUnit.SECONDS.sleep(1);
    return "龙井";
  }
}
// 一次执行结果:
T1:洗水壶...
T2:洗茶壶...
T1:烧开水...
T2:洗茶杯...
T2:拿茶叶...
T1:拿到茶叶:龙井
T1:泡茶...
上茶:龙井

14、CompletableFuture 异步编程

极客时间专栏文章链接 https://time.geekbang.org/column/article/91569
CompletableFuture 实现了 CompletionStage 接口。任务是有时序关系的,比如有串行关系、并行关系、汇聚关系等,这些都在CompletionStage接口中得到了体现。很好用就是了。具体参考上述文章链接。

注意异常处理,默认情况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ForkJoinPool 线程池的线程数)。如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。

15、CompletionService 批量执行异步任务

当需要批量提交异步任务的时候建议你使用 CompletionService。CompletionService 将线程池 Executor 和阻塞队列 BlockingQueue 的功能融合在了一起,能够让批量异步任务的管理更简单。除此之外,CompletionService 能够让异步任务的执行结果有序化,先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如 Forking Cluster 这样的需求。CompletionService 的实现类 ExecutorCompletionService,需要你自己创建线程池,虽看上去有些啰嗦,但好处是你可以让多个 ExecutorCompletionService 的线程池隔离,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。

16、Fork/Join 并行计算框架

极客时间专栏文章链接 https://time.geekbang.org/column/article/92524

Fork/Join 并行计算框架主要解决的是分治任务。分治的核心思想是“分而治之”:将一个大的任务拆分成小的子任务去解决,然后再把子任务的结果聚合起来从而得到最终结果。这个过程非常类似于大数据处理中的 MapReduce,所以你可以把 Fork/Join 看作单机版的 MapReduce。

Fork/Join 并行计算框架的核心组件是 ForkJoinPool。ForkJoinPool 支持任务窃取机制,能够让所有线程的工作量基本均衡,不会出现有的线程很忙,而有的线程很闲的状况,所以性能很好。Java 1.8 提供的 Stream API 里面并行流也是以 ForkJoinPool 为基础的。不过需要你注意的是,默认情况下所有的并行流计算都共享一个 ForkJoinPool,这个共享的 ForkJoinPool 默认的线程数是 CPU 的核数;如果所有的并行流计算都是 CPU 密集型计算的话,完全没有问题,但是如果存在 I/O 密集型的并行流计算,那么很可能会因为一个很慢的 I/O 计算而拖慢整个系统的性能。所以建议用不同的 ForkJoinPool 执行不同类型的计算任务。

模拟MapReduce统计单词数量
学习 MapReduce 有一个入门程序,统计一个文件里面每个单词的数量,下面我们来看看如何用 Fork/Join 并行计算框架来实现。我们可以先用二分法递归地将一个文件拆分成更小的文件,直到文件里只有一行数据,然后统计这一行数据里单词的数量,最后再逐级汇总结果,你可以对照前面的简版分治任务模型图来理解这个过程。

上述极客时间链接里存在具体的实现代码,可以仔细看看。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,294评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,780评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,001评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,593评论 1 289
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,687评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,679评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,667评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,426评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,872评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,180评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,346评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,019评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,658评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,268评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,495评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,275评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,207评论 2 352

推荐阅读更多精彩内容