一、CyclicBarrier
表示大家彼此等待,大家集合好后才开始出发,分散活动后又在指定地点集合碰面,这就好比整个公司的人员利用周末的时间集体郊游一样,先各自从家出发到公司集合后,再同时出发到公园游玩,在指定地点集合后再同时开始聚餐。
Cyclic:循环的,有周期的
Barrier:障碍物,屏障
代码实例:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CyclicBarrierTest {
private static final int NUM = 3;
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final CyclicBarrier cb = new CyclicBarrier(NUM);
// final CyclicBarrier cb2 = new CyclicBarrier(NUM);
// final CyclicBarrier cb3 = new CyclicBarrier(NUM);
for (int i = 0; i < NUM; i++) {
service.execute(()->{
try {
Thread.sleep((long) (Math.random()* 10000));
System.out.println("线程"+ Thread.currentThread().getName()
+ "即将到达集合点1,当前已有" + (cb.getNumberWaiting()+1) +"个已经到达"+
(cb.getNumberWaiting() == 2 ? "都到齐了,继续走啊": ",正在等候"));
cb.await();
Thread.sleep((long) (Math.random()* 10000));
// Thread.sleep((1000));
System.out.println("线程"+ Thread.currentThread().getName()
+ "即将到达集合点2,当前已有" + (cb.getNumberWaiting()+1) +"个已经到达"+
(cb.getNumberWaiting() == 2 ? "都到齐了,继续走啊": ",正在等候"));
cb.await();
// Thread.sleep((1000));
Thread.sleep((long) (Math.random()* 10000));
System.out.println("线程"+ Thread.currentThread().getName()
+ "即将到达集合点3,当前已有" + (cb.getNumberWaiting()+1) +"个已经到达"+
(cb.getNumberWaiting() == 2 ? "都到齐了,继续走啊": ",正在等候"));
cb.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}
service.shutdown();
}
}
输出结果:
线程pool-1-thread-3即将到达集合点1,当前已有1个已经到达,正在等候
线程pool-1-thread-1即将到达集合点1,当前已有2个已经到达,正在等候
线程pool-1-thread-2即将到达集合点1,当前已有3个已经到达都到齐了,继续走啊
线程pool-1-thread-1即将到达集合点2,当前已有1个已经到达,正在等候
线程pool-1-thread-2即将到达集合点2,当前已有2个已经到达,正在等候
线程pool-1-thread-3即将到达集合点2,当前已有3个已经到达都到齐了,继续走啊
线程pool-1-thread-3即将到达集合点3,当前已有1个已经到达,正在等候
线程pool-1-thread-2即将到达集合点3,当前已有2个已经到达,正在等候
线程pool-1-thread-1即将到达集合点3,当前已有3个已经到达都到齐了,继续走啊
若是不sleep()或是sleep(固定值),则输出结果有问题
eg:
Thead.sleep(2000);
结果:
线程pool-1-thread-1即将到达集合点1,当前已有1个已经到达,正在等候
线程pool-1-thread-2即将到达集合点1,当前已有1个已经到达,正在等候
线程pool-1-thread-3即将到达集合点1,当前已有2个已经到达,正在等候
线程pool-1-thread-3即将到达集合点2,当前已有1个已经到达,正在等候
线程pool-1-thread-2即将到达集合点2,当前已有1个已经到达,正在等候
线程pool-1-thread-1即将到达集合点2,当前已有1个已经到达,正在等候
线程pool-1-thread-3即将到达集合点3,当前已有1个已经到达,正在等候
线程pool-1-thread-1即将到达集合点3,当前已有1个已经到达,正在等候
线程pool-1-thread-2即将到达集合点3,当前已有2个已经到达,正在等候
待解答????
二、CountDownLatch
1. 概念:Latch:门闩,闩锁
- 犹如倒计时计数器,调用CountDownLatch对象的countDown方法就将计数器减1,当计数器到达0时,则所有等待者或单个等待者开始执行
- 可以实现一个人(也可以是多人)等待其它所有人都来通知他,可以实现一个人通知多个人的效果,类似裁判一声口令,运动员同时开始奔跑,或者所有运动员都跑到终点后裁判才可以公布结果,用这个功能来做百米赛跑的游戏程序不错。
程序示例1:
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CountDownLatchTest {
private static final int NUM = 3;
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final CountDownLatch cdOrder = new CountDownLatch(1); //裁判吹口哨下命令开始跑
final CountDownLatch cdAnswer = new CountDownLatch(NUM);//运动员跑完到终点响应
for (int i = 0; i < NUM; i++) {
service.execute(()->{
try {
System.out.println("线程"+Thread.currentThread().getName()+"正在准备接受命令");
cdOrder.await();
System.out.println("线程"+Thread.currentThread().getName()+"已经接受了命令");
Thread.sleep((long) (Math.random()* 10000));
System.out.println("线程"+Thread.currentThread().getName()+"回应命令处理结果");
cdAnswer.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
try {
System.out.println("线程"+Thread.currentThread().getName()+"即将发布命令");
cdOrder.countDown();
System.out.println("线程"+Thread.currentThread().getName()+"已经发布了命令,正在等待结果");
cdAnswer.await();
System.out.println("线程"+Thread.currentThread().getName()+"已经收到所有响应结果");
} catch (InterruptedException e) {
e.printStackTrace();
}
service.shutdown();
}
}
线程pool-1-thread-2正在准备接受命令
线程main即将发布命令
线程main已经发布了命令,正在等待结果
线程pool-1-thread-3正在准备接受命令
线程pool-1-thread-1正在准备接受命令
线程pool-1-thread-3已经接受了命令
线程pool-1-thread-2已经接受了命令
线程pool-1-thread-1已经接受了命令
线程pool-1-thread-2回应命令处理结果
线程pool-1-thread-1回应命令处理结果
线程pool-1-thread-3回应命令处理结果
线程main已经收到所有响应结果
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
public class CountDownLatchTest {
private final static int COUNTDOWN_LATCH_NUM = 10;
public static void main(String[] args){
CountDownLatch countDownLatch = new CountDownLatch(COUNTDOWN_LATCH_NUM);
AtomicInteger num = new AtomicInteger(COUNTDOWN_LATCH_NUM);
for (int i = 0; i < COUNTDOWN_LATCH_NUM ; i++) {
new Thread(new Runnable() {
@Override
public void run() {
countDownLatch.countDown();
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
//real logic
System.out.println(num.getAndDecrement() + " at time : " + System.currentTimeMillis());
}
}).start();
}
}
}
9 at time : 1579231887071
8 at time : 1579231887071
2 at time : 1579231887071
3 at time : 1579231887071
4 at time : 1579231887071
7 at time : 1579231887071
5 at time : 1579231887071
6 at time : 1579231887071
10 at time : 1579231887071
1 at time : 1579231887071
程序示例3: 一个线程的某一步骤的先决执行条件:依赖多个其他线程的成功执行
public class CountDownLatchTest {
private static CountDownLatch countDownLatch = new CountDownLatch(5);
/**
* Boss线程,等待员工到达开会
*/
static class BossThread extends Thread{
@Override
public void run() {
System.out.println("Boss在会议室等待,总共有" + countDownLatch.getCount() + "个人开会...");
try {
//Boss等待
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("所有人都已经到齐了,开会吧...");
}
}
//员工到达会议室
static class EmpleoyeeThread extends Thread{
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ",到达会议室....");
//员工到达会议室 count - 1
countDownLatch.countDown();
}
}
public static void main(String[] args){
//Boss线程启动
new BossThread().start();
for(int i = 0 ; i < 5 ; i++){
new EmpleoyeeThread().start();
}
}
}
Boss在会议室等待,总共有5个人开会...
Thread-1,到达会议室....
Thread-2,到达会议室....
Thread-3,到达会议室....
Thread-4,到达会议室....
Thread-5,到达会议室....
所有人都已经到齐了,开会吧...
三、CyclicBarrier 和 CountDownLatch的区别
- CyclicBarrier:一个线程来了,await()就代表线程数➕1,没什么逻辑判断,就是对线程计数的
- CountDownLatch:执行线程await(),另外的线程countDown() ,而且什么时候countDown(),countDown()多少次可以根据你的业务逻辑定制。
四、Exchanger
用于实现2个人之间的数据交换,每个人在完成一定的事务后想与另一个人交换数据,第一个先拿出数据的人将一直等待第二个人拿出数据到来时,才能彼此交换数据。
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExchangerTest {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final Exchanger<String> exchanger = new Exchanger<>();
service.execute(()->{
try {
String data1 = "hello";
System.out.println("线程"+Thread.currentThread().getName()+"正在把" + data1 + "交换出去");
Thread.sleep((long) (Math.random()* 10000));
System.out.println("线程"+Thread.currentThread().getName()+"已经接受了命令");
String data2 = exchanger.exchange(data1);
System.out.println("线程"+Thread.currentThread().getName()+"换回的数据为:"+ data2);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
service.execute(()->{
try {
String data1 = "world";
System.out.println("线程"+Thread.currentThread().getName()+"正在把" + data1 + "交换出去");
Thread.sleep((long) (Math.random()* 10000));
System.out.println("线程"+Thread.currentThread().getName()+"已经接受了命令");
String data2 = exchanger.exchange(data1);
System.out.println("线程"+Thread.currentThread().getName()+"换回的数据为:"+ data2);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
service.shutdown();
while (!service.isTerminated()) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("结束了!");
}
}
线程pool-1-thread-1正在把hello交换出去
线程pool-1-thread-2正在把world交换出去
线程pool-1-thread-2已经接受了命令
线程pool-1-thread-1已经接受了命令
线程pool-1-thread-1换回的数据为:world
线程pool-1-thread-2换回的数据为:hello