CyclicBarrier 是什么?
让一组线程到达一个屏障后被阻塞,直到最后一个线程到达屏障时,屏障才会“开门”,所有被屏障阻塞的线程继续执行。
CyclicBarrier 构造
CyclicBarrier(int parties)
parties表示屏障拦截的线程数量,当线程调用await()方法就是告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
CyclicBarrier(int parties,Runnable barrierAction)
用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。
CyclicBarrier 方法
类型 | 方法 | 描述 |
---|---|---|
int | await() | 等待直到parties个线程都调用了await() |
int | await(long timeout, TimeUnit unit) | 等待直到parties个线程都调用了await() 或者过了超时时间 |
int | getNumberWaiting() | 获取CyclicBarrier当前在等待的线程数量 |
int | getParties() | 获取CyclicBarrier拦截线程数量 |
boolean | isBroken() | 获取阻塞的线程是否被中断 |
int | reset() | 将屏障设置为初始状态 |
CyclicBarrier(int parties)例子
public class CyclicBarrierTest {
//定义一个屏障,设置拦截两个线程
static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
//通知CyclicBarrier我已经到达屏障,线程阻塞
cyclicBarrier.await();
System.out.println("1");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
System.out.println("2");
new Thread(new Runnable() {
@Override
public void run() {
try {
//通知CyclicBarrier我已经到达屏障,线程阻塞
cyclicBarrier.await();
System.out.println("3");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}
输出结果
2
3
1
因为我们定义的CyclicBarrier是拦截两个线程,所以第一个线程执行了await()后开始阻塞,然后继续执行,输出“2”,当第二个线程执行了await()方法后,第二个线程达到屏障,屏障打开“大门”,两个线程继续执行,输出“3”和“1”,也有一种可能是先输出“1”然后输出“3”,这是因为主线程和子线程的调度是由CPU决定的,都有可能先执行。
CyclicBarrier(int parties,Runnable barrierAction)例子
public class CyclicBarrierTest2 {
static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new A());
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
cyclicBarrier.await();
System.out.println("1");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
System.out.println("2");
new Thread(new Runnable() {
@Override
public void run() {
try {
cyclicBarrier.await();
System.out.println("3");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
static class A implements Runnable {
@Override
public void run() {
System.out.println("4");
}
}
}
结果
2
4
3
1
根据上面的例子稍微进行改造,当第二个线程达到屏障后,优先执行了A,然后阻塞线程才继续执行。
CyclicBarrier应用场景
可以用于多线程计算数据,最后合并计算结果的场景。
例子
创建一个屏障,设置拦截线程数为10
假设每个线程计算结果返回1
最终十个线程计算结果相加得到的结果应该为10
package com.sy.thread.example;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
/**
* Description: thread
* @author songyu
*/
public class CyclicBarrierTest3 implements Runnable{
/**
* 创建屏障
*/
private CyclicBarrier cyclicBarrier = new CyclicBarrier(10,this::run);
/**
* 保存每个线程执行的结果
*/
private ConcurrentHashMap<String,Integer> concurrentHashMap = new ConcurrentHashMap<>();
private void calculationData() {
//使用线程不规范,实际使用时可以使用ThreadPoolExecutor
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
concurrentHashMap.put(Thread.currentThread().getName(),1);
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}
@Override
public void run() {
int result = 0;
for (Map.Entry<String, Integer> map : concurrentHashMap.entrySet()) {
result = result + map.getValue();
}
System.out.println("最终计算结果:" + result);
}
public static void main(String[] args) {
CyclicBarrierTest3 c = new CyclicBarrierTest3();
c.calculationData();
}
}
结果
最终计算结果:10