第7章 Java并发包中并发队列原理剖析

目录

LinkedBlockingQueue和ArrayBlockingQueue比较简单,不进行讲解了。下面只介绍PriorityBlockingQueue和DelayQueue。

PriorityBlockingQueue

PriorityBlockingQueue是带优先级的无界阻塞队列,每次出队都返回优先级最高或最低的元素。内部使用二叉堆实现。

类图结构

PriorityBlockingQueue内部有一个数组queue,用来存放队列元素。allocationSpinLock是个自旋锁,通过CAS操作来保证同时只有一个线程可以扩容队列,状态为0或1。

由于这是一个优先队列,所以有一个comparator用来比较元素大小。

下面为构造函数:

private static final int DEFAULT_INITIAL_CAPACITY = 11;

public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}

public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}

可知默认队列容量为11,默认比较器为null,也就是使用元素的compareTo方法进行比较来确定元素的优先级,这意味着队列元素必须实现Comparable接口。

原理讲解

boolean offer()

public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    // 获取独占锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    // 扩容
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            // 通过对二叉堆的上浮操作保证最大或最小的元素总在根节点
            siftUpComparable(n, e, array);
        else
            // 使用了自定义比较器
            siftUpUsingComparator(n, e, array, cmp);
        size = n + 1;
        // 激活因调用take()方法被阻塞的线程
        notEmpty.signal();
    } finally {
        // 释放锁
        lock.unlock();
    }
    return true;
}

流程比较简单,下面主要看扩容和建堆操作。

先看扩容。

private void tryGrow(Object[] array, int oldCap) {
    // 由前面的代码可知,调用tryGrow函数前先获取了独占锁,
    // 由于扩容比较费时,此处先释放锁,
    // 让其他线程可以继续操作(如果满足可操作的条件的话),
    // 以提升并发性能
    lock.unlock();
    Object[] newArray = null;
    // 通过allocationSpinLock保证同时最多只有一个线程进行扩容操作。
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,0, 1)) {
        try {
            // 当容量比较小时,一次只增加2容量
            // 比较大时增加一倍
            int newCap = oldCap + ((oldCap < 64) ?(oldCap + 2) : (oldCap >> 1));
            // 溢出检测
            if (newCap - MAX_ARRAY_SIZE > 0) {
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            // 释放锁,没用CAS是因为同时最多有一个线程操作allocationSpinLock
            allocationSpinLock = 0;
        }
    }
    // 如果当前线程发现有其他线程正在对队列进行扩容,
    // 则调用yield方法尝试让出CPU资源促使扩容操作尽快完成
    if (newArray == null)
        Thread.yield();
    lock.lock();
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

下面来看建堆算法

private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {
        // 获取父节点,设子节点索引为k,
        // 则由二叉堆的性质可知,父节点的索引总为(k - 1) >>> 1
        int parent = (k - 1) >>> 1;
        // 获取父节点对应的值
        Object e = array[parent];
        // 只有子节点的值小于父节点的值时才上浮
        if (key.compareTo((T) e) >= 0)
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = key;
}

如果了解二叉堆的话,此处代码是十分容易理解的。关于二叉堆,可参看《数据结构之二叉堆》

E poll()

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 出队
        return dequeue();
    } finally {
        lock.unlock();
    }
}

private E dequeue() {
    int n = size - 1;
    if (n < 0)
        return null;
    else {
        Object[] array = queue;
        E result = (E) array[0];
        // 获取尾节点,在实现对二叉堆的下沉操作时要用到
        E x = (E) array[n];
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            // 下沉操作,保证取走最小的节点(根节点)后,新的根节点仍时最小的,二叉堆的性质依然满足
            siftDownComparable(0, x, array, n);
        else
            // 使用自定义比较器
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;
        return result;
    }
}

poll方法通过调用dequeue方法使最大或最小的节点出队并将其返回。

下面来看二叉堆的下沉操作。

private static <T> void siftDownComparable(int k, T x, Object[] array, int n) {
    if (n > 0) {
        Comparable<? super T> key = (Comparable<? super T>)x;
        int half = n >>> 1;
        while (k < half) {
            // child为两个子节点(如果有的话)中较小的那个对应的索引
            int child = (k << 1) + 1;
            Object c = array[child];
            int right = child + 1;
            // 通过比较保证child对应的为较小值的索引
            if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                c = array[child = right];
            if (key.compareTo((T) c) <= 0)
                break;
            // 下沉,将较小的子节点换到父节点位置
            array[k] = c;
            k = child;
        }
        array[k] = key;
    }
}

同上,对下沉操作有疑问的话可参考上述文章。

void put(E e)

调用了offer

public void put(E e){
    offer(e);
}

E take()

take操作的作用是获取二叉堆的根节点元素,如果队列为空则阻塞。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 阻塞可被中断
    lock.lockInterruptibly();
    E result;
    try {
        // 队列为空就将当前线程放入notEmpty条件队列
        // 使用while循环判断是为了避免虚假唤醒
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

DelayQueue

DelayQueue并发队列是一个无界阻塞延迟队列,队列中的每一个元素都有一个过期时间,当从队列中获取元素时只有过期元素才会出列。队列头元素是最快要过期的元素。

类图结构

DelayQueue内部使用PriorityQueue存放数据,使用ReentrantLock实现线程同步。
队列里的元素要实现Delayed接口(Delayed接口继承了Comparable接口),用以得到过期时间并进行过期时间的比较。

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

available是由lock生成的条件变量,用以实现线程间的同步。

leader是leader-follower模式的变体,用于减少不必要的线程等待。当一个线程调用队列的take方法变为leader线程后,它会调用条件变量available.waitNanos(delay)等待delay时间,但是其他线程(follower)则会调用available.await()进行无限等待。leader线程延迟时间过期后,会退出take方法,并通过调用available.signal()方法唤醒一个follower线程,被唤醒的线程会被选举为新的leader线程。

原理讲解

boolean offer(E e)

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 添加新元素
        q.offer(e);
        // 查看新添加的元素是否为最先过期的
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

上述代码首先获取独占锁,然后添加元素到优先级队列,由于q是优先级队列,所以添加元素后,调用q.peek()方法返回的并不一定是当前添加的元素。当如果q.peek() == e,说明当前元素是最先要过期的,那么重置leader线程为null并激活available条件队列里的一个线程,告诉它队列里面有元素了。

E take()

获取并移除队列里面过期的元素,如果队列里面没有过期元素则等待。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 可中断
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            // 为空则等待
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                // 过期则成功获取
                if (delay <= 0)
                    return q.poll();
                // 执行到此处,说明头元素未过期    
                first = null; // don't retain ref while waiting
                // follower无限等待,直到被唤醒
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // leader等待lelay时间,则头元素必定已经过期
                        available.awaitNanos(delay);
                    } finally {
                        // 重置leader,给follower称为leader的机会
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            // 唤醒一个follower线程
            available.signal();
        lock.unlock();
    }
}

一个线程调用take方法时,会首先查看头元素是否为空,为空则直接等待,否则判断是否过期。
若头元素已经过期,则直接通过poll获取并移除,否则判断是否有leader线程。
若有leader线程则一直等待,否则自己成为leader并等待头元素过期。

E poll()

获取并移除头过期元素,如果没有过期元素则返回null。

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        // 若队列为空或没有元素过期则直接返回null
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            return q.poll();
    } finally {
        lock.unlock();
    }
}

int size()

计算队列元素个数,包含过期的和未过期的。

public int size() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return q.size();
    } finally {
        lock.unlock();
    }
}

更多

相关笔记:《Java并发编程之美》阅读笔记

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,539评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,911评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,337评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,723评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,795评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,762评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,742评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,508评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,954评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,247评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,404评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,104评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,736评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,352评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,557评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,371评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,292评论 2 352

推荐阅读更多精彩内容

  • 我一般都是在晚上躺下之后开始写日记,这个时间点太容易让人放弃,想着今天不更新,睡一觉有灵感再来写,就会白白浪费今天...
    九重城阙阅读 135评论 0 1
  • 朋友张小强毕业名校,后进入一家大型国企工作。单位待遇好时惊涛骇浪,坏时一池死水。但张小强因为机敏能干,再加上“21...
    木木想进步阅读 859评论 4 13
  • 七窍春心已被偷,夕阳色薄叹悠悠。 星光淡淡虽无意,月亮莹莹岂有愁? 思子片言多锦绣,念君一笑尽娇羞。 佳期梦醒应生...
    张桃源阅读 242评论 2 2
  • 有两个三年级的学生,特别喜欢说我不会,然后发脾气就摔笔不干。 是什么让孩子变得这么肆无忌惮?不会做练习,还一副很有...
    筱晓why阅读 343评论 0 1