(1)多线程的使用和基础

1.线程的应用

如何应用多线程

在 Java 中,有多种方式来实现多线程。继承 Thread 类、实现 Runnable 接口、使用 ExecutorService、Callable、Future 实现带返回结果的多线程。

package com.sunkang.use;

import java.util.concurrent.*;

/**
 * @Project: ThreadExample
 * @description: 线程的创建方式
 * @author: sunkang
 * @create: 2020-06-21 21:38
 * @ModificationHistory who      when       What
 **/
public class ThreadDemo extends  Thread {


    //通过继承Thread,复写run方法
    @Override
    public void run() {
        System.out.println("1.通过继承Thread来创建线程 ");
    }


    public static void main(String[] args) throws ExecutionException, InterruptedException {

        //方式一:通过继承Thread来创建线程
        ThreadDemo threadDemo = new ThreadDemo();
        threadDemo.start();

        //方式二:通过实现Runnable接口创建线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("2.通过实现Runnable接口创建线程 ");
            }
        }).start();


        //方式三:通过实现Callable接口来创建带有返回值的线程
        ExecutorService excutorService = Executors.newFixedThreadPool(1);

        Future<String>  future= excutorService.submit(new Callable<String>() {
            @Override
            public String  call() throws Exception {
                return "3.通过实现Callable接口来创建带有返回值的线程";
            }
        });
        System.out.println(future.get());
        excutorService.shutdown();


        FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "3.通过实现Callable接口来创建带有返回值的线程";
            }
        });
        new Thread(futureTask).start();
        System.out.println( futureTask.get());
    }



}

2.多线程的实际应用

其实大家在工作中应该很少有场景能够应用多线程了,因为基于业务开发来说,很多使用异步的场景我们都通过分布式消息队列来做了。但并不是说多线程就不会被用到,你们如果有看一些框架的源码,会发现线程的使用无处不在
之前我应用得比较多的场景是在做文件跑批,每天会有一些比如收益文件、对账文件,我们会有一个定时任务去拿到数据然后通过线程去处理

一般引用多线程的场景基本是为了支持异步化操作,可以使用多线程,比如处理不同的任务,比如银行系统的财务在日终的时候要做一些日终的处理任务,这时候是可以应用到多线程

如果有看过zookeeper 源码的时候看到一个比较有意思的异步责任链模式,这里边我模仿一种责任链异步化处理的方式,

图中列举了三种处理器,
PreCheckProcessor :可认为是前置检查处理器,检查不通过,后置处理器不处理
LogProcessor:日志记录处理器
SavaProcessor:存储数据处理器

这里改善了链式处理,增加了队列的方式对请求进行了异步的处理,实现了更高的并发性

image.png

代码如下:

请求上下文
/**
 * @Project: ThreadExample
 * @description: 请求上下文,可以存储很多请求相关的上下文
 * @author: sunkang
 * @create: 2020-06-21 22:04
 * @ModificationHistory who      when       What
 **/
public class RequestContext {

    private String requestId;

    private Map<String,String> data;

    public String getRequestId() {
        return requestId;
    }

    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    public Map<String, String> getData() {
        return data;
    }

    public void setData(Map<String, String> data) {
        this.data = data;
    }

    @Override
    public String toString() {
        return "RequestContext{" +
                "requestId='" + requestId + '\'' +
                ", data=" + data +
                '}';
    }
}
处理器接口
/**
 * @Project: ThreadExample
 * @description:  处理器接口
 * @author: sunkang
 * @create: 2020-06-21 22:04
 * @ModificationHistory who      when       What
 **/
public abstract class IRequestProcessor extends Thread{

    void process(RequestContext context) {

    }
}
前置检查处理器
/**
 * @Project: ThreadExample
 * @description: 前置检查处理器
 * @author: sunkang
 * @create: 2020-06-21 22:07
 * @ModificationHistory who      when       What
 **/
public class PreCheckProcessor extends IRequestProcessor {

    private volatile boolean isFinish = false;

    private IRequestProcessor nextProcessor;

   private  LinkedBlockingQueue<RequestContext> requests = new LinkedBlockingQueue<>();

    public PreCheckProcessor(IRequestProcessor nextProcessor) {
        this.nextProcessor = nextProcessor;
    }

    public void shutdown(){
        isFinish =true;
    }

    @Override
    public void process(RequestContext context) {
        //TODO 做些前置检查

        //放入阻塞队列中
        requests.add(context);
    }

    @Override
    public void run() {
        while(!isFinish){ //不建议这么写
            try {
                RequestContext request=requests.take();//阻塞式获取数据
                //真正的处理逻辑
                System.out.println("prevProcessor:"+request);
                //交给下一个责任链
                if(nextProcessor != null){
                    nextProcessor.process(request);
                }

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}
打印日志信息处理器
/**
 * @Project: ThreadExample
 * @description: 主要是打印日志信息
 * @author: sunkang
 * @create: 2020-06-21 22:16
 * @ModificationHistory who      when       What
 **/
public class LogProcessor extends IRequestProcessor {


    private volatile boolean isFinish = false;

    private IRequestProcessor nextProcessor;

    private LinkedBlockingQueue<RequestContext> requests = new LinkedBlockingQueue<>();

    public LogProcessor(IRequestProcessor nextProcessor) {
        this.nextProcessor = nextProcessor;
    }

    public void shutdown(){
        isFinish =true;
    }

    @Override
    public void process(RequestContext context) {
        //TODO 按照实际业务来写

        //放入阻塞队列中
        requests.add(context);
    }

    @Override
    public void run() {
        while(!isFinish){ //不建议这么写
            try {
                RequestContext request=requests.take();//阻塞式获取数据
                //真正的处理逻辑
                System.out.println("LogProcessor:"+request);
                //交给下一个责任链
                //交给下一个责任链
                if(nextProcessor != null){
                    nextProcessor.process(request);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
存储处理器

这里的伪代码一直,就不在展示

启动入口
/**
 * @Project: ThreadExample
 * @description:程序入口
 * @author: sunkang
 * @create: 2020-06-21 22:19
 * @ModificationHistory who      when       What
 **/
public class App {
    
    public static void main(String[] args) {
        //初始化责任链,并启动线程
        IRequestProcessor logProcessor = new LogProcessor(null);
        logProcessor.start();

        IRequestProcessor preCheckProcessor = new PreCheckProcessor(logProcessor);
        preCheckProcessor.start();

        //构造请求
        RequestContext requestContext = new RequestContext();
        requestContext.setRequestId("request");

        //链式处理请求
        preCheckProcessor.process(requestContext);
    }
}

3.java并发编程的基础

基本应用搞清楚以后,我们再来基于Java线程的基础切入,来逐步去深入挖掘线程的整体模型。

线程的生命周期

Java 线程既然能够创建,那么也势必会被销毁,所以线程是存在生命周期的,那么我们接下来从线程的生命周期开始去了解线程。

线程一共有 6 种状态(NEW、RUNNABLE、BLOCKED、WAITING、TIME_WAITING、TERMINATED)

NEW:初始状态,线程被构建,但是还没有调用 start 方法
RUNNABLED:运行状态,JAVA 线程把操作系统中的就绪(READY)和运行(RUNNING)两种状态统一称为“运行中”
BLOCKED:阻塞状态,表示线程进入等待状态,也就是线程因为某种原因放弃了 CPU 使用权,阻塞也分为几种情况

等待阻塞:运行的线程执行 wait 方法,jvm 会把当前线程放入到等待队列
同步阻塞:运行的线程在获取对象的同步锁时,若该同步锁被其他线程锁占用了,那么 jvm 会把当前的线程放入到锁池中
其他阻塞:运行的线程执行 Thread.sleep 或者 t.join 方法,或者发出了 I/O 请求时,JVM 会把当前线程设置为阻塞状态,当 sleep 结束、join 线程终止、io 处理完毕则线程恢复

TIME_WAITING:超时等待状态,超时以后自动返回
TERMINATED:终止状态,表示当前线程执行完毕

image.png

通过代码的方式来演示线程的状态

代码示例

package com.sunkang.use;

import java.util.concurrent.TimeUnit;

/**
 * @Project: ThreadExample
 * @description: 线程的状态演示
 * @author: sunkang
 * @create: 2020-06-21 22:54
 * @ModificationHistory who      when       What
 **/
public class ThreadStatusDemo {

    public static void main(String[] args) {
        
        //Time_Waiting的状态演示
        new Thread(()->{
            while(true){
                try {
                    TimeUnit.SECONDS.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"Time_Waiting_Thread").start();

        //WAITINaG的状态演示
        new Thread(()->{
            while(true){
                synchronized (ThreadStatusDemo.class) {
                    try {
                        ThreadStatusDemo.class.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        },"Waiting_Thread").start();

        //RUNNING的状态演示
        new Thread(()->{
            for(;;);
        },"Running_Thread").start();

        //BLOCKED的状态演示
        new Thread(new BlockedDemo(),"Block01_Thread").start();
        new Thread(new BlockedDemo(),"Block02_Thread").start();
    }
    static class BlockedDemo implements   Runnable{
        @Override
        public void run() {
            synchronized (BlockedDemo.class){
                while(true){
                    try {
                        TimeUnit.SECONDS.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

启动一个线程前,最好为这个线程设置线程名称,因为这样在使用 jstack 分析程序或者进行问题排查时,就会给开发人员提供一些提示显示线程的状态

运行该示例,打开终端或者命令提示符,键入“jps”, (JDK1.5 提供的一个显示当前所有 java 进程 pid 的命
令)
根据上一步骤获得的 pid,继续输入 jstack pid(jstack是 java 虚拟机自带的一种堆栈跟踪工具。jstack 用于打印出给定的 java 进程 ID 或 core file 或远程调试服务的 Java 堆栈信息)
下面的jstack的日志输出的关键信息

"Block02_Thread" #16 prio=5 os_prio=0 tid=0x0000000018dcb000 nid=0x3fb8 waiting on condition [0x0000000019b3e000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at java.lang.Thread.sleep(Thread.java:340)
        at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
        at com.sunkang.use.ThreadStatusDemo$BlockedDemo.run(ThreadStatusDemo.java:58)
        - locked <0x00000000d62b91a0> (a java.lang.Class for com.sunkang.use.ThreadStatusDemo$BlockedDemo)
        at java.lang.Thread.run(Thread.java:745)

"Block01_Thread" #15 prio=5 os_prio=0 tid=0x0000000018dca000 nid=0x3b6c waiting for monitor entry [0x0000000019a3f000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at com.sunkang.use.ThreadStatusDemo$BlockedDemo.run(ThreadStatusDemo.java:58)
        - waiting to lock <0x00000000d62b91a0> (a java.lang.Class for com.sunkang.use.ThreadStatusDemo$BlockedDemo)
        at java.lang.Thread.run(Thread.java:745)

"Running_Thread" #14 prio=5 os_prio=0 tid=0x0000000018dc9800 nid=0x3470 runnable [0x000000001993f000]
   java.lang.Thread.State: RUNNABLE
        at com.sunkang.use.ThreadStatusDemo.lambda$main$3(ThreadStatusDemo.java:45)
        at com.sunkang.use.ThreadStatusDemo$$Lambda$4/1831932724.run(Unknown Source)
        at java.lang.Thread.run(Thread.java:745)

"Waiting_Thread" #13 prio=5 os_prio=0 tid=0x0000000018dc8800 nid=0x3f98 in Object.wait() [0x000000001983f000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x00000000d5f8db10> (a java.lang.Class for com.sunkang.use.ThreadStatusDemo)
        at java.lang.Object.wait(Object.java:502)
        at com.sunkang.use.ThreadStatusDemo.lambda$main$2(ThreadStatusDemo.java:35)
        - locked <0x00000000d5f8db10> (a java.lang.Class for com.sunkang.use.ThreadStatusDemo)
        at com.sunkang.use.ThreadStatusDemo$$Lambda$3/1078694789.run(Unknown Source)
        at java.lang.Thread.run(Thread.java:745)

"Time_Waiting_Thread" #12 prio=5 os_prio=0 tid=0x0000000018dc1000 nid=0x31d0 waiting on condition [0x000000001973e000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at java.lang.Thread.sleep(Thread.java:340)
        at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
        at com.sunkang.use.ThreadStatusDemo.lambda$main$1(ThreadStatusDemo.java:23)
        at com.sunkang.use.ThreadStatusDemo$$Lambda$2/1096979270.run(Unknown Source)

通过上面的分析,我们了解到了线程的生命周期,现在在整个生命周期中并不是固定的处于某个状态,而是随着代码的执行在不同的状态之间进行切换

线程的启动

前面我们通过一些案例演示了线程的启动,也就是调用start()方法去启动一个线程,当 run 方法中的代码执行完毕以后,线程的生命周期也将终止。调用 start 方法的语义是
当前线程告诉 JVM,启动调用 start 方法的线程。

线程的启动原理

启动一个线程为什么是调用 start 方法,而不是 run 方法,这做一个简单
的分析,先简单看一下 start 方法的定义

 public synchronized void start() {
        /**
         * This method is not invoked for the main method thread or "system"
         * group threads created/set up by the VM. Any new functionality added
         * to this method in the future may have to also be added to the VM.
         *
         * A zero status value corresponds to state "NEW".
         */
        if (threadStatus != 0)
            throw new IllegalThreadStateException();

        /* Notify the group that this thread is about to be started
         * so that it can be added to the group's list of threads
         * and the group's unstarted count can be decremented. */
        group.add(this);

        boolean started = false;
        try {
            start0();
            started = true;
        } finally {
            try {
                if (!started) {
                    group.threadStartFailed(this);
                }
            } catch (Throwable ignore) {
                /* do nothing. If start0 threw a Throwable then
                  it will be passed up the call stack */
            }
        }
    }

我们看到调用 start 方法实际上是调用一个 native 方法start0()来启动一个线程,首先 start0()这个方法是在Thread 的静态块中来注册的,代码如下

    private static native void registerNatives();
    static {
        registerNatives();
    }

registerNatives 的 本 地 方 法 的 定 义 在 文 件Thread.c,Thread.c 定义了各个操作系统平台要用的关于线程的公共数据和操作,以下是 Thread.c 的全部内容
http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c

<pre class="sourcelines stripes4 wrap" style="margin: 0px; font-size: 12px; position: relative; counter-reset: lineno 0; color: rgb(0, 0, 0); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; background-color: rgb(255, 255, 255); text-decoration-style: initial; text-decoration-color: initial;">static JNINativeMethod methods[] = {[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l42)
 {"start0",           "()V",        (void *)&JVM_StartThread},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l43)
 {"stop0",            "(" OBJ ")V", (void *)&JVM_StopThread},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l44)
 {"isAlive",          "()Z",        (void *)&JVM_IsThreadAlive},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l45)
 {"suspend0",         "()V",        (void *)&JVM_SuspendThread},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l46)
 {"resume0",          "()V",        (void *)&JVM_ResumeThread},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l47)
 {"setPriority0",     "(I)V",       (void *)&JVM_SetThreadPriority},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l48)
 {"yield",            "()V",        (void *)&JVM_Yield},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l49)
 {"sleep",            "(J)V",       (void *)&JVM_Sleep},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l50)
 {"currentThread",    "()" THD,     (void *)&JVM_CurrentThread},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l51)
 {"countStackFrames", "()I",        (void *)&JVM_CountStackFrames},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l52)
 {"interrupt0",       "()V",        (void *)&JVM_Interrupt},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l53)
 {"isInterrupted",    "(Z)Z",       (void *)&JVM_IsInterrupted},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l54)
 {"holdsLock",        "(" OBJ ")Z", (void *)&JVM_HoldsLock},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l55)
 {"getThreads",        "()[" THD,   (void *)&JVM_GetAllThreads},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l56)
 {"dumpThreads",      "([" THD ")[[" STE, (void *)&JVM_DumpThreads},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l57)
};</pre>

从 这 段 代 码 可 以 看 出 , start0() , 实 际 会 执 行JVM_StartThread 方法,这个方法是干嘛的呢? 从名字上来看,似乎是在 JVM 层面去启动一个线程,如果真的是这样,那么在 JVM 层面,一定会调用 Java 中定义的 run 方法。那接下来继续去找找答案。我们找到 jvm.cpp 这个文件;这个文件需要下载 hotspot 的源码才能找到.

大概调用流程如下:
先去jvm.cpp 执行 native_thread = new Thread(&thread_entry,sz);
然后thread.cpp 调用os::create_thread,实际就是调用平台创建线程的方法来创建线程

接下来就是线程的启动,会调用 Thread.cpp 文件中的Thread::start(Thread* thread)方法,start 方法中有一个函数调用: os::start_thread(thread);,调用平台启动线程的方法,最终会调用 Thread.cpp 文件中的 JavaThread::run()方法

线程的终止

如何终止一个线程呢? 这是面试过程中针对 3 年左右的人喜欢问到的一个题目。

线程的终止,并不是简单的调用 stop 命令去。虽然 api 仍然可以调用,但是和其他的线程控制方法如 suspend、resume 一样都是过期了的不建议使用,就拿 stop 来说,stop 方法在结束一个线程时并不会保证线程的资源正常释放,因此会导致程序可能出现一些不确定的状态。

要优雅的去中断一个线程,在线程中提供了一个 interrupt方法

interrupt 方法

当其他线程通过调用当前线程的 interrupt 方法,表示向当前线程打个招呼,告诉他可以中断线程的执行了,至于什么时候中断,取决于当前线程自己。
线程通过检查资深是否被中断来进行相应,可以通过isInterrupted()来判断是否被中断

这种通过标识位或者中断操作的方式能够使线程在终止时有机会去清理资源,而不是武断地将线程停止,因此这种终止线程的做法显得更加安全和优雅。

线程复位

第一种就是Thread.interrupted()可以设置对线程的中断标识的线程复位操作
还有一 种 被 动 复 位 的 场 景 , 就 是 对 抛 出InterruptedException 异 常 的 方 法 , 在InterruptedException 抛出之前,JVM 会先把线程的中断标识位清除,然后才会抛出 InterruptedException,这个时候如果调用 isInterrupted 方法,将会返回 false

为什么要复位

Thread.interrupted()是属于当前线程的,是当前线程对外界中断信号的一个响应,表示自己已经得到了中断信号,但不会立刻中断自己,具体什么时候中断由自己决定,让外界知道在自身中断前,他的中断状态仍然是 false,这就是复位的原因。

其实本质是会调用当前线程的isInterrupted,只不过传了ture的参数,表示需要清除中断标志

  public static boolean interrupted() {
        return currentThread().isInterrupted(true);
     }

下面是综合的一个例子,输出方式跟步骤的执行一样

/**
 * @Project: ThreadExample
 * @description: 线程中断标识的演示
 * @author: sunkang
 * @create: 2020-06-21 23:39
 * @ModificationHistory who      when       What
 **/
public class ThreadInterrupt {

    private static int i;

    public static void main(String[] args) throws InterruptedException {
        Thread thread=new Thread(()->{
            while(!Thread.currentThread().isInterrupted()){//默认是false  _interrupted state?

                System.out.println("步骤01:"+Thread.currentThread().isInterrupted());//此时中断表示为false
                try {
                    TimeUnit.SECONDS.sleep(10); //中断一个处于阻塞状态的线程。join/wait/queue.take..
                    System.out.println("不会输出");
                } catch (InterruptedException e) { //让其自己判断是否怎么处理,是抛异常还是继续处理
                    System.out.println("步骤02:"+Thread.currentThread().isInterrupted());//抛出异常会清除中断标识,此时中断标识表示状态为false
                    //再次中断
                    Thread.currentThread().interrupt();
                    System.out.println("步骤03:"+Thread.currentThread().isInterrupted());//再次中断状态为true
                    Thread.interrupted();//复位操作
                    System.out.println("步骤04:"+Thread.currentThread().isInterrupted());//复位操作,此时状态为false
                    //再次中断
                    Thread.currentThread().interrupt();
                    System.out.println("步骤05:"+Thread.currentThread().isInterrupted());//再次中断,此时状态为true,退出循环
                }
            }

            //对当前线程进行标志复位
        },"thread-01");
        thread.start();
        TimeUnit.SECONDS.sleep(1);
        thread.interrupt(); //把isInterrupted设置成true
    }
}
线程的终止原理

thread.interrupt()方法做了什么事情

    public void interrupt() {
        if (this != Thread.currentThread())
            checkAccess();

        synchronized (blockerLock) {
            Interruptible b = blocker;
            if (b != null) {
                interrupt0();           // Just to set the interrupt flag
                b.interrupt(this);
                return;
            }
        }
        interrupt0();
    }

这个方法里会调用interrupt0(),是个native方法,可以在jvm.cpp文件找到jvm_Interrupt的定义

JVM_ENTRY(void, JVM_Interrupt(JNIEnv* env, jobject jthread))
  JVMWrapper("JVM_Interrupt");

  // Ensure that the C++ Thread and OSThread structures aren't freed before we operate
  oop java_thread = JNIHandles::resolve_non_null(jthread);
  MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock);
  // We need to re-resolve the java_thread, since a GC might have happened during the
  // acquire of the lock
  JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread));
  if (thr != NULL) {
    Thread::interrupt(thr);//调用Thread的interrupt的方法
  }
JVM_END

这个方法会调用Thread::interrupt(thr),这个方法定义在Thread.cpp的文件中,代码如下

void Thread::interrupt(Thread* thread) {
  trace("interrupt", thread);
  debug_only(check_for_dangling_thread_pointer(thread);)
  os::interrupt(thread);
}

Thread::interrupt()会调用os:interrupt(),这个会调用平台的interrupt方法, jvm是跨平台的,所以对于不同的平台的操作系统,现场的调度方式不一样,以os_linux.cpp的文件为例,可以看出Thread::interrupt()做了两个事情,一个是设置中断标识为ture,通过内存屏障操作使得中断标识对于其他的线程可见,第二个使得调用该方法的线程的被唤醒

void os::interrupt(Thread* thread) {
  assert(Thread::current() == thread || Threads_lock->owned_by_self(),
    "possibility of dangling Thread pointer");

  OSThread* osthread = thread->osthread();//获得本地线程

  if (!osthread->interrupted()) {//判断本地线程是否中断
    osthread->set_interrupted(true);//设置中断标记为true
    // More than one thread can get here with the same value of osthread,
    // resulting in multiple notifications.  We do, however, want the store
    // to interrupted() to be visible to other threads before we execute unpark().
    OrderAccess::fence();//这里设置了内存屏障,使得interrupted状态对于其他线程立即可见
    ParkEvent * const slp = thread->_SleepEvent ;//_SleepEvent相当于Thread.sleep的方法,线程调用sleep方法,则通过uppark唤醒
    if (slp != NULL) slp->unpark() ;
  }

  // For JSR166. Unpark even if interrupt status already was set
  if (thread->is_Java_thread())
    ((JavaThread*)thread)->parker()->unpark();

  ParkEvent * ev = thread->_ParkEvent ;//_ParkEvent用于synchronized同步块和object.wait()方法,相当于也是通过unpark进行唤醒
  if (ev != NULL) ev->unpark() ;

}

set_interrupted(true)实际调用的是osThread.hpp中的set_interrupted()方法,在osThread()的方法中,定义了一个成员属性volatile jint _interrupted来实现的

  volatile jint _interrupted;     // Thread.isInterrupted state
  void set_interrupted(bool z)                      { _interrupted = z ? 1 : 0; }

thread.interrupt()方法实际就是设置一个 interrupted 状态标识为 true、并且通过ParkEvent 的 unpark 方法来唤醒线程。

  1. 对于 synchronized 阻塞的线程,被唤醒以后会继续尝试获取锁,如果失败仍然可能被 park
  2. 在调用 ParkEvent 的 park 方法之前,会先判断线程的中断状态,如果为 true,会清除当前线程的中断标识
  3. Object.wait 、 Thread.sleep 、 Thread.join 会 抛 出InterruptedException

这里给大家普及一个知识点,为什么 Object.wait、Thread.sleep 和 Thread.join 都 会 抛 出InterruptedException?

你会发现这几个方法有一个共同点,都是属于阻塞的方法而阻塞方法的释放会取决于一些外部的事件,但是阻塞方法可能因为等不到外部的触发事件而导致无法终止,
所以它允许一个线程请求自己来停止它正在做的事情。当一个方法抛出InterruptedException 时,它是在告诉调用者如果执行该方法的线程被中断,它会尝试停止正在做的事情并且通过抛出 InterruptedException 表示提前返回。

所以,这个异常的意思是表示一个阻塞被其他线程中断了。然 后 , 由 于 线 程 调 用 了 interrupt() 中 断 方 法 , 那 么Object.wait、Thread.sleep 等被阻塞的线程被唤醒以后会
通过 is_interrupted 方法判断中断标识的状态变化,如果发现中断标识为 true,则先清除中断标识,然后抛出InterruptedException

需要注意的是,InterruptedException 异常的抛出并不意味着线程必须终止,而是提醒当前线程有中断的操作发生,至于接下来怎么处理取决于线程本身,比如

  1. 直接捕获异常不做任何处理
  2. 将异常往外抛出
  3. 停止当前线程,并打印异常信息

为 了 让 大 家 能 够 更 好 的 理 解 上 面 这 段 话 , 我 们 以Thread.sleep 为例直接从 jdk 的源码中找到中断标识的清除以及异常抛出的方法代码找 到 is_interrupted() 方法, linux 平 台 中 的 实 现 在os_linux.cpp 文件中,代码如下

bool os::is_interrupted(Thread* thread, bool clear_interrupted) {
  assert(Thread::current() == thread || Threads_lock->owned_by_self(),
    "possibility of dangling Thread pointer");

  OSThread* osthread = thread->osthread();

  bool interrupted = osthread->interrupted();//获取线程的中断标识

  if (interrupted && clear_interrupted) {//如果中断标识为true,并且需要清除标记为true
    osthread->set_interrupted(false);//设置中断标识为false
    // consider thread->_SleepEvent->reset() ... optional optimization
  }

  return interrupted;
}

Thread.sleep 这个操作在jdk的源码怎么实现,代码在jvm.cpp文件中,大概可以理解到先会调用Thread.is_interrupted的方法,这个会先判断线程的中断标记,如果设置的清除线程的标记是true,则会设置线程标记为false, 也就是先线程复位,然后在抛出InterruptedException异常

JVM_ENTRY(void, JVM_Sleep(JNIEnv* env, jclass threadClass, jlong millis))
  JVMWrapper("JVM_Sleep");

  if (millis < 0) {
    THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "timeout value is negative");
  }

  if (Thread::is_interrupted (THREAD, true) && !HAS_PENDING_EXCEPTION) {//先判断is_interrupted的状态,这个是调用is_interrupted的方法,并这里是需要清除线程中断标记的
    THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted"); //抛异常
  }

  // Save current thread state and restore it at the end of this block.
  // And set new thread state to SLEEPING.
  JavaThreadSleepState jtss(thread);
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,837评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,551评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,417评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,448评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,524评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,554评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,569评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,316评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,766评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,077评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,240评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,912评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,560评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,176评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,425评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,114评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,114评论 2 352