任务的取消
1.轮询volatile变量
任务取消最简单的方式就是每次循环入口都去轮询volatile变量,但问题也显而易见
public class Producer implements Runnable{
private volatile boolean cancelled;
private final List<Product> productsQueue
=new LinkedBlockingQueue<>(20);
public void run(){
Product p =new Product();
while(!cancelled)
p=produce();
productsQueue.put(p);//当队列空间不足,put可能会阻塞,那么就无法去轮询cancelled变量,造成的结果就是可能长时间内无法响应取消
}
}
2.轮询线程的中断状态
这里先介绍下我对Thread.interrupt()的理解,所谓interrupt,并非是强制性的打断,而是提出一个中断建议,建议你这个线程要停下来,至于你这个线程到底是听我的停下来了,还是直接无视我,亦或是保留我这个意见,都由你自己决定,和我不相干。这三种情况,也对应了三种代码实现,抛出InterruptedException提前返回(听我的停下来了),catch了这个异常但是毫无处理(直接无视我),catch后返回上层调用函数时再次调用interrupt()恢复中断状态,交给上层处理(保留意见)。下面见其代码核心部分
public void interrupt() {
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(this);
return;
}
}
interrupt0();
}
简而言之,interrupt方法只是去设置一个flag,不同的方法对这个flag的反应也不同,我再贴上jdk对于“不同方法不同反应”的解释
不完全概括
if the thread has started, and was not in a sleep() or wait() method, then calling interrupt() on it does not throw an exception. Instead it sets the thread's interrupted flag.
java这种中断机制优于抢占式中断是好疑问的,抢占式的中断可能会造成数据的不一致,而java的中断机制则更优雅,但是处理起来也需要更加细心。
言归正传,用检测线程中断的方式代替上述的volatile的变量后,run核心代码变化如下
public void run(){
try{
Product p=null;
while(!Thread.currentThread.isInterrupted()){ //循环入口检测一次
p=produce();
productsQueue.put(p);//这里会检测第二次
}
}catch(InterruptedException e){
/*自己决定如何处理*/
}
}
因为put方法能检测到线程中断,所以就避免了上面用volatile变量时问题。
关于为什么put函数能检测线程中断,我谈一下我的理解,我贴一段LinkedBlockingQueue.put()的代码(一些其他代码被我省去了,有心的读者可以自己去jdk里面再细看)
public void put(E e) throws InterruptedException {
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await(); //当队列已满时,生产者会调用Condition.await()阻塞自己
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
}
3.通过Future取消
Future有个cancel方法
boolean cancel(boolean mayInterruptIfRunning);
对于这个参数,官方解释如下
@param mayInterruptIfRunning true if the thread executing this
task should be interrupted; otherwise, in-progress tasks are allowed
to complete
在FutureTask中非常直观的展现这个参数的意义
if (mayInterruptIfRunning) {
Thread r = runner;
if (r != null){
r.interrupt();
}
}
Future一般都是由ExecutorService的submit方法返回的(大家看源代码就知道,就是把callable封装了返回),示例代码如下
public void taskRun(Runnable task){
Future f=executorService.submit(task);
try{
f.get(10,Timeunit.SECONDS);
}catch(Exception e1){
}finally{
f.cancel(true);
}
}
这里补充一下submit和execute的区别,submit比execute的功能更丰富些,可以返回一个future,并且他们对于异常的处理是不同的。
There is a difference when looking at exception handling. If your tasks throws an exception and if it was submitted with
execute
this exception will go to the uncaught exception handler (when you don't have provided one explicitly, the default one will just print the stack trace toSystem.err
). If you submitted the task withsubmit
any thrown exception, checked or not, is then part of the task's return status. For a task that was submitted withsubmit
and that terminates with an exception, theFuture.get
will rethrow this exception, wrapped in anExecutionException
关于ExecutorService对于异常的处理,可以看下这里的讨论,下篇博客会好好分析异常。
处理不可中断的阻塞
大部分阻塞方法都会抛出InterruptedException来响应中断请求,然而也有一些阻塞方法并不响应中断,对于这类方法,调用interrupt方法除了设置flag以外毫无作用(并不会提前返回),对于这类方法,Doug Lea总结了一下(文章后面会有重写newTaskFor方法来处理这一类问题)
In the case of socket I/O, if a thread closes the socket, blocking I/O operations on that socket in other threads will complete early with a SocketException. The nonblocking I/O classes in java.nio also do not support interruptible I/O, but blocking operations can similarly be canceled by closing the channel or requesting a wakeup on the Selector. Similarly, attempting to acquire an intrinsic lock (enter a synchronized block) cannot be interrupted, but ReentrantLock supports an interruptible acquisition mode.
4.重写AbstractExecutorService中newTaskFor实现取消
这一点也是Doug Lea书上7.1.7上所述的,一开始我也没太看懂,后来看了下jdk中对于AbstractExecutorService的文档,对这点也算是稍微理解了
Provides default implementations of ExecutorService execution methods. This class implements the submit, invokeAny and invokeAll methods using a RunnableFuture returned by newTaskFor, which defaults to the FutureTask class provided in this package. For example, the implementation of submit(Runnable) creates an associated RunnableFuture that is executed and returned. Subclasses may override the newTaskFor methods to return RunnableFuture implementations other than FutureTask.
简单的解释下这段文字的意思,AbstractExecutorService实现了几个 ExecutorService的方法,比如submit,invokeAny,invokeAll,这些方法都是将传进来的callable通过newTaskFor方法转化为RunnableFuture(就是一个可runnable也可作为future的类,FutureTask就是实现了这样的接口),submit(Runnable)方法就是生成了对应的可执行可返回的RunnableFuture,子类可以重写newTaskFor方法以返回除了FutureTask外其他的RunnableFuture了。
先贴上jdk里面给的示例代码
public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
//自定义一个RunnableFuture
static class CustomTask<V> implements RunnableFuture<V> {...}
//重写newTaskFor方法
protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
return new CustomTask<V>(c);
}
protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
return new CustomTask<V>(r, v);
}
// ... add constructors, etc.
}
这样后,再看下书上给的例子,可能会更好理解
public interface CancellableTask<T> extends Callable<T> {
void cancel();
RunnableFuture<T> newTask();
}
public class CancellingExecutor extends ThreadPoolExecutor {
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable){
//判断传进来的callable是否是我们要特殊处理的task
if (callable instanceof CancellableTask)
return ((CancellableTask<T>) callable).newTask();
else
return super.newTaskFor(callable);
}
}
public abstract class SocketUsingTask<T> implements CancellableTask<T> {
private Socket socket;
protected synchronized void setSocket(Socket s) {
socket = s;
}
public synchronized void cancel() {
try {
if (socket != null)
socket.close();
} catch (IOException ignored) {
}
}
public RunnableFuture<T> newTask() {
return new FutureTask<T>(this) {
public boolean cancel(boolean mayInterruptIfRunning) {
try {
SocketUsingTask.this.cancel();
} finally {
return super.cancel(mayInterruptIfRunning);
}
}
};
}
}
这例子实质上也非常粗糙,只是提供了一个思路,和jdk所给的例子一样。只是对于任务的取消多了一个选择,可重写newTaskFor方法实现自定义的取消。
线程的取消(停止)
1.Poison对象
线程的取消书上介绍了“Poison”方法即加一个毒药对象进入工作队列,当消费者碰到这个对象时就结束自己的工作,打个比方,就是工厂流水线里面放一个和产品不同的标记物,一旦流水线过来工人看到这个标记物了,那么就停止生产,但是这样做的缺点就是缺乏灵活性,只有生产者和消费者数量是已知的情况下,才能用毒药对象,不然控制起来难度太大。
2.关闭ExecutorService
因为Thread大多情况下是有线程池去管理的,所以关闭ExectuorService相比于前面提到的方法,更为普遍。
void shutdown(); //拒绝新任务,没完成的旧任务接着干
List<Runnable> shutdownNow(); //拒绝新任务,没完成的旧任务挨个取消,返回提交了但没有被实行的任务
shutdownNow的局限性也在于此,对于那些已经开始但尚未结束的任务,我们无法收集。如果有需求,我们可以在和上文中的newTaskFor一样,去覆盖AbstractExecutorService的execute方法。
public class TrackingExecutor extends AbstractExecutorService {
private final ExecutorService exec;
private final Set<Runnable> tasksCancelledAtShutdown =
Collections.synchronizedSet(new HashSet<Runnable>());
public List<Runnable> getCancelledTasks() {
if (!exec.isTerminated())
throw new IllegalStateException(...);
return new ArrayList<Runnable>(tasksCancelledAtShutdown);
}
//注意finally代码块
public void execute(final Runnable runnable) {
exec.execute(new Runnable() {
public void run() {
try {
runnable.run();
} finally {
if (isShutdown()
&& Thread.currentThread().isInterrupted())
tasksCancelledAtShutdown.add(runnable);
}
}
});
}
// delegate other ExecutorService methods to exec
}
3.非正常线程的终止
所谓的非正常中止,大部分情况下都是由RuntimeException造成的,在Java中,为每个线程设置UncaughtExceptionHandler可以捕捉这些uncheckedException,在executorService中,你可以选择重写ThreadFactory的newThread方法,或者讲异常的捕捉全部放在Runnable&Callable中,也可以改写ThreadPoolExecutor.afterExcute方法,当然,这三种方法不是互相全等的,都有自己的使用场景,这里也要牵涉到execute和submit的差异,还有为什么会造成这种差异,都是需要深究的,因为篇幅受限(写太长看不下去啊),我准备在下一篇博客中,好好地分析下在ExecutorService中的异常处理机制。
4.JVM ShutdownHook
最后提及下所谓的JVM钩子,定义如下:
A shutdown hook is simply an initialized but unstarted thread. When the virtual machine begins its shutdown sequence it will start all registered shutdown hooks in some unspecified order and let them run concurrently.
简单的翻译下,ShutdownHook就是一个注册在jvm上当jvm退出时才会启动的线程,可用来做一些结尾扫荡什么的,若注册了多个钩子,那么他们会在jvm退出时一起运行,但顺序是随机的。
对于ShutdownHook,设计时需要满足以下几点:
- 所做之事一定要简短并尽快结束,因为该方法可能会被某些内部错误而中止(比如操作系统直接kill了,因此甚至极端情况下这个方法都不会被调用),而且不能因为这个方法导致jvm退出需要极长的时间。
- 所做之事一定要是线程安全的,并且要避免死锁。
给一段代码示例加深印象
public void start() {
//注册钩子
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
try { LogService.this.stop(); }
catch (InterruptedException ignored) {}
}
});
}
5.Finalize
只说一点,看完这方法就别当这方法存在就行了。
本文参考资料:
http://ibruce.info/2013/12/19/how-to-stop-a-java-thread/
http://forward.com.au/javaProgramming/HowToStopAThread.html
https://www.ibm.com/developerworks/java/library/j-jtp05236/index.html
https://dzone.com/articles/know-jvm-series-2-shutdown
这篇博客主要是自己对并发编程实战中所提及的任务与线程的取消的一些个人思路梳理,写着写着发现异常处理也是大坑,准备下一篇开坑,争取两天内整理好,今天情人节,这篇文章前前后后写了五六个小时,因为怕误导也不想含糊其辞。总之收获很多!最后祝单身狗们,年年有今日,岁岁有今朝,哈哈!晚安,明天好好分析下Executor中的异常处理