工具类
有哪些控制并发流程的工具类? .png
CountDownLatch 倒计时门插
构造方法public CountDownLatch(int count)
,count为需要倒数的数值
调用await()方法的线程会被挂起,它会等待直到count为0才继续执行。
countDown()就是让count减1,为0时等待的线程会被唤起。
CountDownLatch 1.png
两个典型用法:
用法一:一个线程等待多个线程执行完毕,再执行自己的工作。
用法二:多个线程等待某一个线程的信号,同时开始执行。
CountDownLatch是不能重用的
Semaphore 信号量,许可证
限制或者管理有限资源的使用情况
信号量的作用是维护一个“许可证”的计数,线程可以获取许可证,那信号量剩余的许可证就减一,线程释放许可证,那信号量的余量就加一,当许可证为0时,有新的线程还想获得许可证,那就得等待,直到有线程释放了许可证。
获取和释放方法
acquire()的重载方法:acquire(int count)一次申请获取count个许可证
release() 也有重载方法: release(int count) 一次释放count个许可证
初始化和尝试获取方法
注意点:
- 获取和释放的数量要一致,如果不一致,可能会导致程序卡死。这是编程规范。
- 最好设为公平的,在构造时设为true。
- 并不是唏嘘要获取许可证的那个线程才能释放许可证,获取和释放对线程没有要求,也许是A获取,B释放,只要逻辑合理就行。
Condition接口(也称条件对象)
Condition作用
Condition作用 .png
signalAll()唤醒所有的;
signal()是公平的,唤醒等待时间最长的那个
Condition是绑定在锁上面的,通常配合ReentrantLock一起使用,通过lock.newCondition()
创建Condition。
注意点
注意点
用Condition实现生产者消费者问题
/**
* 描述: 演示用Condition实现生产者消费者模式
*/
public class ConditionDemo2 {
private int queueSize = 10;
private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
private Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
public static void main(String[] args) {
ConditionDemo2 conditionDemo2 = new ConditionDemo2();
Producer producer = conditionDemo2.new Producer();
Consumer consumer = conditionDemo2.new Consumer();
producer.start();
consumer.start();
}
class Consumer extends Thread {
@Override
public void run() {
consume();
}
private void consume() {
while (true) {
lock.lock();
try {
while (queue.size() == 0) {
System.out.println("队列空,等待数据");
try {
notEmpty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.poll();
notFull.signalAll();
System.out.println("从队列里取走了一个数据,队列剩余" + queue.size() + "个元素");
} finally {
lock.unlock();
}
}
}
}
class Producer extends Thread {
@Override
public void run() {
produce();
}
private void produce() {
while (true) {
lock.lock();
try {
while (queue.size() == queueSize) {
System.out.println("队列满,等待有空余");
try {
notFull.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.offer(1);
notEmpty.signalAll();
System.out.println("向队列插入了一个元素,队列剩余空间" + (queueSize - queue.size()));
} finally {
lock.unlock();
}
}
}
}
}
CyclicBarrier 循环栅栏
CyclicBarrier的作用
CyclicBarrier和CountDownLatch 的不同
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("所有人都到场了, 大家统一出发!");
}
});
for (int i = 0; i < 10; i++) {
new Thread(new Task(i, cyclicBarrier)).start();
}
}
static class Task implements Runnable{
private int id;
private CyclicBarrier cyclicBarrier;
public Task(int id, CyclicBarrier cyclicBarrier) {
this.id = id;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println("线程" + id + "现在前往集合地点");
try {
Thread.sleep((long) (Math.random()*10000));
System.out.println("线程"+id+"到了集合地点,开始等待其他人到达");
cyclicBarrier.await();
System.out.println("线程"+id+"出发了");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}