5. Communication of Thread(线程通信)

5.1 线程的通信方式

有时候我们需要进行线程间通信,如简单的生产者消费者模式中,生产者生产完产品,需要通知消费者去消费产品,这就是一个最简单的线程通信的模型。想实现多个线程之间的协同,一个线程需要获取另一个线程的执行结果,线程执行的先后顺序这样的需求都需要用到线程通信。


线程通信图示

线程间通信的常用方式有以下几种:
1)文件共享
2)网络共享
3)共享变量
4)通过JDK提供的线程协调API:wait/notify、 park/unpark、suspend/resume(已废弃)
首先是通过文件共享的例子,一个线程往文件里面写消息,另一个线程从文件中读取内容,以文件作为介质来进行线程间通信。

5.2 线程的通信实战演练

5.2.1 文件为中介的线程通信

public class FileCommunicate {

    public static void main(String args[]) {
        createWriteThread();
        createReadThread();
    }

    /**
     * 创建写文件线程
     */
    public static void createWriteThread() {
        new Thread(() -> {
            int runTime = 10;
            while(runTime-- > 0){
                File file = new File("05-communication-of-thread/src/main/resources/fileCommunicate.log");
                SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式
                String now = String.format("当前时间:%s", df.format(new Date()));
                try {
                    FileUtils.writeStringToFile(file, now, "UTF-8");
                    Thread.sleep(1000);
                } catch (IOException | InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    /**
     * 创建读文件线程
     */
    public static void createReadThread() {
        new Thread(() -> {
            int runTime = 10;
            while(runTime-- > 0){
                File file = new File("01-high-performance-program/05-communication-of-thread/src/main/resources/fileCommunicate.log");
                String now = String.format("当前时间:%s", String.valueOf(System.currentTimeMillis()));
                try {
                    String content = null;
                    while((content = FileUtils.readFileToString(file)) != null){
                        System.out.println(content);
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

}

5.2.2 变量为中介的线程通信

public class VariableCommunicate {

    private static String content = "";

    private static final int runTimes = 10;

    public static void main(String args[]){
        writeTread();
        readTread();
    }

    /**
     * 写线程
     */
    public static void writeTread(){
        new Thread(()->{
            try {
                for(int i =0; i<runTimes; i++){
                    SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式
                    content = String.format("当前时间:%s", df.format(new Date()));
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        ).start();
    }
    /**
     * 读线程
     */
    public static void readTread(){
        new Thread(()->{
            try {
                for(int i =0; i<runTimes; i++){
                    Thread.sleep(1000);
                    System.out.println(content);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }

}

5.2.3 JDK API 作为线程通信

5.2.3.1 suspend-resume(已废弃)

1)正常的Demo

public class SuspendDemo {

    private static BlockingDeque<Bread> queue = new LinkedBlockingDeque<Bread>();

    public static void main(String args[]) {
        try {
            Thread consumeTread = consumeTread();
            Thread.sleep(1000);
            producerTread(consumeTread);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 正常的生产者线程
     */
    public static void producerTread(Thread consumerThread) {
        System.out.println("启动生产者线程:");
        new Thread(() -> {
            Bread bread = new Bread("bread", 1.0f);
            System.out.println("\t生产面包:" + bread);
            queue.add(bread);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("恢复消费者线程:");
            consumerThread.resume();
        }).start();
    }

    /**
     * 正常的消费者线程
     */
    public static Thread consumeTread() {
        Thread consumer = new Thread(() -> {
            // take不到东西说明面包还没生产好
            Bread bread = null;
            System.out.println("消费者线程被挂起!");
            Thread.currentThread().suspend();
            if ((bread = queue.poll()) != null) {
                System.out.println("\t获取到了面包:" + bread);
            } else {
                System.out.println("\t依旧获取不到面包");
            }
        });
        System.out.println("启动消费者线程:");
        consumer.start();
        return consumer;
    }

    @Getter
    @Setter
    @AllArgsConstructor
    @ToString
    protected  static class Bread {
        private String name;
        private float price;
    }

}

运行结果:

运行结果

2)会产生死锁的Demo

public class SuspendDeadLockDemo {
    private static BlockingDeque<SuspendDemo.Bread> queue = new LinkedBlockingDeque<SuspendDemo.Bread>();

    public static void main(String args[]) throws Exception {
        Thread consumeTread = consumerThreadDeadLock();
        Thread.sleep(1000);
        producerThreadDeadLock(consumeTread);
    }

    /**
     * 死锁的suspend/resume。 suspend并不会像wait一样释放锁,故此容易写出死锁代码
     */
    public static Thread consumerThreadDeadLock() throws Exception {
        // 启动线程
        Thread consumerThread = new Thread(() -> {
            System.out.println("消费者线程被挂起!");
            // 当前线程拿到锁,然后挂起
            synchronized (queue) {
                Thread.currentThread().suspend();
            }
            SuspendDemo.Bread bread = null;
            if ((bread = queue.poll()) != null) {
                System.out.println("\t获取到了面包:" + bread);
            } else {
                System.out.println("\t依旧获取不到面包");
            }
        });
        consumerThread.start();
        return consumerThread;
    }

    public static void producerThreadDeadLock(Thread consumerThread) throws Exception {
        System.out.println("启动生产者线程:");
        new Thread(() -> {
            SuspendDemo.Bread bread = new SuspendDemo.Bread("bread", 1.0f);
            System.out.println("\t生产面包:" + bread);
            queue.add(bread);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("恢复消费者线程:");
            synchronized (queue) {
                consumerThread.resume();
            }
        }).start();
    }

}

运行结果:同样是可以看到程序一直挂在那里没有运行完

运行结果

suspend/resume会被废弃掉,就是因为有两种情况下会造成死锁:
1、像下面的代码那样,如果消费者线程挂起前已经持有了锁,然后消费者线程挂起,后面的生产者线程需要在获取到该锁的情况下才能让消费者线程resume,这样生产者线程就会一直获取不到这个锁(因为此时锁已经被消费者线程持有了,而且消费者线程被挂起且它不会释放掉当前被它占用着的锁,叫醒它的条件就是别的线程要拿到这把锁)
造成死锁的图示

2、造成死锁的第二种情况就是,假设生产者先把消费者线程先唤醒了,然后消费者线程去获取面包,但是此时生产者线程还没生产完面包,这种情况就会导致后面消费者线程没有人唤醒了,这样造成了resume没有起到它应有的作用(因为resume的时候消费者线程还没开始suspend,编程思想中称为“错失的信号”
产生死锁的示例代码:

T1:
    synchronized(sharedMonitor) {
        // 执行一段使得someCondition为false的代码
        consumerThread.resume();
    }
T2:
    while(someCondition){
        // 在此处进行线程切换,切换到上面的线程
         synchronized(sharedMonitor){
            consumerThread.suspend();
        }
    }

首先T2是消费者线程先运行起来,此时的someCondition为true,T2进入了while块中,然后调度器又调回给T1执行,T1已经将someCondition置为false,也就是说此时消费者线程已经不能被挂起了,但是由于此时已经判断过了someCondition,所以消费者线程会被挂起。(一般的解决方式是保证someCondition的判断和线程挂起是在一个同步块中就能防止由于后期判断条件已经变化导致不正确的结果,但是由于消费者线程挂起是不释放锁的,所以将标黄部分移到while外面也无法解决此问题,所以这种方式容易产生死锁)

3)会产生死锁的Demo2

public class SuspendDeadLockDemo2 {
    private static BlockingDeque<SuspendDemo.Bread> queue = new LinkedBlockingDeque<SuspendDemo.Bread>();

    public static void main(String args[]) throws Exception {
        Thread consumeTread = consumerThreadDeadLock2();
        producerThreadDeadLock2(consumeTread);
    }

    /**
     * 生产者线程死锁
     *
     * @param consumerThread
     * @throws Exception
     */
    public static void producerThreadDeadLock2(Thread consumerThread) throws Exception {
        System.out.println("生产者线程启动:");
        new Thread(() -> {
            synchronized (queue) {
                SuspendDemo.Bread bread = new SuspendDemo.Bread("bread", 1.0f);
                System.out.println("\t生产面包:" + bread);
                queue.add(bread);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("恢复消费者线程:");
                consumerThread.resume();
            }
        }).start();
    }

    /**
     * 消费者线程死锁
     *
     * @return
     * @throws Exception
     */
    public static Thread consumerThreadDeadLock2() throws Exception {
        // 启动线程
        Thread consumerThread = new Thread(() -> {
            System.out.println("消费者线程启动!");
            // 当前线程拿到锁,然后挂起
            SuspendDemo.Bread bread = null;
            while ((bread = queue.poll()) == null) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (queue) {
                    Thread.currentThread().suspend();
                    System.out.println("\t获取到了面包:" + bread);
                }
            }
        });
        consumerThread.start();
        return consumerThread;
    }

}

运行结果:可以看到程序一直挂在那运行不完

image.png

5.2.3.2 wait-notify

wait-notify跟上述5.2.3.1中描述的方式最大的区别在于wait的时候,会释放掉当前所占有的锁,并且wait-notify需要在同步代码块中执行(也就是要在synchronized块中执行)

1)正常的Demo

public class WaitDemo {

    private static BlockingDeque<Bread> queue = new LinkedBlockingDeque<Bread>();

    public static void main(String args[]) {
        try {
            consumeTread();
            Thread.sleep(1000);
            producerTread();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 正常的生产者线程
     */
    public static void producerTread() {
        System.out.println("启动生产者线程:");
        new Thread(() -> {
            synchronized (queue) {
                Bread bread = new Bread("bread", 1.0f);
                System.out.println("\t生产面包:" + bread);
                queue.add(bread);
                System.out.println("恢复消费者线程:");
                queue.notifyAll();
            }
        }).start();
    }

    /**
     * 正常的消费者线程
     */
    public static Thread consumeTread() {
        Thread consumer = new Thread(() -> {
            synchronized (queue) {
                // take不到东西说明面包还没生产好
                Bread bread = null;
                System.out.println("消费者线程被挂起!");
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if ((bread = queue.poll()) != null) {
                    System.out.println("\t获取到了面包:" + bread);
                } else {
                    System.out.println("\t依旧获取不到面包");
                }
            }
        });
        System.out.println("启动消费者线程:");
        consumer.start();
        return consumer;
    }

    @Getter
    @Setter
    @AllArgsConstructor
    @ToString
    protected static class Bread {
        private String name;
        private float price;
    }

}

运行结果:

运行结果

2)会产生死锁的Demo

public class WaitDeadLockDemo {
    private static BlockingDeque<WaitDemo.Bread> queue = new LinkedBlockingDeque<>();

    public static void main(String args[]) throws Exception {
        consumerThreadDeadLock();
        producerThreadDeadLock();
    }

    /**
     * 生产者线程死锁
     *
     * @throws Exception
     */
    public static void producerThreadDeadLock() throws Exception {
        System.out.println("生产者线程启动:");
        new Thread(() -> {
            synchronized (queue) {
                WaitDemo.Bread bread = new WaitDemo.Bread("bread", 1.0f);
                System.out.println("\t生产面包:" + bread);
                queue.add(bread);
                System.out.println("恢复消费者线程:");
                queue.notifyAll();
            }
        }).start();
    }

    /**
     * 消费者线程死锁
     *
     * @return
     * @throws Exception
     */
    public static Thread consumerThreadDeadLock() throws Exception {
        // 启动线程
        Thread consumerThread = new Thread(() -> {
            System.out.println("消费者线程启动!");
            // 当前线程拿到锁,然后挂起
            WaitDemo.Bread bread = null;
            while ((bread = queue.poll()) == null) {
                try {
                    Thread.sleep(1000);
                    synchronized (queue) {
                        queue.wait();
                        System.out.println("\t获取到了面包:" + bread);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        });
        consumerThread.start();
        return consumerThread;
    }

}

运行结果:

运行结果

这种死锁的产生原因就是因为错失的信号,后面面包生产出来之后消费者线程已经不需要被挂起了,解决方式是保证someCondition的判断和线程挂起是在一个同步块中就能防止由于后期判断条件已经变化导致不正确的结果(简单地说就是确保消费者在面包生产前先进入阻塞等待状态)

public static Thread consumerThreadDeadLock() throws Exception {
    // 启动线程
    Thread consumerThread = new Thread(() -> {
        System.out.println("消费者线程启动!");
        // 当前线程拿到锁,然后挂起
        WaitDemo.Bread bread = null;
        while ((bread = queue.poll()) == null) {
            try {
                Thread.sleep(1000);
                synchronized (queue) {//将这个移到while外面,消除条件的竞争
                    queue.wait();
                    System.out.println("\t获取到了面包:" + bread);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    });
    consumerThread.start();
    return consumerThread;
}

3)会产生死锁的Demo的解决方案

public class WaitDeadLockResolveDemo {
    private static volatile BlockingDeque<WaitDemo.Bread> queue = new LinkedBlockingDeque<>();

    public static void main(String args[]) throws Exception {
        consumerThreadDeadLock();
        producerThreadDeadLock();
    }

    /**
     * 生产者线程死锁
     *
     * @throws Exception
     */
    public static void producerThreadDeadLock() throws Exception {
        System.out.println("生产者线程启动!");
        new Thread(() -> {
            synchronized (queue) {
                WaitDemo.Bread bread = new WaitDemo.Bread("bread", 1.0f);
                System.out.println("\t生产面包:" + bread);
                queue.add(bread);
                System.out.println("恢复消费者线程!");
                queue.notifyAll();
            }
        }).start();
    }

    /**
     * 消费者线程死锁
     *
     * @return
     * @throws Exception
     */
    public static Thread consumerThreadDeadLock() throws Exception {
        // 启动线程
        Thread consumerThread = new Thread(() -> {
            System.out.println("消费者线程启动!");
            // 当前线程拿到锁,然后挂起
            WaitDemo.Bread bread = null;
            synchronized (queue) {
                while ((bread = queue.poll()) == null) {
                    try {
                        Thread.sleep(1000);
                        // 消费者线程挂起
                        System.out.println("消费者线程挂起!");
                        queue.wait();
                        bread = queue.poll();
                        System.out.println("\t获取到了面包:" + bread);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        consumerThread.start();
        return consumerThread;
    }

}

运行结果:(这里没有运行完是因为获取了面包之后又判断队列为null,然后又挂起了消费者线程,继续消费面包)

运行结果

5.2.3.3 park-unpark

park-unpark的限制更小,它不要求在同步代码块中,并且它也不用考虑“错失的信号”的情况,你先在线程T2进行unpark,再对线程T1进行park,线程T1仍然能被唤醒。
线程调用park等待“许可”,unpark方法为指定线程提供“许可(permit)”,多次调用unpark之后再调用park,线程会直接运行,但不会叠加,即连续多次调用park,第一次会拿到许可直接运行,后续调用会进入等待。

1)正常的Demo

public class ParkDemo {

    private static BlockingDeque<ParkDemo.Bread> queue = new LinkedBlockingDeque<>();

    public static void main(String args[]) {
        Thread consumerThread = consumeTread();
        producerTread(consumerThread);
        System.out.println("启动消费者线程!");
        consumerThread.start();
    }

    /**
     * 正常的生产者线程
     */
    public static void producerTread(Thread consumerThread) {
        System.out.println("启动生产者线程!");
        new Thread(() -> {
            ParkDemo.Bread bread = new ParkDemo.Bread("bread", 1.0f);
            System.out.println("\t生产面包:" + bread);
            queue.add(bread);
            System.out.println("生产者线程唤醒消费者线程!");
            LockSupport.unpark(consumerThread);
        }).start();
    }

    /**
     * 正常的消费者线程
     */
    public static Thread consumeTread() {
        Thread consumer = new Thread(() -> {
            // take不到东西说明面包还没生产好
            ParkDemo.Bread bread = null;
            System.out.println("消费者线程被挂起!");
            LockSupport.park();
            System.out.println("消费者线程被唤醒!");
            if ((bread = queue.poll()) != null) {
                System.out.println("\t获取到了面包:" + bread);
            } else {
                System.out.println("\t依旧获取不到面包");
            }
        });
        return consumer;
    }

    @Getter
    @Setter
    @AllArgsConstructor
    @ToString
    protected static class Bread {
        private String name;
        private float price;
    }

}

运行结果:(这里是先执行了unpark后面再执行park的操作)

运行结果

5.3 伪唤醒

在编写线程通信相关的代码时,不能使用if语句来进行判断是否进入等待状态,这是错误的写法!官方建议应该在循环中检查等待条件,原因是处于等待状态的线程可能会受到错误警报和伪唤醒,如果不在循环中检查等待条件,程序就会在没有满足结束条件的情况下退出。
相当于应该这么写:

    // wait
    synchronized(obj) {
        while(<条件判断>)
            obj.wait();
        // … …后续操作
    }
    // park
    while(<条件判断>) {
        LockSupport.park();
        // … …后续操作
    }

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,968评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,601评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,220评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,416评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,425评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,144评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,432评论 3 401
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,088评论 0 261
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,586评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,028评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,137评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,783评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,343评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,333评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,559评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,595评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,901评论 2 345

推荐阅读更多精彩内容