ConcurrentHashMap

HashMap 在并发环境下HashMap1.7是线程不安全的,在扩容时可能会造成环状链表
HashTable 是线程安全的,key、value不能为null,HashTable内部大部分方法基本上都是synchronized锁住整个操作,所以会导致整个操作相当慢。

ConcurrentHashMap原理

HashTable

HashTable 数组+链表的实现结构,只不过数组外层是一个大锁,所有线程竞争同一把锁,导致效率低下。


image.png

base1.7

ConcurrentHashMap 1.7 依旧是数组+链表的实现结构,只不过最外层不是一个大数组,而是在每个分段上加Segment数组,锁是锁定某一个Segment,所以称之为分段锁。那么在并发情况下,不同数据段的锁之间就不会出现锁竞争。

Segment类继承ReentrantLock类实现了可重入锁。
HashEntry是个数组,如果发生hash碰撞,会以链表的形式从头部添加(因为HashEntry的next节点为final型),这里的table指的是HashEntry<K,V>[] table。

读写方式:

  • 读操作:获取Key所在的Segment时,需要保证可见性,使用volatile关键字;
  • 写操作:获取该Key-Value对所在的Segment的锁进行加锁。
image.png

base1.8

ConcurrentHashMap1.8采用的数组+链表+红黑树,这里的数组指的是Node数组,同样是实现了Map.Entry接口,当发生哈希冲突的时候采用拉链法(寻址时间复杂度为O(N)),桶上链表长度达到 8 个或者以上,并且数组长度为 64 以下时只会触发扩容而不会将链表转为红黑树 当链过长时(64>链长>=8扩容)链长大于64转为红黑树(寻址时间复杂度为O(long(N)))。

线程安全方面,ConcurrentHashMap采用CAS+synchronized来保证线程安全,且synchronized关键字是作用于具体的对象上。

image.png

ConcurrentHashMap与HashMap对比图(base1.8)

image.png

1.8与1.7中ConcurrentHashMap对比图

image.png

ConcurrentHashMap(jdk1.7)源码解读

HashEntry类

static final class HashEntry<K,V> { 
       final K key;                       // 声明 key 为 final 型
       final int hash;                   // 声明 hash 值为 final 型 
       volatile V value;                 // 声明 value 为 volatile 型
       final HashEntry<K,V> next;      // 声明 next 为 final 型 
 
       HashEntry(K key, int hash, HashEntry<K,V> next, V value) { 
           this.key = key; 
           this.hash = hash; 
           this.next = next; 
           this.value = value; 
       } 
}

HashEntry 用来封装散列映射表中的键值对。在 HashEntry 类中,key,hash 和 next 域都被声明为 final 型,value 域被声明为 volatile 型。

在 ConcurrentHashMap 中,在散列时如果产生“碰撞”,将采用“分离链接法”来处理“碰撞”:把“碰撞”的 HashEntry 对象链接成一个链表。由于 HashEntry 的 next 域为 final 型,所以新节点只能在链表的表头处插入。 下图是在一个空桶中依次插入 A,B,C 三个 HashEntry 对象后的结构图:


image.png

Segment类

static final class Segment<K,V> extends ReentrantLock implements Serializable { 
       /** 
        * 在本 segment 范围内,包含的 HashEntry 元素的个数
        * 该变量被声明为 volatile 型
        */ 
       transient volatile int count; 
 
       /** 
        * table 被更新的次数
        */ 
       transient int modCount; 
 
       /** 
        * 当 table 中包含的 HashEntry 元素的个数超过本变量值时,触发 table 的再散列
        */ 
       transient int threshold; 
 
       /** 
        * table 是由 HashEntry 对象组成的数组
        * 如果散列时发生碰撞,碰撞的 HashEntry 对象就以链表的形式链接成一个链表
        * table 数组的数组成员代表散列映射表的一个桶
        * 每个 table 守护整个 ConcurrentHashMap 包含桶总数的一部分
        * 如果并发级别为 16,table 则守护 ConcurrentHashMap 包含的桶总数的 1/16 
        */ 
       transient volatile HashEntry<K,V>[] table; 
 
       /** 
        * 装载因子
        */ 
       final float loadFactor; 
 
       Segment(int initialCapacity, float lf) { 
           loadFactor = lf; 
           setTable(HashEntry.<K,V>newArray(initialCapacity)); 
       } 
 
       /** 
        * 设置 table 引用到这个新生成的 HashEntry 数组
        * 只能在持有锁或构造函数中调用本方法
        */ 
       void setTable(HashEntry<K,V>[] newTable) { 
           // 计算临界阀值为新数组的长度与装载因子的乘积
           threshold = (int)(newTable.length * loadFactor); 
           table = newTable; 
       } 
 
       /** 
        * 根据 key 的散列值,找到 table 中对应的那个桶(table 数组的某个数组成员)
        */ 
       HashEntry<K,V> getFirst(int hash) { 
           HashEntry<K,V>[] tab = table; 
           // 把散列值与 table 数组长度减 1 的值相“与”,
// 得到散列值对应的 table 数组的下标
           // 然后返回 table 数组中此下标对应的 HashEntry 元素
           return tab[hash & (tab.length - 1)]; 
       } 
}

Segment类是ConcurrentHashMap的静态内部类,继承ReentrantLock类实现了可重入锁。

其中HashEntry是个数组,如果发生hash碰撞,会以链表的形式从头部添加(因为HashEntry的next节点为final型),这里的table指的是HashEntry<K,V>[] table。


image.png

ConcurrentHashMap 类(jdk1.7)

ConcurrentHashMap concurrentLevel在默认值为16,也就是并发级别会创建包含 16 个 Segment 对象的数组,每个Segment包含一定数据的HashEntry。

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> 
       implements ConcurrentMap<K, V>, Serializable { 
 
   /** 
    * 散列映射表的默认初始容量为 16,即初始默认为 16 个桶
    * 在构造函数中没有指定这个参数时,使用本参数
    */ 
   static final     int DEFAULT_INITIAL_CAPACITY= 16; 
 
   /** 
    * 散列映射表的默认装载因子为 0.75,该值是 table 中包含的 HashEntry 元素的个数与
* table 数组长度的比值
    * 当 table 中包含的 HashEntry 元素的个数超过了 table 数组的长度与装载因子的乘积时,
* 将触发 再散列
    * 在构造函数中没有指定这个参数时,使用本参数
    */ 
   static final float DEFAULT_LOAD_FACTOR= 0.75f; 
 
   /** 
    * 散列表的默认并发级别为 16。该值表示当前更新线程的估计数
    * 在构造函数中没有指定这个参数时,使用本参数
    */ 
   static final int DEFAULT_CONCURRENCY_LEVEL= 16; 
 
   /** 
    * segments 的掩码值
    * key 的散列码的高位用来选择具体的 segment 
    */ 
   final int segmentMask; 
 
   /** 
    * 偏移量
    */ 
   final int segmentShift; 
 
   /** 
    * 由 Segment 对象组成的数组
    */ 
   final Segment<K,V>[] segments; 
 
   /** 
    * 创建一个带有指定初始容量、加载因子和并发级别的新的空映射。
    */ 
   public ConcurrentHashMap(int initialCapacity, 
                            float loadFactor, int concurrencyLevel) { 
       if(!(loadFactor > 0) || initialCapacity < 0 || 
concurrencyLevel <= 0) 
           throw new IllegalArgumentException(); 
 
       if(concurrencyLevel > MAX_SEGMENTS) 
           concurrencyLevel = MAX_SEGMENTS; 
 
       // 寻找最佳匹配参数(不小于给定参数的最接近的 2 次幂) 
       int sshift = 0; 
       int ssize = 1; 
       while(ssize < concurrencyLevel) { 
           ++sshift; 
           ssize <<= 1; 
       } 
       segmentShift = 32 - sshift;       // 偏移量值
       segmentMask = ssize - 1;           // 掩码值 
       this.segments = Segment.newArray(ssize);   // 创建数组
 
       if (initialCapacity > MAXIMUM_CAPACITY) 
           initialCapacity = MAXIMUM_CAPACITY; 
       int c = initialCapacity / ssize; 
       if(c * ssize < initialCapacity) 
           ++c; 
       int cap = 1; 
       while(cap < c) 
           cap <<= 1; 
 
       // 依次遍历每个数组元素
       for(int i = 0; i < this.segments.length; ++i) 
           // 初始化每个数组元素引用的 Segment 对象
this.segments[i] = new Segment<K,V>(cap, loadFactor); 
   } 
 
   /** 
    * 创建一个带有默认初始容量 (16)、默认加载因子 (0.75) 和 默认并发级别 (16) 
 * 的空散列映射表。
    */ 
   public ConcurrentHashMap() { 
       // 使用三个默认参数,调用上面重载的构造函数来创建空散列映射表
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); 
}

ConcurrentHashMap 的结构示意图:


image.png

ConcurrentHashMap的get操作

count的声明

transient volatile int count;

所有的get操作之前会先去判断count值,所有的修改操作也会修改count值,volatile语义会让写操作被后续未加锁的读操作读到,这一特性与HashEntry不变特性,使得ConcurrentHashMap的get操作可以在不加锁的情况下操作,只需要判断vaule值是否为空,为空的话,需要重新获取。

V get(Object key, int hash) { 
           if(count != 0) {       // 首先读 count 变量
               HashEntry<K,V> e = getFirst(hash); 
               while(e != null) { 
                   if(e.hash == hash && key.equals(e.key)) { 
                       V v = e.value; 
                       if(v != null)            
                           return v; 
                       // 如果读到 value 域为 null,说明发生了重排序,加锁后重新读取
                       return readValueUnderLock(e); 
                   } 
                   e = e.next; 
               } 
           } 
           return null; 
       }

ConcurrentHashMap的remove操作

假设我们的链表元素是:A-> B -> C -> D->E 我们要删除 C这个entry,因为HashEntry中next的不可变,所以我们无法直接把C的next指向E,而是将要删除的节点之前的节点复制一份,形成新的链表。

V remove(Object key, int hash, Object value) { 
           lock();         // 加锁
           try{ 
               int c = count - 1; 
               HashEntry<K,V>[] tab = table; 
               // 根据散列码找到 table 的下标值
               int index = hash & (tab.length - 1); 
               // 找到散列码对应的那个桶
               HashEntry<K,V> first = tab[index]; 
               HashEntry<K,V> e = first; 
               while(e != null&& (e.hash != hash || !key.equals(e.key))) 
                   e = e.next; 
 
               V oldValue = null; 
               if(e != null) { 
                   V v = e.value; 
                   if(value == null|| value.equals(v)) { // 找到要删除的节点
                       oldValue = v; 
                       ++modCount; 
                       // 所有处于待删除节点之后的节点原样保留在链表中
                       // 所有处于待删除节点之前的节点被克隆到新链表中
                       HashEntry<K,V> newFirst = e.next;// 待删节点的后继结点
                       for(HashEntry<K,V> p = first; p != e; p = p.next) 
                           newFirst = new HashEntry<K,V>(p.key, p.hash, 
                                                         newFirst, p.value); 
                       // 把桶链接到新的头结点
                       // 新的头结点是原链表中,删除节点之前的那个节点
                       tab[index] = newFirst; 
                       count = c;      // 写 count 变量
                   } 
               } 
               return oldValue; 
           } finally{ 
               unlock();               // 解锁
           } 
       }
image.png

从上图可以看出,删除节点 C 之后的所有节点原样保留到新链表中;删除节点 C 之前的每个节点被克隆到新链表中,注意:它们在新链表中的链接顺序被反转了。

在执行 remove 操作时,原始链表并没有被修改,也就是说:读线程不会受同时执行 remove 操作的并发写线程的干扰。

ConcurrentHashMap的put操作

public V put(K key, V value) { 
       if (value == null)          //ConcurrentHashMap 中不允许用 null 作为映射值
           throw new NullPointerException(); 
       int hash = hash(key.hashCode());        // 计算键对应的散列码
       // 根据散列码找到对应的 Segment 
       return segmentFor(hash).put(key, hash, value, false); 
}

然后,根据 hash 值找到对应的Segment 对象:

  /** 
    * 使用 key 的散列码来得到 segments 数组中对应的 Segment 
    */ 
final Segment<K,V> segmentFor(int hash) { 
   // 将散列值右移 segmentShift 个位,并在高位填充 0 
   // 然后把得到的值与 segmentMask 相“与”
  // 从而得到 hash 值对应的 segments 数组的下标值
  // 最后根据下标值返回散列码对应的 Segment 对象
     return segments[(hash >>> segmentShift) & segmentMask]; 
}

最后,对某个具体的 Segment 执行具体的 put 操作,整个put操作是加锁操作的:

V put(K key, int hash, V value, boolean onlyIfAbsent) { 
           lock();  // 加锁,这里是锁定某个 Segment 对象而非整个 ConcurrentHashMap 
           try { 
               int c = count; 
 
               if (c++ > threshold)     // 如果超过再散列的阈值
                   rehash();              // 执行再散列,table 数组的长度将扩充一倍
 
               HashEntry<K,V>[] tab = table; 
               // 把散列码值与 table 数组的长度减 1 的值相“与”
               // 得到该散列码对应的 table 数组的下标值
               int index = hash & (tab.length - 1); 
               // 找到散列码对应的具体的那个桶
               HashEntry<K,V> first = tab[index]; 
 
               HashEntry<K,V> e = first; 
               while (e != null && (e.hash != hash || !key.equals(e.key))) 
                   e = e.next; 
 
               V oldValue; 
               if (e != null) {            // 如果键 / 值对以经存在
                   oldValue = e.value; 
                   if (!onlyIfAbsent) 
                       e.value = value;    // 设置 value 值
               } 
               else {                        // 键 / 值对不存在 
                   oldValue = null; 
                   ++modCount;         // 要添加新节点到链表中,所以 modCont 要加 1  
                   // 创建新节点,并添加到链表的头部 
                   tab[index] = new HashEntry<K,V>(key, hash, first, value); 
                   count = c;               // 写 count 变量
               } 
               return oldValue; 
           } finally { 
               unlock();                     // 解锁
           } 
       }

上述的加锁操作是针对某个Segment来进行的,默认ConcurrentHashMap包含16个Segment,也就是其他15个Segment还是可以正常进行读写操作。

小结

在使用锁来协调多线程间并发访问的模式下,减小对锁的竞争可以有效提高并发性。有两种方式可以减小对锁的竞争:

  1. 减小请求同一个锁的频率(下面的第1条)
  2. 减少持有锁的时间。(下面的第2,3条)

ConcurrentHashMap 的高并发性主要来自于三个方面:

  1. 用分离锁实现多个线程间的更深层次的共享访问。
  2. 用 HashEntery 对象的不变性来降低执行读操作的线程在遍历链表期间对加锁的需求。
  3. 通过对同一个 Volatile 变量的写 / 读访问,协调不同线程间读 / 写操作的内存可见性。

ConcurrentHashMap(jdk1.8)源码解读

  1. Node节点
/* ---------------- Nodes -------------- */

    /**
     * Key-value entry.  This class is never exported out as a
     * user-mutable Map.Entry (i.e., one supporting setValue; see
     * MapEntry below), but can be used for read-only traversals used
     * in bulk tasks.  Subclasses of Node with a negative hash field
     * are special, and contain null keys and values (but are never
     * exported).  Otherwise, keys and vals are never null.
     */
    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;

        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }

        public final K getKey()       { return key; }
        public final V getValue()     { return val; }
        public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
        public final String toString(){ return key + "=" + val; }
        public final V setValue(V value) {
            throw new UnsupportedOperationException();
        }

        public final boolean equals(Object o) {
            Object k, v, u; Map.Entry<?,?> e;
            return ((o instanceof Map.Entry) &&
                    (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                    (v = e.getValue()) != null &&
                    (k == key || k.equals(key)) &&
                    (v == (u = val) || v.equals(u)));
        }

        /**
         * Virtualized support for map.get(); overridden in subclasses.
         */
        Node<K,V> find(int h, Object k) {
            Node<K,V> e = this;
            if (k != null) {
                do {
                    K ek;
                    if (e.hash == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                        return e;
                } while ((e = e.next) != null);
            }
            return null;
        }
    }
  • 使用 Node 代替 HashEntry 的原因:便于与TreeNode节点替换
  • 如果锁住整张Node表:Node为单向链表,只需要锁住头节点即可
  1. Table(实际上是Node数组)


    image.png
 /**
     * The array of bins. Lazily initialized upon first insertion.
     * Size is always a power of two. Accessed directly by iterators.
     */
    transient volatile Node<K,V>[] table;

    /**
     * The next table to use; non-null only while resizing.
     */
    private transient volatile Node<K,V>[] nextTable;
  • table数组在首次 put 操作时初始化。
  • nextTable数组只在扩容时才被使用到。
  • nextTable数组是table数组容量的两倍。
  1. TreeNode节点


    image.png
/**
     * Nodes for use in TreeBins
     */
    static final class TreeNode<K,V> extends Node<K,V> {
        TreeNode<K,V> parent;  // red-black tree links
        TreeNode<K,V> left;
        TreeNode<K,V> right;
        TreeNode<K,V> prev;    // needed to unlink next upon deletion
        boolean red;

        TreeNode(int hash, K key, V val, Node<K,V> next,
                 TreeNode<K,V> parent) {
            super(hash, key, val, next);
            this.parent = parent;
        }

        Node<K,V> find(int h, Object k) {
            return findTreeNode(h, k, null);
        }

        /**
         * Returns the TreeNode (or null if not found) for the given key
         * starting at given root.
         */
        final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
            if (k != null) {
                TreeNode<K,V> p = this;
                do  {
                    int ph, dir; K pk; TreeNode<K,V> q;
                    TreeNode<K,V> pl = p.left, pr = p.right;
                    if ((ph = p.hash) > h)
                        p = pl;
                    else if (ph < h)
                        p = pr;
                    else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
                        return p;
                    else if (pl == null)
                        p = pr;
                    else if (pr == null)
                        p = pl;
                    else if ((kc != null ||
                              (kc = comparableClassFor(k)) != null) &&
                             (dir = compareComparables(kc, k, pk)) != 0)
                        p = (dir < 0) ? pl : pr;
                    else if ((q = pr.findTreeNode(h, k, kc)) != null)
                        return q;
                    else
                        p = pl;
                } while (p != null);
            }
            return null;
        }
    }
  • 继承Node节点,又可以与Node节点互换
  • 由于 TreeNode 新增了 prev 指针,又因为继承的 Node 本身是个单向链表,同时拥有 next 和 prev 指针,所以 TreeNode 是一个双向链表。
  • 由于 TreeNode 拥有了 prev 指针,ConcurrentHashMap 使用红黑树的方式删除某一个 TreeNode 时更加方便,这是因为通常我们使用单向链表的方式删除一个节点时,必须从头节点往后遍历,遍历到要删除的节点才可以删除,而使用红黑树的方式删除某个节点时并不按单向链表的方式遍历,而可能是从链表的中间删除某个节点,这个时候直接可以拿到上个链表的指针就可以从任意地方删除节点。
  • TreeNode 作为红黑树节点,自然就可以以红黑树的方式遍历 TreeNode 节点,但我们上面也提到,TreeNode 同时还拥有着一个双向链表的性质,这也就意味着我们在遍历红黑树节点时不仅可以以红黑树的方式去遍历他,还能以链表的方式去遍历他,为什么要这么做呢,下面 TreeBin 部分的内容将会对这个疑问进行解答。
  1. treeBin节点


    image.png
  • TreeBin 是操作 TreeNode 的入口(代理节点) 。这么说的原因是因为 TreeBin 包含有红红黑树的 root 节点以及双向链表的 first 节点,所有对双向链表和红黑树的操作都必须要从 first / root 节点开始,操作时锁住 first / root 节点即可锁住 双向链表 / 红黑树,所以说 TreeBin 是操作 TreeNode 的代理节点 。
  • TreeBin 内部锁 。 TreeBin 继承于 Node 节点,同样也可以使用相同的方式,也就是使用 synchronized 对节点自身进行加锁,由上面的结构可以看出,TreeBin 还额外定义了一个内部锁,用于对红黑树的读写操作进行更细致的控制,以达到性能的提升 (后面再详解) 。

内部锁大致工作方式如下:
<1> 在 put / remove / replace 操作之前,和对 Node 链表的操作一样,均会先锁住 TreeBin 节点;
<2> 然后根据实际情况设置 TreeBin 里面维护的 读 / 写 锁的状态;
<3> get 操作时先判断 TreeBin 自己维护的锁状态,根据锁状态选择用链表还是红黑树的方式遍历节点;

5种构造函数

第一种无参构造函数,内部什么也不做,在第一次插入数据时,默认容量大小为16

 /**
     * Creates a new, empty map with the default initial table size (16).
     */
    public ConcurrentHashMap() {
    }

第二种传入容量,如果传入值大于最大容量,取最大容量;否则(传入容量 + (传入容量无符号右移一位) + 1)的结果取最近的2次幂【举例:传入容量12,(12+12>>>1)+1,取最近的2的幂次方就是32】,取2的幂次方的目的是为了位运算和取模运算的结果一样。

/**
     * Creates a new, empty map with an initial table size
     * accommodating the specified number of elements without the need
     * to dynamically resize.
     *
     * @param initialCapacity The implementation performs internal
     * sizing to accommodate this many elements.
     * @throws IllegalArgumentException if the initial capacity of
     * elements is negative
     */
    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }

/**
     * Returns a power of two table size for the given desired capacity.
     * See Hackers Delight, sec 3.2
     */
    private static final int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }

tableSizeFor方法入参:

  • JDK8 早期版本用的是:tableSizeFor(initialCapacity),
  • JDK1.8.0_162上面的入参版本用的是:tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1))
    这里面肯定是一个优化,既然猜不出来那我们就实际运行起来看看结果对比:
    image.png

    当 initialCapacity = 1 时,[initialCapacity + (initialCapacity >>> 1) + 1] 入参计算出来的容量为 1,而使用后结果为 2。我们都知道一个数组长度 n 为 1 的 Map 集合,只要向其存入一个值就会扩容,而数组长度 n 为 2 的 Map 集合就不会 。
    结论: 该修改避免了频繁的扩容。

tableSizeFor方法主要用于创建 ConcurrentHashMap 时将任意输入的初始值转换为大于或等于该初始值的 2 ^ n 的数。

  • c-1 考虑到c本身就是2^n,如果不先进行减一再继续操作,最后出来的结果将是接近于2^(n+1)
  • n+1的目的是将最终结果变为2^n


    image.png

第四种初始化容量以及负载系数,实际上调用的是第五种构造函数,传入默认的并发数1。

 /**
     * Creates a new, empty map with an initial table size based on
     * the given number of elements ({@code initialCapacity}) and
     * initial table density ({@code loadFactor}).
     *
     * @param initialCapacity the initial capacity. The implementation
     * performs internal sizing to accommodate this many elements,
     * given the specified load factor.
     * @param loadFactor the load factor (table density) for
     * establishing the initial table size
     * @throws IllegalArgumentException if the initial capacity of
     * elements is negative or the load factor is nonpositive
     *
     * @since 1.6
     */
    public ConcurrentHashMap(int initialCapacity, float loadFactor) {
        this(initialCapacity, loadFactor, 1);
    }

第五种与第四种实际调用的是相同的方法,只是多了一个concurrencyLevel代表预估的并发量

  • 如果传入容量小于并发数,则初始容量值取并发数,这样做的目的是每一个线程能分到至少一个Node
  • 如果size=(long)(1.0 + (long)initialCapacity / loadFactor)大于最大容量,取最大容量;否则取将近size的2的幂次方
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;

sizeCtl 用于记录初始容量大小,仅用于记录集合在实际创建时应该使用的大小的作用 。

/**
     * Creates a new, empty map with an initial table size based on
     * the given number of elements ({@code initialCapacity}), table
     * density ({@code loadFactor}), and number of concurrently
     * updating threads ({@code concurrencyLevel}).
     *
     * @param initialCapacity the initial capacity. The implementation
     * performs internal sizing to accommodate this many elements,
     * given the specified load factor.
     * @param loadFactor the load factor (table density) for
     * establishing the initial table size
     * @param concurrencyLevel the estimated number of concurrently
     * updating threads. The implementation may use this value as
     * a sizing hint.
     * @throws IllegalArgumentException if the initial capacity is
     * negative or the load factor or concurrencyLevel are
     * nonpositive
     */
    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins
            initialCapacity = concurrencyLevel;   // as estimated threads
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
        this.sizeCtl = cap;
    }

put方法

对于put操作,如果key对应的数组元素为null,则通过CAS设置为当前值。如果key对应的数组元素不为空,则通过Synchronized关键字申请锁,进行加锁操作。

  1. 判断key,value是否为空,空的话抛出空指针异常。
  2. 调用spread()方法计算key的hashCode()获得哈希地址。
  3. 如果table为空,就去初始化table,需要注意的是这里并没有加synchronized,这是允许多个线程去尝试初始化table,但在初始化函数中使用了CAS来保证只有一个线程能成功。
  4. 使用 容量大小-1 & 哈希地址(相当于%) 计算出待插入键值的下标,如果该下标上的bucket为null,则直接调用实现CAS原子性操作的casTabAt()方法将节点插入到table中,如果插入成功则完成put操作,结束返回。插入失败(被别的线程抢先插入了)则继续往下执行。
  5. 如果该下标上的节点(头节点)的哈希地址为-1,代表需要扩容,该线程执行helpTransfer()方法协助扩容。
  6. 如果4,5都不满足,也就是该下标上的桶不为空,也不需要扩容,则进入此桶,synchronized锁住该下标上的bucket,其他不上锁
  7. 如果是链表,则遍历链表看看是否有哈希地址和键key相同的节点,有的话则根据传入的参数进行覆盖或者不覆盖,没有找到相同的节点的话则将新增的节点插入到链表尾部。如果是红黑树,则将节点插入。到这里结束加锁。
  8. 最后判断该bucket上的链表长度是否大于链表转红黑树的阈值(8),大于则调用treeifyBin()方法将链表转成红黑树,以免链表过长影响效率。
  9. 调用addCount()方法,作用是将ConcurrentHashMap的键值对数量+1,还有另一个作用是检查ConcurrentHashMap是否需要扩容。
/**
     * Maps the specified key to the specified value in this table.
     * Neither the key nor the value can be null.
     *
     * <p>The value can be retrieved by calling the {@code get} method
     * with a key that is equal to the original key.
     *
     * @param key key with which the specified value is to be associated
     * @param value value to be associated with the specified key
     * @return the previous value associated with {@code key}, or
     *         {@code null} if there was no mapping for {@code key}
     * @throws NullPointerException if the specified key or value is null
     */
public V put(K key, V value) {
    return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
    //不允许键值为null,这点与线程安全的Hashtable保持一致,和HashMap不同。
    if (key == null || value == null) throw new NullPointerException();
    //取键key的hashCode()和HashMap、Hashtable都一样,然后再执行spread()方法计算得到哈希地
    //址,这个spread()方法和HashMap的hash()方法一样,都是将hashCode()做无符号右移16位,只不
    //过spread()加多了 &0x7fffffff,让结果为正数。
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        //如果table数组为空或者长度为0(未初始化),则调用initTable()初始化table,初始化函数
        //下面介绍。
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        //调用实现了CAS原子性操作的tabAt方法
        //tabAt方法的第一个参数是Node数组的引用,第二个参数在Node数组的下标,实现的是在Nod
        //e数组中查找指定下标的Node,如果找到则返回该Node节点(链表头节点),否则返回null,
        //这里的i = (n - 1)&hash即是计算待插入的节点在table的下标,即table容量-1的结果和哈
        //希地址做与运算,和HashMap的算法一样。
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            //如果该下标上并没有节点(即链表为空),则直接调用实现了CAS原子性操作的
            //casTable()方法,
            //casTable()方法的第一个参数是Node数组的引用,第二个参数是待操作的下标,第三
            //个参数是期望值,第四个参数是待操作的Node节点,实现的是将Node数组下标为参数二
            //的节点替换成参数四的节点,如果期望值和实际值不符返回false,否则参数四的节点成
            //功替换上去,返回ture,即插入成功。注意这里:如果插入成功了则跳出for循环,插入
            //失败的话(其他线程抢先插入了),那么会执行到下面的代码。
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        //如果该下标上的节点的哈希地址为-1(即链表的头节点为ForwardingNode节点),则表示
        //table需要扩容,值得注意的是ConcurrentHashMap初始化和扩容不是用同一个方法,而
        //HashMap和Hashtable都是用同一个方法,当前线程会去协助扩容,扩容过程后面介绍。
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        //如果该下标上的节点既不是空也不是需要扩容,则表示这个链表可以插入值,将进入到链表
        //中,将新节点插入或者覆盖旧值。
        else {
            V oldVal = null;
            //通过关键字synchroized对该下标上的节点加锁(相当于锁住锁住
            //该下标上的链表),其他下标上的节点并没有加锁,所以其他线程
            //可以安全的获得其他下标上的链表进行操作,也正是因为这个所
            //以提高了ConcurrentHashMap的效率,提高了并发度。
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    //如果该下标上的节点的哈希地址大于等于0,则表示这是
                    //个链表。
                    if (fh >= 0) {
                        binCount = 1;
                        //遍历链表。
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            //如果哈希地址、键key相同 或者 键key不为空
                            //且键key相同,则表示存在键key和待插入的键
                            //key相同,则执行更新值value的操作。
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            //如果找到了链表的最后一个节点都没有找到相
                            //同键Key的,则是插入操作,将插入的键值新建
                            //个节点并且添加到链表尾部,这个和HashMap一
                            //样都是插入到尾部。
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    //如果该下标上的节点的哈希地址小于0 且为树节点
                    //则将带插入键值新增到红黑树
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        //如果插入的结果不为null,则表示为替换
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash,
                        key,value)) != null){
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            //判断链表的长度是否大于等于链表的阈值(8),大于则将链表转成
            //红黑树,treeifyBin实际上大于64才会转化为红黑树,否则只是扩容,提高效率。这点和HashMap一样。
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

spread 方法

spread 方法又称扰动函数,目的在于将 hash 值的 高16位 和 低16位 的特征进行混合,从而尽可能得到一个独特的 hash 值,以减少 hash 冲突。

    static final int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

sizeCtl

多个线程的共享变量,是操作的控制标识符,它的作用不仅包括threshold的作用,在不同的地方有不同的值也有不同的用途。

  • -1代表正在初始化,将 sizeCtl 值设置为 -1 表示集合正在初始化中,其他线程发现该值为 -1 时会让出CPU资源以便初始化操作尽快完成
  • -N代表有N-1个线程正在进行扩容操作
  • 0代表hash表还没有被初始化
  • 正数表示下一次进行扩容的容量大小
U.compareAndSwapInt(this, SIZECTL, sc, -1)

sizeCtl 用于记录当前集合的负载容量值,也就是触发集合扩容的极限值 。

Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//sc设置为0.75 * n,类似HashMap中的loadFactor*Cap
sc = n - (n >>> 2);
sizeCtl = sc;

initTable方法

初始化table(Node数组Node<K,V>[]),如果声明了table的容量,在初始化时会根据参数调整table的大小,确保table的容量是2的幂次方;如果未声明,默认大小为16。

通过源码可以看得出,table的初始化只能由一个线程完成,但是每个线程都可以争抢去初始化table。

  1. 判断table是否为null,即需不需要首次初始化,如果某个线程进到这个方法后,其他线程已经将table初始化好了,那么该线程结束该方法返回。
  2. 如果table为null,进入到while循环,如果sizeCtl小于0(其他线程正在对table初始化),那么该线程调用Thread.yield()挂起该线程,让出CPU时间,该线程也从运行态转成就绪态,等该线程从就绪态转成运行态的时候,别的线程已经table初始化好了,那么该线程结束while循环,结束初始化方法返回。如果从就绪态转成运行态后,table仍然为null,则继续while循环。
  3. 如果table为null且sizeCtl不小于0,则调用实现CAS原子性操作的compareAndSwap()方法将sizeCtl设置成-1,告诉别的线程我正在初始化table,这样别的线程无法对table进行初始化。如果设置成功,则再次判断table是否为空,不为空则初始化table,容量大小为默认的容量大小(16),或者为sizeCtl。其中sizeCtl的初始化是在构造函数中进行的,sizeCtl = ((传入的容量大小 + 传入的容量大小无符号右移1位 + 1)的结果向上取最近的2幂次方)
/**
     * Initializes table, using the size recorded in sizeCtl.
     */
    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            //如果一个线程发现sizeCtl<0,意味着另外的线程执行CAS操作成功,当前线程只需要让出cpu时间片
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

Unsafe一些方法

原子的获取table的i位置的元素

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

CAS机制替换i位置的元素

static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }

addCount扩容方法

put()方法插入成功后,需要调用 addCount(1L, binCount),这个方法目的很明确,就是将整个table的元素加1,但是实现起来比较麻烦。

考虑在多线程环境下,我们要实现一个统计元素的个数,把size设置为volatile,保证可见性,然后用CAS自增。但是如果并发比较大的情况下,会导致size的竞争特别严重。基于此,这里做了优化,把size分到各自的对象中,最后再加起来。

size在此用baseCount表示,分散到对象用CounterCell 表示,对象里的计数用value表示,这里的变量都用volatile修饰。

当需要修改元素数量时,线程会先去 CAS 修改 baseCount 加1,若成功即返回。若失败,则线程被分配到某个 CounterCell ,然后操作 value 加1。若成功,则返回。否则,给当前线程重新分配一个 CounterCell,再尝试给 value 加1。(这里简略的说,实际更复杂)

CounterCell 会组成一个数组,也会涉及到扩容问题。所以,先画一个示意图帮助理解一下。


image.png

线程被分配到CounterCell

/**
     * A padded cell for distributing counts.  Adapted from LongAdder
     * and Striped64.  See their internal docs for explanation.
     */
    @sun.misc.Contended static final class CounterCell {
        volatile long value;
        CounterCell(long x) { value = x; }
    }

用来存储当前线程与随机数的关系

/**
     * Returns the probe value for the current thread without forcing
     * initialization. Note that invoking ThreadLocalRandom.current()
     * can be used to force initialization on zero return.
     */
    static final int getProbe() {
        return UNSAFE.getInt(Thread.currentThread(), PROBE);
    }

addCount方法:

/**
     * Adds to count, and if table is too small and not already
     * resizing, initiates transfer. If already resizing, helps
     * perform transfer if work is available.  Rechecks occupancy
     * after a transfer to see if another resize is already needed
     * because resizings are lagging additions.
     *
     * @param x the count to add
     * @param check if <0, don't check resize, if <= 1 only check if uncontended
     */
    //x为1,check为链上的元素个数
    private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        //两种情况会进入:1. 数组不为空,2. 数组为空,说明竞争线程很少,cas设置baseCount值;设置成功跳到check>=0,进行操作;设置失败,需要为当前线程分配一个格子(指的是CounterCell对象) 
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            //字面意思,无竞争,设置为true
            boolean uncontended = true;
            //这里有三种情况会走fullAddCount,1. 数组为空,2. ThreadLocalRandom.getProbe()为当前线程生成一个随机数(可以理解为hash值)与数组长度取模,当前分配线程所在格子为空,3. 若数组不为空,当前线程所在格子也不为空,则CAS尝试修改此格子对于的值,若设置CELLVALUE值失败,则把 uncontended 值设为 fasle,说明产生了竞争
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                //这个方法的目的是让当前线程一定把 1 加成功
                fullAddCount(x, uncontended);
                return;
            }
            //能走到这,说明数组不为空,且修改 baseCount失败,且线程被分配到的格子不为空,且修改 value 成功。
            if (check <= 1)
                return;
            //计算总共的元素个数
            s = sumCount();
        }
        //这里用于检查是否需要扩容
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            //若元素个数达到扩容阈值,且tab不为空,且tab数组长度小于最大容量
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                //若sc小于0,说明正在扩容
                if (sc < 0) {
                //sc的结构类似这样,1000 0000 0001 1011 0000 0000 0000 0001
                //sc的高16位是数据校验标识,低16位代表当前有几个线程正在帮助扩容,RESIZE_STAMP_SHIFT=16
                //因此判断校验标识是否相等,不相等则退出循环
                //sc == rs + 1,sc == rs + MAX_RESIZERS 这两个应该是用来判断扩容是否已经完成
                //nextTable=null 说明需要扩容的新数组还未创建完成
                //transferIndex这个参数小于等于0,说明已经不需要其它线程帮助扩容了,但是并不说明已经扩容完成,因为有可能还有线程正在迁移元素。
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    //到这里说明当前线程可以帮助扩容,因此sc值加一,代表扩容的线程数加1
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
    //当sc大于0,说明sc代表扩容阈值,因此第一次扩容之前肯定走这个分支,用于初始化新表 nextTable
   //rs<<16
   //1000 0000 0001 1011 0000 0000 0000 0000
   //+2
   //1000 0000 0001 1011 0000 0000 0000 0010
   //这个值,转为十进制就是 -2145714174,用于标识,这是扩容时,初始化新表的状态,
   //扩容时,需要用到这个参数校验是否所有线程都全部帮助扩容完成。
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    //扩容,第二个参数代表新表,传入null,则说明是第一次初始化新表(nextTable)
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }

计算表中的元素总个数

final long sumCount() {
 CounterCell[] as = counterCells; CounterCell a;
 //baseCount,以这个值作为累加基准
 long sum = baseCount;
 if (as != null) {
  //遍历 counterCells 数组,得到每个对象中的value值
  for (int i = 0; i < as.length; ++i) {
   if ((a = as[i]) != null)
    //累加 value 值
    sum += a.value;
  }
 }
 //此时得到的就是元素总个数
 return sum;
} 

扩容时的校验标识

static final int resizeStamp(int n) {
 return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}

//Integer.numberOfLeadingZeros方法的作用是返回 n 的最高位为1的前面的0的个数
//n=16,
0000 0000 0000 0000 0000 0000 0001 0000
//前面有27个0,即27
0000 0000 0000 0000 0000 0000 0001 1011
//RESIZE_STAMP_BITS为16,然后 1<<(16-1),即 1<<15
0000 0000 0000 0000 1000 0000 0000 0000
//它们做或运算,得到 rs 的值
0000 0000 0000 0000 1000 0000 0001 1011

fullAddCount()方法

该方法在前面+1没有成功的情况,这里是保证+1操作一定会成功

 //传过来的参数分别为 1 , false
private final void fullAddCount(long x, boolean wasUncontended) {
 int h;
 //如果当前线程的随机数为0,则强制初始化一个值
 if ((h = ThreadLocalRandom.getProbe()) == 0) {
  ThreadLocalRandom.localInit();      // force initialization
  h = ThreadLocalRandom.getProbe();
  //此时把 wasUncontended 设为true,认为无竞争
  wasUncontended = true;
 }
 //用来表示比 contend(竞争)更严重的碰撞,若为true,表示可能需要扩容,以减少碰撞冲突
 boolean collide = false;                // True if last slot nonempty
 //循环内,外层if判断分三种情况,内层判断又分为六种情况
 for (;;) {
  CounterCell[] as; CounterCell a; int n; long v;
  //1. 若counterCells数组不为空。  建议先看下边的2和3两种情况,再回头看这个。 
  if ((as = counterCells) != null && (n = as.length) > 0) {
   // (1) 若当前线程所在的格子(CounterCell对象)为空
   if ((a = as[(n - 1) & h]) == null) {
    if (cellsBusy == 0) {    
     //若无锁,则乐观的创建一个 CounterCell 对象。
     CounterCell r = new CounterCell(x); 
     //尝试加锁
     if (cellsBusy == 0 &&
      U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
      boolean created = false;
      //加锁成功后,再 recheck 一下数组是否不为空,且当前格子为空
      try {               
       CounterCell[] rs; int m, j;
       if ((rs = counterCells) != null &&
        (m = rs.length) > 0 &&
        rs[j = (m - 1) & h] == null) {
        //把新创建的对象赋值给当前格子
        rs[j] = r;
        created = true;
       }
      } finally {
       //手动释放锁
       cellsBusy = 0;
      }
      //若当前格子创建成功,且上边的赋值成功,则说明加1成功,退出循环
      if (created)
       break;
      //否则,继续下次循环
      continue;           // Slot is now non-empty
     }
    }
    //若cellsBusy=1,说明有其它线程抢锁成功。或者若抢锁的 CAS 操作失败,都会走到这里,
    //则当前线程需跳转到(9)重新生成随机数,进行下次循环判断。
    collide = false;
   }
   /**
   *后边这几种情况,都是数组和当前随机到的格子都不为空的情况。
   *且注意每种情况,若执行成功,且不break,continue,则都会执行(9),重新生成随机数,进入下次循环判断
   */
   // (2) 到这,说明当前方法在被调用之前已经 CAS 失败过一次,若不明白可回头看下 addCount 方法,
   //为了减少竞争,则跳转到⑨处重新生成随机数,并把 wasUncontended 设置为true ,认为下一次不会产生竞争
   else if (!wasUncontended)       // CAS already known to fail
    wasUncontended = true;      // Continue after rehash
   // (3) 若 wasUncontended 为 true 无竞争,则尝试一次 CAS。若成功,则结束循环,若失败则判断后边的 (4)(5)(6)。
   else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
    break;
   // (4) 结合 (6) 一起看,(4)(5)(6)都是 wasUncontended=true,且CAS修改value失败的情况。
   //若数组有变化,或者数组长度大于等于当前CPU的核心数,则把 collide 改为 false
   //因为数组若有变化,说明是由扩容引起的;长度超限,则说明已经无法扩容,只能认为无碰撞。
   //这里很有意思,认真思考一下,当扩容超限后,则会达到一个平衡,即 (4)(5) 反复执行,直到 (3) 中CAS成功,跳出循环。
   else if (counterCells != as || n >= NCPU)
    collide = false;            // At max size or stale
   // (5) 若数组无变化,且数组长度小于CPU核心数时,且 collide 为 false,就把它改为 true,说明下次循环可能需要扩容
   else if (!collide)
    collide = true;
   // (6) 若数组无变化,且数组长度小于CPU核心数时,且 collide 为 true,说明冲突比较严重,需要扩容了。
   else if (cellsBusy == 0 &&
      U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
    try {
     //recheck
     if (counterCells == as) {// Expand table unless stale
      //创建一个容量为原来两倍的数组
      CounterCell[] rs = new CounterCell[n << 1];
      //转移旧数组的值
      for (int i = 0; i < n; ++i)
       rs[i] = as[i];
      //更新数组
      counterCells = rs;
     }
    } finally {
     cellsBusy = 0;
    }
    //认为扩容后,下次不会产生冲突了,和(4)处逻辑照应
    collide = false;
    //当次扩容后,就不需要重新生成随机数了
    continue;                   // Retry with expanded table
   }
   // (9),重新生成一个随机数,进行下一次循环判断
   h = ThreadLocalRandom.advanceProbe(h);
  }
  //2.这里的 cellsBusy 参数非常有意思,是一个volatile的 int值,用来表示自旋锁的标志,
  //可以类比 AQS 中的 state 参数,用来控制锁之间的竞争,并且是独占模式。简化版的AQS。
  //cellsBusy 若为0,说明无锁,线程都可以抢锁,若为1,表示已经有线程拿到了锁,则其它线程不能抢锁。
  else if (cellsBusy == 0 && counterCells == as &&
     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
   boolean init = false;
   try {    
    //这里再重新检测下 counterCells 数组引用是否有变化
    if (counterCells == as) {
     //初始化一个长度为 2 的数组
     CounterCell[] rs = new CounterCell[2];
     //根据当前线程的随机数值,计算下标,只有两个结果 0 或 1,并初始化对象
     rs[h & 1] = new CounterCell(x);
     //更新数组引用
     counterCells = rs;
     //初始化成功的标志
     init = true;
    }
   } finally {
    //别忘了,需要手动解锁。
    cellsBusy = 0;
   }
   //若初始化成功,则说明当前加1的操作也已经完成了,则退出整个循环。
   if (init)
    break;
  }
  //3.到这,说明数组为空,且 2 抢锁失败,则尝试直接去修改 baseCount 的值,
  //若成功,也说明加1操作成功,则退出循环。
  else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
   break;                          // Fall back on using base
 }
}

transfer()方法

虽然我们一直在说帮助扩容,其实更准确的说应该是帮助迁移元素。因为扩容的第一次初始化新表(扩容后的新表)这个动作,只能由一个线程完成。其他线程都是在帮助迁移元素到新数组。
分为两个步骤完成:

  1. 构建一个nextTable,其大小为原来大小的两倍,这个步骤是在单线程环境下完成的
  2. 将原来table里面的内容复制到nextTable中,这个步骤是允许多线程操作的,所以性能得到提升,减少了扩容的时间消耗。
image.png

为了方便,上边以原数组长度 8 为例。在元素迁移的时候,所有线程都遵循从后向前推进的规则,即如图A线程是第一个进来的线程,会从下标为7的位置,开始迁移数据。

而且当前线程迁移时会确定一个范围,限定它此次迁移的数据范围,如图 A 线程只能迁移 bound=6到 i=7 这两个数据。

此时,其它线程就不能迁移这部分数据了,只能继续向前推进,寻找其它可以迁移的数据范围,且每次推进的步长为固定值 stride(此处假设为2)。如图中 B线程发现 A 线程正在迁移6,7的数据,因此只能向前寻找,然后迁移 bound=4 到 i=5 的这两个数据。

当每个线程迁移完成它的范围内数据时,都会继续向前推进。那什么时候是个头呢?

这就需要维护一个全局的变量 transferIndex,来表示所有线程总共推进到的元素下标位置。如图,线程 A 第一次迁移成功后又向前推进,然后迁移2,3 的数据。此时,若没有其他线程在帮助迁移,则 transferIndex 即为2。

剩余部分等待下一个线程来迁移,或者有任何的 A 和B线程已经迁移完成,也可以推进到这里帮助迁移。直到 transferIndex=0 。

//协助扩容方法
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
 Node<K,V>[] nextTab; int sc;
 //头结点为 ForwardingNode ,并且新数组已经初始化
 if (tab != null && (f instanceof ForwardingNode) &&
  (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
  int rs = resizeStamp(tab.length);
  while (nextTab == nextTable && table == tab &&
      (sc = sizeCtl) < 0) {
   //若校验标识失败,或者已经扩容完成,或推进下标到头,则退出
   if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
    sc == rs + MAX_RESIZERS || transferIndex <= 0)
    break;
   //当前线程需要帮助迁移,sc值加1
   if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
    transfer(tab, nextTab);
    break;
   }
  }
  return nextTab;
 }
 return table;
}
//这个类是一个标志,用来代表当前桶(数组中的某个下标位置)的元素已经全部迁移完成
static final class ForwardingNode<K,V> extends Node<K,V> {
 final Node<K,V>[] nextTable;
 ForwardingNode(Node<K,V>[] tab) {
  //把当前桶的头结点的 hash 值设置为 -1,表明已经迁移完成,
  //这个节点中并不存储有效的数据
  super(MOVED, null, null, null);
  this.nextTable = tab;
 }
}

//扩容方法(迁移数据)
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
 int n = tab.length, stride;
 //根据当前CPU核心数,确定每次推进的步长,最小值为16.(为了方便我们以2为例)
 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
  stride = MIN_TRANSFER_STRIDE; // subdivide range
 //从 addCount 方法,只会有一个线程跳转到这里,初始化新数组
 if (nextTab == null) {            // initiating
  try {
   @SuppressWarnings("unchecked")
   //新数组长度为原数组的两倍
   Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
   nextTab = nt;
  } catch (Throwable ex) {      // try to cope with OOME
   sizeCtl = Integer.MAX_VALUE;
   return;
  }
  //用 nextTable 指代新数组
  nextTable = nextTab;
  //这里就把推进的下标值初始化为原数组长度(以16为例)
  transferIndex = n;
 }
 //新数组长度
 int nextn = nextTab.length;
 //创建一个标志类
 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
 //是否向前推进的标志
 boolean advance = true;
 //是否所有线程都全部迁移完成的标志
 boolean finishing = false; // to ensure sweep before committing nextTab
 //i 代表当前线程正在迁移的桶的下标,bound代表它本次可以迁移的范围下限
 for (int i = 0, bound = 0;;) {
  Node<K,V> f; int fh;
  //需要向前推进
  while (advance) {
   int nextIndex, nextBound;
   // (1) 先看 (3) 。i每次自减 1,直到 bound。若超过bound范围,或者finishing标志为true,则不用向前推进。
   //若未全部完成迁移,且 i 并未走到 bound,则跳转到 (7),处理当前桶的元素迁移。
   if (--i >= bound || finishing)
    advance = false;
   // (2) 每次执行,都会把 transferIndex 最新的值同步给 nextIndex
   //若 transferIndex小于等于0,则说明原数组中的每个桶位置,都有线程在处理迁移了,
   //于是,需要跳出while循环,并把 i设为 -1,以跳转到④判断在处理的线程是否已经全部完成。
   else if ((nextIndex = transferIndex) <= 0) {
    i = -1;
    advance = false;
   }
   // (3) 第一个线程会先走到这里,确定它的数据迁移范围。(2)处会更新 nextIndex为 transferIndex 的最新值
   //因此第一次 nextIndex=n=16,nextBound代表当次迁移的数据范围下限,减去步长即可,
   //所以,第一次时,nextIndex=16,nextBound=16-2=14。后续,每次都会间隔一个步长。
   else if (U.compareAndSwapInt
      (this, TRANSFERINDEX, nextIndex,
       nextBound = (nextIndex > stride ?
           nextIndex - stride : 0))) {
    //bound代表当次数据迁移下限
    bound = nextBound;
    //第一次的i为15,因为长度16的数组,最后一个元素的下标为15
    i = nextIndex - 1;
    //表明不需要向前推进,只有当把当前范围内的数据全部迁移完成后,才可以向前推进
    advance = false;
   }
  }
  // (4)
  if (i < 0 || i >= n || i + n >= nextn) {
   int sc;
   //若全部线程迁移完成
   if (finishing) {
    nextTable = null;
    //更新table为新表
    table = nextTab;
    //扩容阈值改为原来数组长度的 3/2 ,即新长度的 3/4,也就是新数组长度的0.75倍
    sizeCtl = (n << 1) - (n >>> 1);
    return;
   }
   //到这,说明当前线程已经完成了自己的所有迁移(无论参与了几次迁移),
   //则把 sc 减1,表明参与扩容的线程数减少 1。
   if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
    //在 addCount 方法最后,我们强调,迁移开始时,会设置 sc=(rs << RESIZE_STAMP_SHIFT) + 2
    //每当有一个线程参与迁移,sc 就会加 1,每当有一个线程完成迁移,sc 就会减 1。
    //因此,这里就是去校验当前 sc 是否和初始值是否相等。相等,则说明全部线程迁移完成。
    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
     return;
    //只有此处,才会把finishing 设置为true。
    finishing = advance = true;
    //这里非常有意思,会把 i 从 -1 修改为16,
    //目的就是,让 i 再从后向前扫描一遍数组,检查是否所有的桶都已被迁移完成,参看 (6)
    i = n; // recheck before commit
   }
  }
  // (5) 若i的位置元素为空,则说明当前桶的元素已经被迁移完成,就把头结点设置为fwd标志。
  else if ((f = tabAt(tab, i)) == null)
   advance = casTabAt(tab, i, null, fwd);
  // (6) 若当前桶的头结点是 ForwardingNode ,说明迁移完成,则向前推进 
  else if ((fh = f.hash) == MOVED)
   advance = true; // already processed
  //(7) 处理当前桶的数据迁移。
  else {
   synchronized (f) {  //给头结点加锁
    if (tabAt(tab, i) == f) {
     Node<K,V> ln, hn;
     //若hash值大于等于0,则说明是普通链表节点
     if (fh >= 0) {
      int runBit = fh & n;
      //这里是 1.7 的 CHM 的 rehash 方法和 1.8 HashMap的 resize 方法的结合体。
      //会分成两条链表,一条链表和原来的下标相同,另一条链表是原来的下标加数组长度的位置
      //然后找到 lastRun 节点,从它到尾结点整体迁移。
      //lastRun前边的节点则单个迁移,但是需要注意的是,这里是头插法。
      //另外还有一点和1.7不同,1.7 lastRun前边的节点是复制过去的,而这里是直接迁移的,没有复制操作。
      //所以,最后会有两条链表,一条链表从 lastRun到尾结点是正序的,而lastRun之前的元素是倒序的,
      //另外一条链表,从头结点开始就是倒叙的。看下图。
      Node<K,V> lastRun = f;
      for (Node<K,V> p = f.next; p != null; p = p.next) {
       int b = p.hash & n;
       if (b != runBit) {
        runBit = b;
        lastRun = p;
       }
      }
      if (runBit == 0) {
       ln = lastRun;
       hn = null;
      }
      else {
       hn = lastRun;
       ln = null;
      }
      for (Node<K,V> p = f; p != lastRun; p = p.next) {
       int ph = p.hash; K pk = p.key; V pv = p.val;
       if ((ph & n) == 0)
        ln = new Node<K,V>(ph, pk, pv, ln);
       else
        hn = new Node<K,V>(ph, pk, pv, hn);
      }
      setTabAt(nextTab, i, ln);
      setTabAt(nextTab, i + n, hn);
      setTabAt(tab, i, fwd);
      advance = true;
     }
     //树节点
     else if (f instanceof TreeBin) {
      TreeBin<K,V> t = (TreeBin<K,V>)f;
      TreeNode<K,V> lo = null, loTail = null;
      TreeNode<K,V> hi = null, hiTail = null;
      int lc = 0, hc = 0;
      for (Node<K,V> e = t.first; e != null; e = e.next) {
       int h = e.hash;
       TreeNode<K,V> p = new TreeNode<K,V>
        (h, e.key, e.val, null, null);
       if ((h & n) == 0) {
        if ((p.prev = loTail) == null)
         lo = p;
        else
         loTail.next = p;
        loTail = p;
        ++lc;
       }
       else {
        if ((p.prev = hiTail) == null)
         hi = p;
        else
         hiTail.next = p;
        hiTail = p;
        ++hc;
       }
      }
      ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
       (hc != 0) ? new TreeBin<K,V>(lo) : t;
      hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
       (lc != 0) ? new TreeBin<K,V>(hi) : t;
      setTabAt(nextTab, i, ln);
      setTabAt(nextTab, i + n, hn);
      setTabAt(tab, i, fwd);
      advance = true;
     }
    }
   }
  }
 }
}

以上就是transfer函数的代码分析,口述下过程就是:

​ 1、先获取当前table的长度记为n,再根据CPU个数给线程分配步长,最小处理步长为16

​ 2、如果传递过来的nextTab为null,则说明是第一个扩容线程,创建一个2n大小的nextTab,赋值给全局变量nextTable,并将n值赋值给全局变量transferIndex

​ 3、使用nextn记录新表长度(2n),创建一个ForwardingNode节点,原table每一个槽位转移完成都会暂时将槽位节点设置成ForwardingNode节点。

​ 4、定义两个boolean变量advance和finishing,一个控制while循环调整索引,一个标志扩容是否结束

​ 5、进入for循环开始扩容,首先进入advance控制的while循环对索引边界进行设置

​ 6、紧接着if判断索引i是否满足一些条件,若满足进行结束判断

​ 7、若(6)不满足的情况下,将原tab的 i 位置的节点赋值给f,判断f是否为空,若为空的话,使用CAS机制将i位置的节点设为ForwardingNode,并置advance为CAS设置节点的结果。

​ 8、若(7)不满足的情况下,将f节点的hash值赋值给fh变量,若fh==MOVED(-1),说明i位置节点已经在扩容过程中转移完成,将advance设为true,方便下一次循环执行

​ 9、若(8)不满足的情况下,说明f节点为转移,则使用synchronized先对f节点加锁,然后判断i位置的节点是否还是f节点,若是则进入 f 节点扩容

默认容量16,sizeCtl(阈值)为12,添加第12个元素时候的流程图:


image.png

迁移后的新数组链表方向示意图,以 runBit =0 为例。


image.png

小结

  1. size() 方法
    通过sumCount()方法获取addCount维护的CounterCell数组累加值,最后再加上baseCount
  2. 扩容
    当键值对数量大于等于sizeCtl,单线程创建新哈希表,多线程复制bucket到新哈希表,容量扩容到原来的2倍

get方法

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    //运用键key的hashCode()计算出哈希地址
    int h = spread(key.hashCode());
    //如果table不为空 且 table长度大于0 且 计算出的下标上bucket不为空,
    //则代表这个bucket存在,进入到bucket中查找,
    //其中(n - 1) & h为计算出键key相对应的数组下标的算法。
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        //如果哈希地址、键key相同则表示查找到,返回value,这里查找到的是头节点。
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        //如果bucket头节点的哈希地址小于0,则代表bucket为红黑树,在红黑树中查找。
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        //如果bucket头节点的哈希地址不小于0,则代表bucket为链表,遍历链表,在链表中查找。
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

番外话题

求证: h & (n - 1) = h % n

我们普遍的一个认知是在获取HashMap集合上数组某个位置的值时可以通过取模的方式去定位到该位置,但是ConcurrentHashMap 在这里使用的却不是常规的取模操作,但最后的计算结果是一样的 。原因在于这里面的 % 运算比 & 运算效率要低,通过这种位运算能够提高定位效率,所以就引出了标题的这个问题,为什么这两种做法的结果会是一样的,我们来探讨一下:


image.png

n – 1 也就是 2的幂次方数 -1 的数 ,而 2的幂次方数 - 1 转化成二进制数后就是上图中的后面若干位数值全为 1 的数,通过这个后几位全为 1 的数,正好可以把不足 2的幂次方数的数值(余数) 很巧妙的取(框)出来 。这也是为什么HashMap之类的集合的数组长度为什么取 2的幂次方数的原因之一,因为 & 运算比 % 运算性能要高 。

ConcurrentHashMap(jdk1.8)里的各找位运算的奇妙之处可参考:https://blog.csdn.net/ZOKEKAI/article/details/90085517
ConcurrentHashMap(jdk1.8)源码详解参考:
https://my.oschina.net/u/3944438/blog/3219586
https://zhuanlan.zhihu.com/p/133923068
https://cloud.tencent.com/developer/article/1448721

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,658评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,482评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,213评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,395评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,487评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,523评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,525评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,300评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,753评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,048评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,223评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,905评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,541评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,168评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,417评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,094评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,088评论 2 352