线程协作,控制并发流程工具

工具类

有哪些控制并发流程的工具类? .png

CountDownLatch 倒计时门插

构造方法public CountDownLatch(int count),count为需要倒数的数值
调用await()方法的线程会被挂起,它会等待直到count为0才继续执行。
countDown()就是让count减1,为0时等待的线程会被唤起。

CountDownLatch 1.png

两个典型用法:
用法一:一个线程等待多个线程执行完毕,再执行自己的工作。
用法二:多个线程等待某一个线程的信号,同时开始执行。

CountDownLatch是不能重用的

Semaphore 信号量,许可证

限制或者管理有限资源的使用情况

信号量的作用是维护一个“许可证”的计数,线程可以获取许可证,那信号量剩余的许可证就减一,线程释放许可证,那信号量的余量就加一,当许可证为0时,有新的线程还想获得许可证,那就得等待,直到有线程释放了许可证。

获取和释放方法

acquire()的重载方法:acquire(int count)一次申请获取count个许可证

release() 也有重载方法: release(int count) 一次释放count个许可证

初始化和尝试获取方法

注意点:

  1. 获取和释放的数量要一致,如果不一致,可能会导致程序卡死。这是编程规范。
  2. 最好设为公平的,在构造时设为true。
  3. 并不是唏嘘要获取许可证的那个线程才能释放许可证,获取和释放对线程没有要求,也许是A获取,B释放,只要逻辑合理就行。

Condition接口(也称条件对象)

Condition作用
Condition作用 .png

signalAll()唤醒所有的;
signal()是公平的,唤醒等待时间最长的那个

Condition是绑定在锁上面的,通常配合ReentrantLock一起使用,通过lock.newCondition() 创建Condition。

注意点

注意点

用Condition实现生产者消费者问题

/**
 * 描述:     演示用Condition实现生产者消费者模式
 */
public class ConditionDemo2 {

    private int queueSize = 10;
    private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
    private Lock lock = new ReentrantLock();
    private Condition notFull = lock.newCondition();
    private Condition notEmpty = lock.newCondition();

    public static void main(String[] args) {
        ConditionDemo2 conditionDemo2 = new ConditionDemo2();
        Producer producer = conditionDemo2.new Producer();
        Consumer consumer = conditionDemo2.new Consumer();
        producer.start();
        consumer.start();
    }

    class Consumer extends Thread {

        @Override
        public void run() {
            consume();
        }

        private void consume() {
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == 0) {
                        System.out.println("队列空,等待数据");
                        try {
                            notEmpty.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    queue.poll();
                    notFull.signalAll();
                    System.out.println("从队列里取走了一个数据,队列剩余" + queue.size() + "个元素");
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    class Producer extends Thread {

        @Override
        public void run() {
            produce();
        }

        private void produce() {
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == queueSize) {
                        System.out.println("队列满,等待有空余");
                        try {
                            notFull.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    queue.offer(1);
                    notEmpty.signalAll();
                    System.out.println("向队列插入了一个元素,队列剩余空间" + (queueSize - queue.size()));
                } finally {
                    lock.unlock();
                }
            }
        }
    }

}

CyclicBarrier 循环栅栏

CyclicBarrier的作用

CyclicBarrier和CountDownLatch 的不同

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                System.out.println("所有人都到场了, 大家统一出发!");
            }
        });
        for (int i = 0; i < 10; i++) {
            new Thread(new Task(i, cyclicBarrier)).start();
        }
    }

    static class Task implements Runnable{
        private int id;
        private CyclicBarrier cyclicBarrier;

        public Task(int id, CyclicBarrier cyclicBarrier) {
            this.id = id;
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            System.out.println("线程" + id + "现在前往集合地点");
            try {
                Thread.sleep((long) (Math.random()*10000));
                System.out.println("线程"+id+"到了集合地点,开始等待其他人到达");
                cyclicBarrier.await();
                System.out.println("线程"+id+"出发了");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容