PriorityBlockingQueue 源码分析(基于Java 8)

1. PriorityBlockingQueue定义

PriorityBlockingQueue 是基于 二叉堆, ReentrantLock, Condition 实现的并发安全的优先级队列.
主要有以下特点:

  1. 数据容量没有界限(最大值 Integer.MAX_VALUE - 8)
  2. 居于ReentrantLock 实现并发安全, 基于 Condition 实现线程等待唤醒
  3. 数据底层存放在居于数组实现的二叉堆上, 注意这里没有实现堆排序, 只是每次有数据变更时将最小/大放在了堆的最上面的节点上(PS: 不了解二叉堆的请戳这里)
2. 初始化方法 堆化(heapify)

实现思路: 从堆的最后一个 parent 开始, 将最小/大值放在 parent位置, 直到最顶层的parent为止;
直接看代码

/**
 * Establishes the heap invariant (described above) in the entire tree
 * assuming nothing about the order of the elements prior to the call
 */
private void heapify(){
    /**
     * 将这个数组进行 堆化 (heapify)
     *
     */
    Object[] array = queue;
    int n = size;
    int half = (n >>> 1) -1;                // 1. 这里有个注意点 n 是数组的 length,
    Comparator<? super E> cmp = comparator; // 2. 获取 比较器, 若这里的 comparator是空, 则用元素自己实现的比较接口进行比较
    if(cmp == null){
        for(int i = half; i >= 0; i--){     // 3. 从整个数组的最后一颗树开始, 将二叉树的最小值放置在parent位置, 一直到最上面的那颗二叉树
            siftDownComparable(i, (E)array[i], array, n);
        }
    }else{
        for(int i = half; i >= 0; i--){
            siftDownUsingComparator(i, (E)array[i], array, n, cmp);
        }
    }
                                            // 4. 经过这个 heapify 方法后, 整个二叉堆中的最小值已经放在的 index=0 的位置上(注意: 这时不保证 左子树一定小于右子树)
                                            // 5. 若要进行二叉堆的排序, 则需要将 index=0的位置排查在外 从 index= 1的位置开始, 到最后一个位置, 再进行上面的操作
                                            // 其实思路就是 每次将最小值放在数组的最上面, 然后排除这个节点在外, 将下面的数组作为一个整体, 然后重复上面的步骤, 直到最后一个元素
}

这个方法其实从最后一个 parent 开始进行与子节点比较, 将最小/大值放在 parent 位置, 直到 顶层的 parent 为止
我们发现代码中有个 siftDownComparable 方法, 这是实现 堆化的重要步骤

/**
 * Inserts item x at position k, maintaining heap invariant by
 * demoting x down the tree repeatedly until it is less than or
 * equal to its children or is a leaf
 *
 * @param k     the position to fill
 * @param x     the item to insert
 * @param array the heap array
 * @param n     the heap array
 * @param <T>
 */
private static <T> void siftDownComparable(int k, T x, Object[] array, int n){
    /**
     * 从整个数组的 k 位置开始向下进行 比较更换操作
     * 1. 获取这个数组的中间值(大于等于它其实就是说已经没有子节点)
     *      举例: 数组 array 含有元素 : 1,2,3,4,5,6,7,8,9,10 共10个元素
     *          其中的之间 half = n >>> 1 = 10 >>> 1 = 5; (就是下面代码中的 half, 堆中所有parent的 index 均小于 5)
     *          而最大 parent 的index 是 : (9 - 1) >>> 1 = 4;
     *          再parent调整好后, 再下面的代码中获取的 k 就变成 9/10, 但是 9/10 > 5 (就是下面代码的 while(k < half))
     * 2. 从k位子开始不断向下比较, 将最小值放到 parent位置, 直到 k >= half
     * 3. 经过这个方法比较后, 从k往下 都是最小值上parent上的一个棵二叉树
     */
    if(n > 0){
        Comparable<? super T> key = (Comparable<? super T>)x;
        int half = n >>> 1;                 // 1. 获取整个数组的中间坐标
        while(k < half){                    // 2. k这里其实表示 parent 在数组中的 index, k >= half 其实就说明 k 在数组中已经没有子节点
            int child = (k << 1) + 1;       // 3. 获取 k 的左子节点的 index
            Object c = array[child];        // 4. 获取左子节点的值
            int right = child + 1;          // 5. 获取右子节点的 index
            if(right < n &&                 // 6. 这个 if 判断其实是 判断左右子节点的大小, 并且找到其中的最小值, 赋值给 c;
                    ((Comparable<? super T>)c).compareTo((T)array[right]) > 0
                    ){
                c = array[child = right];
            }
            if(key.compareTo((T)c) <= 0){   // 7. key <= c 则说明, 进行下面 sift 已经完成 (父节点k已经小于等于子节点), 直接 break 出
                break;
            }
            array[k] = c;                   // 8. 代码运行到这里说明 k > c, 则将子数据c赋值到k的位置
            k = child;                      // 9. 将上次的子节点 child作为父节点, 再次下面进行比较, 直到 k >= half
        }
        array[k] = key;                     // 10. 将key值赋值给最后一次进行 siftdown 比较的  父节点上
    }
}

操作思路:

 从整个数组的 k 位置开始向下进行 比较更换操作
  1. 获取这个数组的中间值(大于等于它其实就是说已经没有子节点)
       举例: 数组 array 含有元素 : 1,2,3,4,5,6,7,8,9,10 共10个元素
           其中的之间 half = n >>> 1 = 10 >>> 1 = 5; (就是下面代码中的 half, 堆中所有parent的 index 均小于 5)
           而最大 parent 的index 是 : (9 - 1) >>> 1 = 4;
           再parent调整好后, 再下面的代码中获取的 k 就变成 9/10, 但是 9/10 > 5 (就是下面代码的 while(k < half))
  2. 从k位子开始不断向下比较, 将最小值放到 parent位置, 直到 k >= half
  3. 经过这个方法比较后, 从k往下 都是最小值上parent上的一个棵二叉树
3. 添加元素 offer 方法

主要思路: 将添加的元素放置到数组的最尾端, 然后调用 siftUp 进行向上调整

  /**
 * Inserts the specified element into this priority queue
 * As the queue is unbounded, his method will never return {@code false}
 *
 * @param e the lement to add
 * @return {@code true} (as specified element cannot be compared
 *          with elements currently in the priority queue according to the
 *          priority queue's ordering)
 * @throws NullPointerException if the specified element is null
 */
@Override
public boolean offer(E e) {
    if(e != null){
        throw new NullPointerException();
    }
    final ReentrantLock lock = this.lock;       // 1. 获取全局共享的锁
    lock.lock();
    int n, cap;
    Object[] array;                             // 2. 判断容器是否需要扩容
    while((n = size) >= (cap = (array = queue).length)){
        tryGrow(array, cap);                    // 3. 进行扩容操作
    }

    try{
        Comparator<? super E> cmp = comparator;
        if(cmp == null){                        // 4. 进行 保持 heap 性质的 siftUp 操作
            siftUpComparable(n, e, array);
        }else{
            siftUpUsingComparator(n, e, array, cmp);
        }
        size = n + 1;                           // 5. 数据插入后, 整个容量值 + 1;
        notEmpty.signal();                      // 6. Condition 释放信号, 告知其他等待的线程: 容器中已经有元素
    }finally {
        lock.unlock();                          // 7. 释放锁
    }
    return true;
}

在代码中我们看到了 tryGrow, 这个调整堆存储空间的方法
在里面运用了 先进行锁的释放 lock.unlock, 然后 根据 allocationSpinLock 这个指标判断是否其他线程在进行扩容, 基本上每次扩容都是 * 1.5

/**
 * Tries to grow array to accommodate at least one more element
 * (but normally expend by about 50%), giving up (allowing retry)
 * on contention (which we expect to be race). Call only this while
 * holding lock
 *
 * @param array the heap array
 * @param oldCap    the length of the array
 */
private void tryGrow(Object[] array, int oldCap){
    /**
     * tryGrow 数组容量扩容操作
     * 整个方法的执行是在已经 ReentrantLock 获取锁的情况下进行的
     */

    lock.unlock(); // must release and then re-acquire main lock // 1. 释放全局的锁(为什么呢? 原因也非常简单, 这个 lock 是全局方法共享的, 为的是更好的并发性能, 而扩容操作的并发是通过简单的乐观锁 allocationSpinLock 来进行控制de)
    Object[] newArray = null;
    if(allocationSpinLock == 0 &&                                // 2. 居于CAS操作, 在 allocationSpinLock 实现乐观锁, 这个也是为了在扩容时不影响容器的其他并发操作
            unsafe.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)){
        try{
            int newCap = oldCap + ((oldCap < 64)?                // 3. 容量若小于 64则直接 double + 2; 大于的话, 直接 * 1.5
                    (oldCap + 2): // grow faster if small
                    (oldCap >> 1)
                                    );
            if(newCap - MAX_ARRAY_SIZE > 0){ // possible overflow
                int minCap = oldCap + 1;                         // 4. 扩容后超过最大容量处理
                if(minCap < 0 || minCap > MAX_ARRAY_SIZE){
                    throw new OutOfMemoryError();
                }
                newCap = MAX_ARRAY_SIZE;
            }
            if(newCap > oldCap && queue == array){              // 5. queue == array 若数组没变化, 直接进行新建数组
                newArray = new Object[newCap];
            }
        }finally {
            allocationSpinLock = 0;
        }
    }
                                                                // 6. newArray == null 说明上面的操作过程中, 有其他的线程进行了扩容的操作
    if(newArray == null){ // back off if another thread is allocating
        Thread.yield();                                         // 7. 让出 CPU 调度(因为其他线程扩容后必定有其他的操作)
    }
    lock.lock();                                                // 8. 重新获取锁
    if(newArray != null && queue == array){                     // 9. 判断数组 queue 有没有在其他线程中变化过
        queue = newArray;                                       // 10. 未变化, 直接进行赋值操作
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

在进行offer元素时主要还调用了 siftUpComparable 方法
思路: 将元素与上面的 parent 进行比较, 直到 parent >= 这个元素

    /**
     * Insert item x at position k, maintaining heap invariant by
     * promoting x up the tree until it is greater than or equal to
     * its parent, or is the root
     *
     * To simplify and speed up coercions and comparisons. the
     * Comparable and Comparator versions are separated into different
     * method that are otherwise identical. (Similarly for siftDown)
     * These methods are statics, with heap state as arguments, to
     * simplify use in light og possible comparator exceptions
     *
     * @param k the position to fill
     * @param x the item to insert
     * @param array the heap array
     * @param <T>
     */
    private static <T> void siftUpComparable(int k, T x, Object[] array){
        /**
         * 简单的 siftUp 操作: 大体操作就是将元素x放置到k位置, 然后对k的parent进行比较, 直到 k>=parent为止
         */
        Comparable<? super T> key = (Comparable<? super T>)x;
        while(k > 0){                           // 1. k是否到达二叉树的顶端
            int parent = (k - 1) >>> 1;         // 2. 寻找 k 的parent位置
            Object e = array[parent];           // 3. 获取parent的值
            if(key.compareTo((T)e) >= 0){       // 4. key >= e说明 parent >=子节点, 则不需要 siftUp 操作
                break;
            }
            array[k] = e;                       // 5. 将上次比较中 parent节点的值放在子节点上
            k = parent;                         // 6. 将这次比较中的 parent 当作下次比价的k(k是下次比较的子节点)
        }
        array[k] = key;                         // 7. 将值key放置合适的位置上
    }
4. 删除元素 poll 方法

思路: 将元素的首节点拿出, 作为返回, 末尾节点放置到 index=0位置, 开始 siftDown直到 满足 parent >= child

@Override
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return dequeue();
    } finally {
        lock.unlock();
    }
}

private E dequeue(){
    int n = size - 1;
    if(n < 0){                              // 1. 判断元素是否未空
        return null;                        // 2. 容器中没有元素, 直接返回 null
    }
    else{
        Object[] array = queue;
        E result = (E)array[0];             // 3. 取出数组中的第一个元素, 作为返回值
        E x = (E)array[n];                  // 4. 将数组的最后一个元素取出
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        if(cmp == null){                    // 5. 将刚才取出的数组中最后一个元素放到第一个index位置, 进行siftDown操作(就是向下堆化操作)
            siftDownComparable(0, x, array, n);
        }else{
            siftDownUsingComparator(0, x, array, n, cmp);
        }
        size = n;                           // 6. 重新赋值 size值
        return result;                      // 7. 返回取出的值
    }
}
4. 删除元素 remove 方法

思路:

  1. 寻找待删除元素在数组中的位置
  2. 删除待删除的元素, 将数组中的最后一个元素放置到这个位置, 先进行 siftDown 寻找合适放置的位置, 然后再siftUp 寻找合适的放置位置
/**
 * Remove a single instance of the specified element from this queue,
 * if it is present. More formally, removes an element {@code e} such
 * that {@code o.equal(e)}, if this queue contains one or more such
 * elements. Returns {@code true} if and onlu if this queue contained
 * the specified element (or equivalently, if this queue changed as
 * a result of the call)
 *
 * @param o element to removed from this queue, if present
 * @return {@code true} if this queue changed as a result of the call
 */
public boolean remove(Object o){
    /**
     * 删除堆中对应的元素
     */
    final ReentrantLock lock = this.lock;
    lock.lock();
    try{
        int i = indexOf(o);     // 1. 找出元素 o 在堆中的位置
        if(i == -1){
            return false;
        }
        removeAt(i);            // 2. 调用 removeAt 定点删除元素
        return true;
    }finally {
        lock.unlock();
    }
}

/**
 * Removes the ith element from queue
 * @param i
 */
private void removeAt(int i){
    /**
     * 删除堆中指定位置的元素
     */
    Object[] array = queue;
    int n = size - 1;
    if(n == i){ // remove last lement                           // 1. 若元素是末尾元素, 则直接进行删除操作
        array[i] = null;
    }else{
        E moved = (E)array[n];                                  // 2. 获取待堆中最后的值(这个不是最大值)
        array[n] = null;                                        // 3. 将对应元素置空
        Comparator<? super E> cmp = comparator;
        if(cmp == null){
            siftDownComparable(i, moved, array, n);             // 4. 将最后值 moved 放在 i 位置进行 siftDown
        }else{
            siftDownUsingComparator(i, moved, array, n, cmp);
        }
        if(array[i] == moved){                                  // 5. array[i] = moved 说明 siftDown 没起作用, 节点 moved可能应该在堆上面的位置, 所以进行 siftUp, 从而将 moved 放在上面堆中某个位置
            if(cmp == null){
                siftUpComparable(i, moved, array);
            }else{
                siftUpUsingComparator(i, moved, array, cmp);
            }
        }
    }
    size = n;                                                   // 6. 进行size重新赋值操作
}

总结: PriorityBlockingQueue 是一个基于二叉堆实现的优先级队列, 与其他队列不同的是它支持高优先级的数据线poll出, 在有优先级需要的生产者消费者场景中用这个类比较合适.

参考:
二叉堆
堆排序
vickyqi PriorityBlockingQueue

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 229,362评论 6 537
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 99,013评论 3 423
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 177,346评论 0 382
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 63,421评论 1 316
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 72,146评论 6 410
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 55,534评论 1 325
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 43,585评论 3 444
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 42,767评论 0 289
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 49,318评论 1 335
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 41,074评论 3 356
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 43,258评论 1 371
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 38,828评论 5 362
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 44,486评论 3 347
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 34,916评论 0 28
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 36,156评论 1 290
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 51,993评论 3 395
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 48,234评论 2 375

推荐阅读更多精彩内容

  • 1. Java基础部分 基础部分的顺序:基本语法,类相关的语法,内部类的语法,继承相关的语法,异常的语法,线程的语...
    子非鱼_t_阅读 31,726评论 18 399
  • 一、基本数据类型 注释 单行注释:// 区域注释:/* */ 文档注释:/** */ 数值 对于byte类型而言...
    龙猫小爷阅读 4,285评论 0 16
  • PriorityQueue 一个无限的优先级队列基于一个优先级堆。优先级队列中的元素根据它们的Comparable...
    tomas家的小拨浪鼓阅读 2,565评论 1 2
  • 2016.9.16 今天是休闲也是马力十足的一天。 今天一共跟进/邀约/温暖了6个非会员朋友; 4个会员朋友;约到...
    风飘飘_阅读 304评论 0 0
  • 修行小札 陶醉 2016.12.27 记不清楚有多久懒得动手写下如影随形的诸多情绪了,渐渐养成了在心底消化所...
    闲敲棋子Ray阅读 149评论 1 1