背景:
上一篇文章,我们讲了hashMap, 也讲了hashMap为什么不安全
高并发的情况下如果想要使用, 一是使用 HashTable,二是使用 Collections.syncronizedMap。 但是这两种方法的性能都比较差, 需要在执行读写的时候对整个集合加锁, 导致多个线程无法同时写集合。
在SynchronizedMap内部维护了一个普通对象Map,还有排斥锁mutex, 里面的所有的方法 都加了 synchronized (mutex) 方法块
hashTable的方法都被synchronized 关键字修饰
Hashtable 是不允许键或值为 null 的,HashMap 的键值则都可以为 null。
快速失败(fail—fast)是java集合中的一种机制, 在用迭代器遍历一个集合对象时,如果遍历过程中对集合对象的内容进行了修改(增加、删除、修改),则会抛出Concurrent Modification Exception。
安全失败(fail—safe)大家也可以了解下,java.util.concurrent包下的容器都是安全失败,可以在多线程下并发使用,并发修改。
迭代器在遍历时直接访问集合中的内容,并且在遍历过程中使用一个 modCount 变量。
集合在被遍历期间如果内容发生变化,就会改变modCount的值。
每当迭代器使用hashNext()/next()遍历下一个元素之前,都会检测modCount变量是否为expectedmodCount值,是的话就返回遍历;否则抛出异常,终止遍历。
所以, 高并发的情况下还是需要ConcurrentHashMap
下面会分别对1.7和1.8的ConcurrentHashMap做一个介绍
1.7版本 的 ConcurrentHashMap
1.1个人理解
先说一下个人理解吧, 1.7中间加入了segment的概念,其实换个角度理解可以理解成每个segment都是一个hashMap对象。 然后concurrentHashMap中有2的n次方个segment对象(一般情况下是16), 每个segment都有单独的锁,这样子高并发读写的情况下就会有多个锁。每个锁作用的数据区域不同,然后效率就会提高很多。扩容也是针对每个segment单独扩容。 平常的操作步骤主要是多一个通过hash找到segment的过程
put null 的时候会抛出异常
1.2 介绍
ConcurrentHashMap 中有一个 Segment 的概念。Segment 本身就相当于一个 HashMap 对象。同 HashMap 一样,Segment 包含一个 HashEntry 数组,数组中的每一个 HashEntry 既是一个键值对,也是一个链表的头节点。单一的 Segment 结构如下:
ConcurrentHashMap 集合中有 2 的N次方个 Segment 对象,共同保存在一个名为 segments 的数组当中。因此整个 ConcurrentHashMap 的结构如下:
可以说,ConcurrentHashMap 是一个二级哈希表。在一个总的哈希表下面,有若干个子哈希表。这样的二级结构,和数据库的水平拆分有些相似。
采取了锁分段技术,每一个 Segment 就好比一个自治区,读写操作高度自治,Segment 之间互不影响。
-
Case1:不同 Segment 的并发写入【可以并发执行】
-
Case2:同一 Segment 的一写一读【可以并发执行】
-
Case3:同一 Segment 的并发写入【需要上锁】
由此可见,ConcurrentHashMap 当中每个 Segment 各自持有一把锁。在保证线程安全的同时降低了锁的粒度,让并发操作效率更高。
1.3 get方法
- 为输入的 Key 做 Hash 运算,得到 hash 值。(为了实现Segment均匀分布,进行了两次Hash)
- 通过 hash 值,定位到对应的 Segment 对象
- 再次通过 hash 值,定位到 Segment 当中数组的具体位置。
- hashEntry中的next , 和value 都用了voliate 变量修饰, 保证获取到的值都是最新的
1.4 put方法
- 为输入的 Key 做 Hash 运算,得到 hash 值。
- 通过 hash 值,定位到对应的 Segment 对象
- 获取可重入锁(会先获取, 获取不到自旋, 到达次数, 阻塞获取锁)
- 再次通过 hash 值,定位到 Segment 当中数组的具体位置。
- 插入或覆盖 HashEntry 对象。 头插
- 释放锁。
rehash() 扩容的过程
- 容量阈值翻倍
- e.hash & sizeMask;
- 头插
- 只在当前segement的数组进行扩容
1.5 size()方法
调用 Size() 是统计 ConcurrentHashMap 的总元素数量,需要把各个 Segment 内部的元素数量汇总起来。但是,如果在统计 Segment 元素数量的过程中,已统计过的 Segment 瞬间插入新的元素,这时候该怎么办呢, 如何保证一致性?
实际上concurrentHashMapde size是一个嵌套循环
- 遍历所有的 Segment。
- 把 Segment 的元素数量累加起来。
- 把 Segment 的修改次数累加起来。
- 判断所有 Segment 的总修改次数是否大于上一次的总修改次数。如果大于,说明统计过程中有修改,重新统计,尝试次数+1;如果不是。说明没有修改,统计结束。
- 如果尝试次数超过阈值,则对每一个 Segment 加锁,再重新统计。
- 再次判断所有 Segment 的总修改次数是否大于上一次的总修改次数。由于已经加锁,次数一定和上次相等。
释放锁,统计结束。
这边用了一个乐观锁的思想, 为了尽量不锁住所有的 Segment,首先乐观地假设 Size 过程中不会有修改。当尝试一定次数,才无奈转为悲观锁,锁住所有 Segment 保证强一致性。
1.6 如何保证线程安全的
- 底层采用分段的数组+链表实现
- HashEntry 使用volatile去修饰了他的数据Value还有下一个节点next, 由于 HashEntry 中的 value 属性是用 volatile 关键词修饰的,保证了内存可见性,所以每次获取时都是最新值。
- 执行某些操作的时候. 比如put 会先尝试获取锁, 如果获取失败就会自选获取锁,scanAndLockForPut() 自旋获取锁。, 如果自选获取锁的次数达到上限, 改为阻塞获取, 保证能获取成功. put 方法其实和 Java7 HashMap里大致是一样的,只是多了加锁/解锁两步
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
1.8 中的concurrentHashMap
1.8的特点
其中抛弃了原有的 Segment 分段锁,而采用了** CAS + synchronized** 来保证并发安全性。
跟HashMap很像,也把之前的HashEntry改成了Node,但是作用不变,把值和next采用了volatile去修饰,保证了可见性,并且也引入了红黑树,在链表大于一定值的时候会转换(默认是8)。 扩容是并发扩容
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
/**
* The array of bins. Lazily initialized upon first insertion.
* Size is always a power of two. Accessed directly by iterators.
*/
transient volatile Node<K,V>[] table;
/**
* The next table to use; non-null only while resizing.
*/
private transient volatile Node<K,V>[] nextTable;
PUT操作
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode()); //两次hash,减少hash冲突,可以均匀分布
//binCount=0说明首节点插入,未进行链表或红黑树操作,因为后面会对这个值进行更改
int binCount = 0;
for (Node<K,V>[] tab = table;;) { //对这个table进行迭代
Node<K,V> f; int n, i, fh;
//这里就是上面构造方法没有进行初始化,在这里进行判断,为null就调用initTable进行初始化,属于懒汉模式初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//如果获取位置的节点为空,说明是首节点插入情况
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//如果i位置没有数据,就直接无锁插入
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//如果hash值等于MOVEN(默认-1),说明是协助扩容,与transfer里面
//的ForwardingNode类有关(也就是扩容的时候会把首节点的hash值置为 -1 ,
//然后其他的线程 就知道现在是扩容, 就会协助扩容)
else if ((fh = f.hash) == MOVED)//如果在进行扩容,则先进行扩容操作
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//如果以上条件都不满足,那就要进行加锁操作,也就是存在hash冲突,锁住链表或者红黑树的头结点
// 对桶的首节点加锁
synchronized (f) {
//双重判定,为了防止在当前线程进来之前,i地址所对应对象已经更改
if (tabAt(tab, i) == f) {
//TreeBin类型的hash值默认设置为了-2
if (fh >= 0) { //表示该节点是链表结构
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
//这里涉及到相同的key进行put就会覆盖原先的value
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) { //插入链表尾部
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//如果hash小于0,判断是否是TreeBin的实例
else if (f instanceof TreeBin) {//红黑树结构
Node<K,V> p;
binCount = 2;
//红黑树结构旋转插入
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) { //如果链表的长度大于8时就会进行红黑树的转换
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);//统计size,并且检查是否需要扩容
return null;
会进行hash扩散 (也就是两次hash, 打散)
插入的时候如果要插入的位置上面没有元素,就直接无锁调用Unsafe的方法CAS插入该元素插入,
锁 锁的是数组的头节点
然后会根据头节点的hash值来判断 数组的头结点状态(扩容, 还是树, 还是)
如果以上条件都不满足,那就要进行加锁操作,也就是存在hash冲突,锁住链表或者红黑树的头结点
注意这边也不是全局公用某个锁, 而是锁住数组的第一个元素, 然后利用尾插法 插入, 此外, 如果链表的长度大于等于8的时候 就会进行红黑树的转换
然后addCount , 然后顺便会判断是否需要扩容
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
如果发现正在进行扩容, 就先协助进行扩容操作, (最大的协助扩容线程数是2的16次方-1)
其实helpTransfer()方法的目的就是调用多个工作线程一起帮助进行扩容,这样的效率就会更高,而不是只有检查到要扩容的那个线程进行扩容操作,其他线程就要等待扩容操作完成才能工作
/**
* Helps transfer if a resize is in progress.
* 帮助从旧的table的元素复制到新的table中
*/
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
//条件:原结点不为空,这个结点是协助结点(ForwardingNode)并且不处于最后
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { //新的table nextTba已经存在前提下才能帮助扩容
//sizeCtl是负数,标明还在扩容;tab和现在占用的table不同,说明扩容还在进行当中
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);//调用扩容方法
break;
}
}
return nextTab;
}
return table;
}
扩容的方法
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 每核处理的量小于16,则强制赋值16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; //构建一个nextTable对象,其容量为原来容量的两倍
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
// 连接点指针,用于标志位(fwd的hash值为-1,fwd.nextTable=nextTab)
// ForwardingNode中会保存新数组的引用
// 初始化ForwardingNode节点,其中保存了新数组nextTable的引用,在处理完每个槽位的节点之后当做占位节点,表示该槽位已经处理过了;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 当advance == true时,表明该节点已经处理过了
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
// 通过for自循环处理每个槽位中的链表元素
// i指当前处理的槽位序号,bound指需要处理的槽位边界
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 控制 --i ,遍历原hash表中的节点
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// 用CAS计算得到的transferIndex
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 已经完成所有节点复制了
if (finishing) {
nextTable = null;
table = nextTab; // table 指向nextTable
sizeCtl = (n << 1) - (n >>> 1); // sizeCtl阈值为原来的1.5倍
return; // 跳出死循环,
}
// CAS 更扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操作
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
// 遍历的数组中的节点为null,则放入到ForwardingNode 指针节点
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// f.hash == -1 表示遍历到了ForwardingNode节点,意味着该节点已经处理过了
// 这里是控制并发扩容的核心
// 如果这个槽位已经被线程A处理了,那么线程B处理到这个节点时,取到该节点的hash值应该为MOVED,值为-1,则直接跳过,继续处理下一个槽位的节点
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// 开始处理这个槽位, 先定义两个变量节点ln和hn, 按我的理解应该是lowNode和highNode,分别保存hash值的第X位为0和1的节点,具体实现如下:
// 节点加锁
synchronized (f) {
// 节点复制工作
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// fh >= 0 ,表示为链表节点
if (fh >= 0) {
// 构造两个链表 一个是原链表 另一个是原链表的反序排列
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
// fn&n,可以快速把链表中的元素区分成两类
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 在nextTable i 位置处插上链表
setTabAt(nextTab, i, ln);
// 在nextTable i + n 位置处插上链表
setTabAt(nextTab, i + n, hn);
// 在table i 位置处插上ForwardingNode 表示该节点已经处理过了
setTabAt(tab, i, fwd);
// advance = true 可以执行--i动作,遍历节点
advance = true;
}
// 如果是TreeBin,则按照红黑树进行处理,处理逻辑与上面一致
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 扩容后树节点个数若<=6,将树转链表
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
扩容过程有点复杂,这里主要涉及到多线程并发扩容,ForwardingNode的作用就是支持扩容操作,将已处理的节点和空节点置为ForwardingNode,并发处理时多个线程经过ForwardingNode就表示已经遍历了,就往前遍历,下图是多线程合作扩容的过程:
扩容的条件:
1 数量达到负载因子决定的数量
2 链长度大于等于8的时候,且数组长度没有超过64, 会先进行扩容
参考了一篇别人的文章不错, 也放到这边
假设这样子的一个情况
在上图中,第14个槽位插入新节点之后,链表元素个数已经达到了8,且数组长度为16,优先通过扩容来缓解链表过长的问题,实现如下:
1、根据当前数组长度n,新建一个两倍长度的数组nextTable;
2、初始化ForwardingNode节点,其中保存了新数组nextTable的引用,在处理完每个槽位的节点之后当做占位节点,表示该槽位已经处理过了;
3、通过for自循环处理每个(数组)槽位中的链表元素,默认advace为真,通过CAS设置transferIndex属性值,并初始化i和bound值,i指当前处理的槽位序号,bound指需要处理的槽位边界,先处理槽位15的节点(从数组最后一位向前面处理);
4、在当前假设条件下,槽位15中没有节点,则通过CAS插入在第二步中初始化的ForwardingNode节点,用于告诉其它线程该槽位已经处理过了;
5、如果槽位15已经被线程A处理了,那么线程B处理到这个节点时,取到该节点的hash值应该为MOVED,值为-1,则直接跳过,继续处理下一个槽位14的节点;
6、处理槽位14的节点,是一个链表结构,先定义两个变量节点ln和hn,按我的理解应该是lowNode和highNode,分别保存hash值的第X位为0和1的节点,具体实现如下:
使用fn&n可以快速把链表中的元素区分成两类,A类是hash值的第X位为0,B类是hash值的第X位为1,并通过lastRun记录最后需要处理的节点,A类和B类节点可以分散到新数组的槽位14和30中,在原数组的槽位14中,蓝色节点第X为0,红色节点第X为1,把链表拉平显示如下:
1、通过遍历链表,记录runBit和lastRun,分别为1和节点6,所以设置hn为节点6,ln为null;
2、重新遍历链表,以lastRun节点为终止条件,根据第X位的值分别构造ln链表和hn链表:
ln链:和原来链表相比,顺序已经不一样了(相对顺序是反着的)
hn链:(顺序不变)
通过CAS把ln链表设置到新数组的i位置,hn链表设置到i+n的位置;
7、如果该槽位是红黑树结构,则构造树节点lo和hi,遍历红黑树中的节点,同样根据hash&n算法,把节点分为两类,分别插入到lo和hi为头的链表中,根据lo和hi链表中的元素个数分别生成ln和hn节点,其中ln节点的生成逻辑如下:
(1)如果lo链表的元素个数小于等于UNTREEIFY_THRESHOLD,默认为6,则通过untreeify方法把树节点链表转化成普通节点链表;
(2)否则判断hi链表中的元素个数是否等于0:如果等于0,表示lo链表中包含了所有原始节点,则设置原始红黑树给ln,否则根据lo链表重新构造红黑树。
最后,同样的通过CAS把ln设置到新数组的i位置,hn设置到i+n位置。
操作
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode()); //计算hash,再散列
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {//读取首节点的Node元素
if ((eh = e.hash) == h) { //如果该节点就是首节点就返回
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//hash值为负值表示正在扩容,这个时候查的是ForwardingNode的find方法来定位到nextTable来
//查找,查找到就返回
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {//既不是首节点也不是ForwardingNode,那就往下遍历
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
ConcurrentHashMap的get操作的流程很简单,也很清晰,可以分为三个步骤来描述
计算hash值,定位到该table索引位置,如果是首节点符合就返回
如果遇到扩容的时候,会调用标志正在扩容节点ForwardingNode的find方法,查找该节点,匹配就返回
以上都不符合的话,就往下遍历节点,匹配就返回,否则最后就返回null
总结
JDK1.8取消了segment数组,直接用table保存数据,锁的粒度更小,减少并发冲突的概率。
JDK1.8存储数据时采用了链表+红黑树的形式,纯链表的形式时间复杂度为O(n),红黑树则为O(logn),性能提升很大。什么时候链表转红黑树?当key值相等的元素形成的链表中元素个数大于等于8个的时候(还有一个很重的条件, 数组的长度要大于等于64), 小于等于6的 时候会树转链
最小扩容的长度是16
表数据量在8-64之间时,会优先扩容hash数组;只有表数据量超过64时,内部才会执行树化。
JDK1.8的实现降低锁的粒度,JDK1.7版本锁的粒度是基于Segment的,包含多个HashEntry,而JDK1.8锁的粒度就是HashEntry(首节点)
JDK1.8版本的数据结构变得更加简单,使得操作也更加清晰流畅,因为已经使用synchronized来进行同步,所以不需要分段锁的概念,也就不需要Segment这种数据结构了,由于粒度的降低,实现的复杂度也增加了
JDK1.8使用红黑树来优化链表,基于长度很长的链表的遍历是一个很漫长的过程,而红黑树的遍历效率是很快的,代替一定阈值的链表,这样形成一个最佳拍档
JDK1.8为什么使用内置锁synchronized来代替重入锁ReentrantLock,我觉得有以下几点
因为粒度降低了,在相对而言的低粒度加锁方式,synchronized并不比ReentrantLock差,在粗粒度加锁中ReentrantLock可能通过Condition来控制各个低粒度的边界,更加的灵活,而在低粒度中,Condition的优势就没有了
JVM的开发团队从来都没有放弃synchronized,而且基于JVM的synchronized优化空间更大,使用内嵌的关键字比使用API更加自然
在大量的数据操作下,对于JVM的内存压力,基于API的ReentrantLock会开销更多的内存,虽然不是瓶颈,但是也是一个选择依据