前言
CountDownLatch和CyclicBarrier都是通过计数器来控制线程。
CountDownLatch
定义:允许一个或多个线程等待直到在其他线程中执行的一组操作完成。
具体涉及的方法:
await();调用该方法会开始阻塞,让线程进入等待状态,当计数器的值递减到0时,阻塞的线程将会被唤醒并继续执行。
countDown();调用该方法时计数器-1。
CountDownLatch的应用1:
编写一个程序,模拟一个100米短跑的场景。每个运动员的跑步过程就是一个线程。要求如下:
1、发令枪鸣枪后选手们开始赛跑;
2、在所有运动员达到终点后系统开始统计选手们的成绩并把成绩发送到成绩统计系统;
3、对所有运动员的成绩进行排序输出。
针对这个场景,有两个计数器,分别是裁判员latch1和运动员latch2。运动员在起点等待时,可调用latch1.await();让运动员线程阻塞,后续裁判发布开始口令时,调用countDown,此时运动员线程开始跑步。
裁判员在发布口令时,调用latch2.await,此时主线程(裁判员)则会进入阻塞。每个运动员子线程在到达终点后,调用latch2.countDown,待运动员计数器为0时,代表所有运动员已到达终点。
具体实现代码:
public static void main(String[] args){
//运动员
CountDownLatch player = new CountDownLatch(10);
//裁判
CountDownLatch referee = new CountDownLatch(1);
Vector<String> list = new Vector<>();
for(int i = 0; i < 10; i++){
Thread t1 = new Thread(()->{
long start = System.currentTimeMillis();
try {
referee.await();
Thread.sleep(RandomUtil.randomInt(10)*1000);
long end = System.currentTimeMillis();
double sec = (end-start)/1000;
list.add("姓名:"+Thread.currentThread().getName()+"。成绩:"+sec+"秒。");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
player.countDown();
}
});
t1.start();
}
try {
System.out.println("裁判准备发布指令");
Thread.sleep(RandomUtil.randomInt(10)*1000);
referee.countDown();
System.out.println("开始比赛");
player.await();
System.out.println("比赛结束,比赛结果为:");
for (int i = 1; i<=list.size();i++) {
System.out.println("第"+i+"名,"+list.get(i-1));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
CyclicBarrier
定义:CyclicBarrier为多个线程相互等待。
CountDownLatch的countDown和await这个过程是一次性的。但CyclicBarrier则是可以无限循环的。CyclicBarrier只有await方法,表示线程间的相互等待,当所有线程已调用该方法后,则该组线程被唤醒,计数器重置进入下一个循环。
具体涉及的方法:
await();等待所有线程已经在这个障碍上调用了await。
CyclicBarrier的应用1:
公园游船,假设船的数量是足够的,每艘船可以坐5个人,人齐了就开船。此时则可以使用CyclicBarrier计数器。每个人代表一个线程,每上船一人,则调用一次await方法,表示我在等待其他人上船,待有五个人都调用了await时,阻塞的线程则被唤醒(船满了出发)。
具体代码实现:
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5,7,10, TimeUnit.SECONDS,new ArrayBlockingQueue<>(10));
final CyclicBarrier cyclicBarrier = new CyclicBarrier(5,()->{
System.out.println("船已满,开船");
});
for (int i = 0; i < 18; i++) {
executor.execute(()->{
try {
Thread.sleep((long) (Math.random() * 10000));
System.out.println("子线程" + Thread.currentThread().getName() + "上船");
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}
}
Semaphore
定义:Semaphore是一个计数的信号量。协调多个线程对于共享变量的获取。
这个计数器的模型可以类比成为停车场,一个停车场的车位是有限的,每辆车开进时如果有空位则能进入,没有就等待。通过计数器限制了使用该停车场的最大车位数。
Semaphore在具体的业务场景,可以用在对象池,线程池中。限制能同时进入的线程数。
具体涉及的方法:
semaphore.acquire(); 从该信号量获取许可证,阻止直到可用
semaphore.release(); 释放许可证,将其返回到信号量
Semaphore的应用1:
新建Semaphore,设置有多少个许可(即停车位)。每辆车驶入时,获取许可,离开时释放许可。
具体代码实现:
public class SemaphoreTest {
/**
* 假设当天回来到该停车场的数量
*/
private static int THREAD_COUNT = 30;
private static ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
/**
* 停车场的车位
*/
private static Semaphore semaphore = new Semaphore(10);
public static void main(String[] args) {
for(int i=0; i<THREAD_COUNT; i++){
executorService.execute(()->{
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"车辆进入停车场");
//停车时间
Thread.sleep(RandomUtil.randomInt(10)*1000);
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}