散列表(Hash table)是字典结构的常用实现,它能够在插入和根据Key查询数据时都保持O(1)的时间复杂度。大部分语言中都有散列表的默认实现,比如Java中的HashMap
和Go中的map。基于大部分使用场景的性能考虑,这些实现都不是并发安全的。为了在多线程下安全的使用map,大部分基础库都额外提供了线程安全的map实现,我们从源码层面看下这些库的实现原理。
所有读写串行执行
既然并发读写同一个Map不是线程安全的,那么最简单的方式就是把所有的操作都串行化。Java最早的Map实现Hashtable
就是这么做的,通过将所有方法访问标识都设置成synchronized
,保证了同一时间只有一个线程在访问Map实例,从而达到线程安全的目的。我们可以看下代码:
public synchronized V get(Object key) {
Entry<?,?> tab[] = table;
int hash = key.hashCode();
...
}
public synchronized V put(K key, V value) {
...
}
public synchronized int size() {
return count;
}
存在问题
这种实现存在的问题就是,所有操作都是串行的,在没有修改操作只是并发读取的情况下也要串行执行,大大影响并发速度。
普通字典加读写锁
既然前一种实现中读写会相互影响,那自然就想到可以将读写分离,让读取操作可以并发执行,读写和写写之间串行。java中我们可以使用普通的HashMap
和ReadWriteLock
来达到这个目的。我们看下示例代码:
public class ConcurrentMap<K, V> {
private ReadWriteLock lock;
private Map<K, V> dataMap;
public ConcurrentMap() {
dataMap = new HashMap<>();
lock = new ReentrantReadWriteLock();
}
public V get(K key) {
lock.readLock().lock();
try{
return dataMap.get(key);
}finally {
lock.readLock().unlock();
}
}
public void put(K key, V value) {
lock.writeLock().lock();
try{
dataMap.put(key, value);
}finally {
lock.writeLock().unlock();
}
}
public int size() {
lock.readLock().lock();
try{
return dataMap.size();
}finally {
lock.readLock().unlock();
}
}
}
使用JDK中提供的读写锁,可以达到两个目的。首先,在修改数据的时候通过获取写锁,可以阻塞其它线程的读写操作,不会造成并发写; 同时在读取数据的时候只要没有写锁存在,而读锁不会互相阻塞,就可以实现并发读。再次,读写锁还可以保证dataMap
中数据的内存可见性。
上面这种实现方式在写少读多的情况下会大大提高并发执行效率 (可以使用JDK1.8中新提供的StampedLock
代替ReadWriteLock
对上面的代码性能做进一步优化)。
存在问题
这种方式减少了读写冲突,但是因为写锁只有一个,对于多个并发中有修改数据的操作,仍然需要串行执行。而且读取的时候还是要先尝试获取一下读锁,总归还是有影响的。
读写字典分开
Go语言的并发安全的Map实现中,使用了另外一种读写分离的方案。即在一个ConcurrentMap
的底层使用一个普通的ReadMap和一个普通的dirtyMap来存储数据。两个map中key有可能是重复的,但是同一个key对应的value通过指针的方式指到同一个内存地址。
新增数据时,直接写到dirtyMap中,并且修改readMap的标记位,标示dirtyMap中存在readMap没有的数据。在达到一定条件后,使用dirtyMap的数据覆盖readMap。在读取数据的时候,先从readMap中不加锁读取,如果找到value或者没找到value但是标记位表示dirtyMap中没有新数据,则直接返回。否则对dirtyMap加锁读取。
首先来看下Get实现:
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
//1. 使用原子操作加载read map
read, _ := m.read.Load().(readOnly)
//2. 从read map中查询key
e, ok := read.m[key]
//3. 如果没找到并且dirty map中有新的key
if !ok && read.amended {
//4. 获取锁
m.mu.Lock()
//5. 获取锁之后从对read map做Double Check
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
//6. 还是没找到
if !ok && read.amended {
//7. 检查dirty map中key存不存在
e, ok = m.dirty[key]
//8. 每次从dirty map尝试读取都会增加miss计数
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
//9. 返回结果,这里用的原子Load操作
return e.load()
}
从上面的代码中可以看到,使用了两种手段保证并发安全。首先,每次都使用原子操作加载read map,这样可以保证内存可见性;再次,如果需要从dirty map中获取值,则需要加锁,并且在加锁成功后再次检查read map有没有变化。这里通过read.amended属性来减少获取锁的次数,如果map内数据一直没有变化,amended等于false,就不会进入加锁检查读取dirty map的逻辑。
第8步中的missLocked()方法会增加miss计数,当计数超过阈值时,会使用dirty map覆盖read map,这样dirty map中如果有新的key,一段时间的加锁读取之后就会转换为直接读取。
下面再来看Put的逻辑:
func (m *Map) Store(key, value interface{}) {
//1. 通过原子操作加载read map,如果key存在通过CAS覆盖value
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
//2. 获取锁
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
//3. 获取锁成功后对 read map做double check
if e.unexpungeLocked() {
m.dirty[key] = e
}
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
//4. 如果dirty map中存在,更新
e.storeLocked(&value)
} else {
//5. 新的key插入到dirty map中
if !read.amended {
//6. 更新read.amended属性为true, 表示dirty map中增加了新的key
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}
在Put操作的时候,首先是判断read map中key是否存在,存在则更新。这里使用的是CAS循环更新保证第3,4行之间即使有其他线程改了key的值也不会有问题。
如果read map中不存在,则加锁更新或者新增到dirty map中。
存在问题
Go中使用的方法跟前面使用读写锁的办法相比,通过使用原子load和store方法,减少写锁阻塞范围。同时如果key存在的情况下,更新操作无需加锁,性能也会有提升。但是对于新的key写入的情况,仍然需要加锁串行执行。而且因为读写数据分离,需要做数据定期copy,所以这个实现仅适合写少读多或者多线程更新不同的key的场景。
使用分段锁
上面讲的读写锁和Go中的实现方案最大的问题是只要有任何一个线程获取到写锁,其他写线程只能等待,无论多个线程操作的数据是否有关系。所以,JDK1.8之前的实现类ConcurrentHashMap
采用分段锁来做优化。
首先来看下数据结构:
每个Map中包含多个Segment
,每个Segment
中包含一个类似HashMap
的实现。每个key先在计算出hashCode后,首先定位属于哪个segment, 然后再在Segment包含的table中定位属于哪个Slot。
当执行Get操作时,利用final和volatile变量的内存可见性定义,无需加锁从内存中直接读取。执行Put操作时,首先定位Segment
,然后通过轮询获取锁。这样锁定的范围仅限于当前的Segment
,不同Segment
的更新操作可以并发执行。
Segment定义
static final class Segment<K,V> extends ReentrantLock implements Serializable {
transient volatile HashEntry<K,V>[] table;
}
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
}
Segment是直接继承了ReentrantLock
,这样可以少定义一个锁对象。同时通过将包含的HashEntry
table用volatile修饰来保证并发读取的时候的内存可见性。HashEntry
的hash code和key属性都是final的,保证初始化后所有线程看到的值是不会变的,value和next也都是volatile的。
Get方法实现
public V get(Object key) {
Segment<K,V> s;
HashEntry<K,V>[] tab;
//1. 获取key的hashcode
int h = hash(key);
//2. 计算Segment的index
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
//3. 获取key所属的Segment和包含的Hash table
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
//4. 遍历key所在slot的List, 获取value的值
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
Put操作实现
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject
(segments, (j << SSHIFT) + SBASE)) == null)
//如果Segment尚未初始化,则使用CAS方式初始化Segment
s = ensureSegment(j);
//调用Segment的put方法
return s.put(key, hash, value, false);
}
Put操作除了跟Get操作一样,首先获取Segment
之外,如果Segment
尚未初始化,会采用CAS加循环的方式初始化Segment
,仍然不需要使用锁。然后调用Segment
的put方法。
Segment.put()
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//1. 获取锁
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
//2. 计算key所在的slot
int index = (tab.length - 1) & hash;
//3. 获取slot链表的第一个元素
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
//4. 如果同一个slot存在HashEntry, 则更新或者一直走到list的末尾
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
//5. Key已存在,更新
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount; //记录更新次数
}
break;
}
e = e.next;
}
else {
//6. 如果slot为空,则当前Entry作为第一个元素
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node); //判断是否需要做rehash
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
在Segment
的put方法第一步就先尝试获取锁,这里面的scanAndLockForPut()
方法会循环调用tryLock()
,如果超过指定的次数仍然没有获取到,则直接调用lock()方法等待其它线程释放锁。
存在的问题
JDK1.8之前的实现中,通过将key分到不同的Segment
来减小锁的力度,只有属于同一个Segment
的Key在更新时才会互相阻塞,大大降低了并发冲突的可能。但是Lock的实现原理决定了Segment
的数目是有数量限制的,过多会导致锁轮询获取时,耗费大量的CPU时间。但是如果过少,又会造成单个Segment
中元素过多,锁的粒度会变大。
JDK1.8锁定策略
在JDK1.8中对ConcurrentHashMap
做了重新实现,取消了Segment
逻辑,而更像是普通的HashMap
,并针对每个Slot加了一个同步锁。
数据结构如下:
对于hash table每个slot中链表超过一定长度,则转化为红黑树,加快查找速度。同时对Node的属性定义也使用final和volatile修饰保证内存可见性。
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {
transient volatile Node<K,V>[] table;
...
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
}
}
对于key的查找操作跟之前区别不大,只是少了Segment
定位这一步,直接定位到具体的Slot。重点来看一下Put操作:
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable(); //hash table延迟初始化
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//1. 如果Slot是空的,直接通过CAS操作将当前值作为第一个元素,无需lock
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
//2. 正在做rehash
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//3. 同步锁锁定Slot的第一个Node
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
// 链表存储
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
//红黑树存储
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
//4. 是否需要转成红黑树
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//5. 增加计数,用于size()返回
addCount(1L, binCount);
return null;
}
Put操作中首先会检查Slot是否为空,如果为空,则使用CAS加循环将当前元素设置为第一个元素,无需加锁。如果Slot中已经存在元素,使用同步锁锁定Slot中第一个元素,做插入或者更新操作。
从上面的锁定逻辑可以看到,同一个slot中的元素更新时才会互相阻塞,而且新版本中不再使用ReentrantLock
,相对于之前版本锁定粒度更小。那之前版本锁过多的问题如何解决呢?这个主要是在1.8 Java虚拟机对于同步锁做了较大的升级,引入了偏向锁->轻量级锁->重量级锁的升级逻辑,显然JDK开发者经过测试同步锁不足以对性能产生较大影响。
实际使用案例
通过对以上并发安全的Hash表的分析,对我们在解决实际项目中的并发问题也有一定的启发。下面举一个分段锁的使用案例。
在分布式系统中,服务之间经常使用消息队列来解耦,比如电商系统中,一笔订单的状态变化可能需要推送消息给物流系统、ERP系统、CRM系统等,而在接收端Consumer通常是多线程处理消息的,大致架构如下:
在特定业务场景下,需要同一笔订单的消息不能并发处理。最好的解决办法,将属于同一笔订单的消息使用同一个线程来处理,但是对于使用线程池的场景,这个要求是很难满足的。所以自然想到用互斥锁来实现,我们可以借鉴ConcurrentHashMap
中的分段锁逻辑,比如使用8个Lock,每个订单消息收到的时候,将订单id对8取模,然后获取相应的Lock。
大致的代码逻辑如下:
public class OrderMessageListener {
private Lock[] locks = new ReentrantLock[8];
public void process(message Message) {
int index = message.getOrderID() % locks.length;
locks[index].lock();
try{
// process message
}finally {
locks[index].unlock();
}
}
}
以上逻辑只能处理单个consumer进程的情况
总结
从以上实现方案可以看到,高并发场景下需要保证线程安全又能保持高吞吐,优先考虑是否可以读操作不使用互斥锁,而选择用原子操作或者volatile变量控制内存可见性。对于写操作的锁,需要尽量减小锁粒度,并尽量快的释放锁。