线程安全7 - CyclicBarrier、CountDownLatch、Exchanger

一、CyclicBarrier

  表示大家彼此等待,大家集合好后才开始出发,分散活动后又在指定地点集合碰面,这就好比整个公司的人员利用周末的时间集体郊游一样,先各自从家出发到公司集合后,再同时出发到公园游玩,在指定地点集合后再同时开始聚餐。

Cyclic:循环的,有周期的
Barrier:障碍物,屏障

三个线程干完各自的任务,在不同的时刻到达集合点后,就可以接着忙各自的工作去了,再到达新的集合点,再去忙各自的工作,到达集合点了用CyclicBarrier对象的await方法表示。 为什么几个人要碰到一起,说白了,就是大家都把手头这一阶段的工作做完了,就可以碰到一起了。譬如:我下楼等方老师,就是等他手头工作做完了,他到达了要集合的状态,就集合了。

代码实例:

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CyclicBarrierTest {
    
    private static final int NUM = 3;
    public static void main(String[] args) {
        
        ExecutorService service = Executors.newCachedThreadPool();
        final CyclicBarrier cb = new CyclicBarrier(NUM);
//      final CyclicBarrier cb2 = new CyclicBarrier(NUM);
//      final CyclicBarrier cb3 = new CyclicBarrier(NUM);
        
        for (int i = 0; i < NUM; i++) {
            service.execute(()->{
                try {
                    Thread.sleep((long) (Math.random()* 10000));
                    
                    System.out.println("线程"+ Thread.currentThread().getName() 
                            + "即将到达集合点1,当前已有" + (cb.getNumberWaiting()+1) +"个已经到达"+
                            (cb.getNumberWaiting() == 2 ? "都到齐了,继续走啊": ",正在等候"));
                    cb.await();
                    
                    Thread.sleep((long) (Math.random()* 10000));
//                  Thread.sleep((1000));
                    
                    System.out.println("线程"+ Thread.currentThread().getName() 
                            + "即将到达集合点2,当前已有" + (cb.getNumberWaiting()+1) +"个已经到达"+
                            (cb.getNumberWaiting() == 2 ? "都到齐了,继续走啊": ",正在等候"));
                    cb.await();
                    
//                  Thread.sleep((1000));
                    Thread.sleep((long) (Math.random()* 10000));
                    System.out.println("线程"+ Thread.currentThread().getName() 
                            + "即将到达集合点3,当前已有" + (cb.getNumberWaiting()+1) +"个已经到达"+
                            (cb.getNumberWaiting() == 2 ? "都到齐了,继续走啊": ",正在等候"));
                    cb.await();
                    
                    
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
        }
        service.shutdown();

    }

}

输出结果:

线程pool-1-thread-3即将到达集合点1,当前已有1个已经到达,正在等候
线程pool-1-thread-1即将到达集合点1,当前已有2个已经到达,正在等候
线程pool-1-thread-2即将到达集合点1,当前已有3个已经到达都到齐了,继续走啊
线程pool-1-thread-1即将到达集合点2,当前已有1个已经到达,正在等候
线程pool-1-thread-2即将到达集合点2,当前已有2个已经到达,正在等候
线程pool-1-thread-3即将到达集合点2,当前已有3个已经到达都到齐了,继续走啊
线程pool-1-thread-3即将到达集合点3,当前已有1个已经到达,正在等候
线程pool-1-thread-2即将到达集合点3,当前已有2个已经到达,正在等候
线程pool-1-thread-1即将到达集合点3,当前已有3个已经到达都到齐了,继续走啊

\color{red}{潜在的问题:}
若是不sleep()或是sleep(固定值),则输出结果有问题

eg:
  Thead.sleep(2000);
结果:
线程pool-1-thread-1即将到达集合点1,当前已有1个已经到达,正在等候
线程pool-1-thread-2即将到达集合点1,当前已有1个已经到达,正在等候
线程pool-1-thread-3即将到达集合点1,当前已有2个已经到达,正在等候
线程pool-1-thread-3即将到达集合点2,当前已有1个已经到达,正在等候
线程pool-1-thread-2即将到达集合点2,当前已有1个已经到达,正在等候
线程pool-1-thread-1即将到达集合点2,当前已有1个已经到达,正在等候
线程pool-1-thread-3即将到达集合点3,当前已有1个已经到达,正在等候
线程pool-1-thread-1即将到达集合点3,当前已有1个已经到达,正在等候
线程pool-1-thread-2即将到达集合点3,当前已有2个已经到达,正在等候

待解答????


二、CountDownLatch

1. 概念:Latch:门闩,闩锁

  • 犹如倒计时计数器,调用CountDownLatch对象的countDown方法就将计数器减1,当计数器到达0时,则所有等待者或单个等待者开始执行
  • 可以实现一个人(也可以是多人)等待其它所有人都来通知他,可以实现一个人通知多个人的效果,类似裁判一声口令,运动员同时开始奔跑,或者所有运动员都跑到终点后裁判才可以公布结果,用这个功能来做百米赛跑的游戏程序不错。
程序示例1:
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchTest {
    private static final int NUM = 3;
    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        final CountDownLatch cdOrder = new CountDownLatch(1); //裁判吹口哨下命令开始跑
        final CountDownLatch cdAnswer = new CountDownLatch(NUM);//运动员跑完到终点响应
        
        for (int i = 0; i < NUM; i++) {
            service.execute(()->{
                try {
                    System.out.println("线程"+Thread.currentThread().getName()+"正在准备接受命令");
                    cdOrder.await();
                    System.out.println("线程"+Thread.currentThread().getName()+"已经接受了命令");
                    Thread.sleep((long) (Math.random()* 10000));
                    System.out.println("线程"+Thread.currentThread().getName()+"回应命令处理结果");
                    cdAnswer.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        
        try {
            System.out.println("线程"+Thread.currentThread().getName()+"即将发布命令");
            cdOrder.countDown();
            System.out.println("线程"+Thread.currentThread().getName()+"已经发布了命令,正在等待结果");
            cdAnswer.await();
            System.out.println("线程"+Thread.currentThread().getName()+"已经收到所有响应结果");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        service.shutdown();
    }
}   
线程pool-1-thread-2正在准备接受命令
线程main即将发布命令
线程main已经发布了命令,正在等待结果
线程pool-1-thread-3正在准备接受命令
线程pool-1-thread-1正在准备接受命令
线程pool-1-thread-3已经接受了命令
线程pool-1-thread-2已经接受了命令
线程pool-1-thread-1已经接受了命令
线程pool-1-thread-2回应命令处理结果
线程pool-1-thread-1回应命令处理结果
线程pool-1-thread-3回应命令处理结果
线程main已经收到所有响应结果

\color{red}{程序示例2:模拟高并发,让多个线程同时执行}

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class CountDownLatchTest {
    private final static int COUNTDOWN_LATCH_NUM = 10;
    public static void main(String[] args){
        CountDownLatch countDownLatch = new CountDownLatch(COUNTDOWN_LATCH_NUM);
        AtomicInteger num = new AtomicInteger(COUNTDOWN_LATCH_NUM);
        for (int i = 0; i < COUNTDOWN_LATCH_NUM ; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    countDownLatch.countDown();
                    try {
                        countDownLatch.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //real logic
                    System.out.println(num.getAndDecrement() + " at time : " + System.currentTimeMillis());
                }
            }).start();
        }
    }
}
9 at time : 1579231887071
8 at time : 1579231887071
2 at time : 1579231887071
3 at time : 1579231887071
4 at time : 1579231887071
7 at time : 1579231887071
5 at time : 1579231887071
6 at time : 1579231887071
10 at time : 1579231887071
1 at time : 1579231887071
程序示例3: 一个线程的某一步骤的先决执行条件:依赖多个其他线程的成功执行
public class CountDownLatchTest {
    private static CountDownLatch countDownLatch = new CountDownLatch(5);

    /**
     * Boss线程,等待员工到达开会
     */
    static class BossThread extends Thread{
        @Override
        public void run() {
            System.out.println("Boss在会议室等待,总共有" + countDownLatch.getCount() + "个人开会...");
            try {
                //Boss等待
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("所有人都已经到齐了,开会吧...");
        }
    }

    //员工到达会议室
    static class EmpleoyeeThread  extends Thread{
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + ",到达会议室....");
            //员工到达会议室 count - 1
            countDownLatch.countDown();
        }
    }

    public static void main(String[] args){
        //Boss线程启动
        new BossThread().start();

        for(int i = 0 ; i < 5 ; i++){
            new EmpleoyeeThread().start();
        }
    }
}
Boss在会议室等待,总共有5个人开会...
Thread-1,到达会议室....
Thread-2,到达会议室....
Thread-3,到达会议室....
Thread-4,到达会议室....
Thread-5,到达会议室....
所有人都已经到齐了,开会吧...


三、CyclicBarrier 和 CountDownLatch的区别

  1. CyclicBarrier:一个线程来了,await()就代表线程数➕1,没什么逻辑判断,就是对线程计数的
  2. CountDownLatch:执行线程await(),另外的线程countDown() ,而且什么时候countDown(),countDown()多少次可以根据你的业务逻辑定制。


四、Exchanger

用于实现2个人之间的数据交换,每个人在完成一定的事务后想与另一个人交换数据,第一个先拿出数据的人将一直等待第二个人拿出数据到来时,才能彼此交换数据。

import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExchangerTest {
    public static void main(String[] args) {
        
        ExecutorService service = Executors.newCachedThreadPool();
        final Exchanger<String> exchanger = new Exchanger<>();
        service.execute(()->{
            try {
                String data1 = "hello";
                System.out.println("线程"+Thread.currentThread().getName()+"正在把" + data1 + "交换出去");
                Thread.sleep((long) (Math.random()* 10000));
                System.out.println("线程"+Thread.currentThread().getName()+"已经接受了命令");
                String data2 = exchanger.exchange(data1);
                
                System.out.println("线程"+Thread.currentThread().getName()+"换回的数据为:"+ data2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        
        service.execute(()->{
            try {
                String data1 = "world";
                System.out.println("线程"+Thread.currentThread().getName()+"正在把" + data1 + "交换出去");
                Thread.sleep((long) (Math.random()* 10000));
                System.out.println("线程"+Thread.currentThread().getName()+"已经接受了命令");
                String data2 = exchanger.exchange(data1);
                
                System.out.println("线程"+Thread.currentThread().getName()+"换回的数据为:"+ data2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        
        service.shutdown();
        while (!service.isTerminated()) {  
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }  
        } 
        System.out.println("结束了!");
    }
}
线程pool-1-thread-1正在把hello交换出去
线程pool-1-thread-2正在把world交换出去
线程pool-1-thread-2已经接受了命令
线程pool-1-thread-1已经接受了命令
线程pool-1-thread-1换回的数据为:world
线程pool-1-thread-2换回的数据为:hello
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,376评论 6 491
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,126评论 2 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,966评论 0 347
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,432评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,519评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,792评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,933评论 3 406
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,701评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,143评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,488评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,626评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,292评论 4 329
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,896评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,742评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,977评论 1 265
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,324评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,494评论 2 348

推荐阅读更多精彩内容