知道了如何解决线程安全问题,接下来就要考虑性能问题了
在并发的优化上,无锁的性能肯定是最好的,但是很多时候我们又不得不加锁,在加锁的方案中,有忙阻塞等待如自旋锁,以及休眠等待,这两种加锁的方式,并不存在哪一种性能更好,需要根据并发的数据进行选择。
无锁
我们已经知道协程和本地线程存储可以实现无锁的多线程,还有其他优化方案吗?有,锁消除和偏向锁。
锁消除
我们可以根据经验和业务分析判断是否会有产生临界区脏数据的可能,如果没有这个可能,则可以消除锁。
业务中的锁我们可以自己消除,但是虚拟机或者Java内部库中的锁我们就没法消除了,什么是Java内部的锁呢?
比如我们常用的StringBuffer.append()函数中就有同步代码块。
内部库的锁可以通过编译时消除,比如JVM的逃逸分析技术,在编译代码的过程中,会判断对象是否逃逸,如果没有逃逸,逃逸指的时对象可能会被其他线程使用到,就说明没有线程安全的可能性,于是会消除锁。
- 痛点:根据代码逃逸技术,如果判断到一段代码中,堆上的数据不会逃逸出当前线程,那么可以认为这段代码是线程安全的,不必要加锁
- 原理: JVM在编译时通过对运行上下文的描述,去除不可能存在共享资源竞争的锁,通过这种方式消除无用锁,即删除不必要的加锁操作,从而节省开销
- 使用: 逃逸分析和锁消除分别可以使用参数-XX:+DoEscapeAnalysis和-XX:+EliminateLocks(锁消除必须在-server模式下)开启
- 补充:在JDK内置的API中,例如StringBuffer、Vector、HashTable都会存在隐性加锁操作,可消除
/**
* 比如执行10000次字符串的拼接
*/
public static void main(String[] args) {
SynchronizedDemo synchronizedDemo = new SynchronizedDemo();
for (int i = 0 ; i < 10000 ; i++){
synchronizedDemo.append("kira","sally");
}
}
public void append(String str1,String str2){
//由于StringBuffer对象被封装在方法内部,不可能存在共享资源竞争的情况
//因此JVM会认为该加锁是无意义的,会在编译期就删除相关的加锁操作
//还有一点特别要注明:明知道不会有线程安全问题,代码阶段就应该使用StringBuilder
//否则在没有开启锁消除的情况下,StringBuffer不会被优化,性能可能只有StringBuilder的1/3
StringBuffer stringBuffer = new StringBuffer();
stringBuffer.append(str1).append(str2);
}
偏向锁
偏向锁并不是不加锁,而是只加一次锁,只要一个线程获得了偏向锁,即使当这个线程退出临界区后,这个锁依然会“偏向”这个线程,当这个线程再次要进入临界区是,就可以直接进入临界区,不需要重新加锁的过程。
这里介绍一下JVM的偏向锁的实现,但我们需要先了解一下Java类的对象头的知识点。当一个Java类会被解析成class对象,并加载在内存中后,这个对象由三部分组成:对象头、实例数据和对齐填充,对象头又由markWord和Klass指针组成。
- Mark Word:对象的 Mark Word 部分占 4 个字节(32位系统),包含一些的标记位,比如轻量级锁、偏向锁标记等等
- Klass Pointer:Class 对象指针占是 4个字节(32位系统),指向的位置是对象对应的 Class 对象的内存地址
- 实例数据:这里面包括了对象的所有成员变量
- 对齐填充:最后一部分是对齐填充的字节,按 8 个字节填充
我们主要看一下Mark Word,它的结构如下。
可以看到markwork实际上大部分数据都是用来记录锁相关的信息的,比如锁状态,偏向锁等等。
了解了markwork,我们接着了解什么是偏向锁,偏向锁中由一个ThreadId字段,这个字段如果是空的,某个线程第一次获取锁的时候,就将自身的ThreadId写入到锁的ThreadId字段内,将markwork内的是否偏是向锁的状态位置1,这样下次获取锁的时候,*直接检查ThreadId是否和自身线程Id一致,如果一致,则认为当前线程已经获取了锁,因此不需再次获取锁。
忙等待阻塞锁
自旋锁
忙等待锁就是自旋锁,它通过循环不断的获取锁,如果在自旋的途中,获取锁成功,则进入临界区,不成功就一直自旋。
关于自旋锁的实现,前面CAS和TAS都有实现过。
自旋锁的优点是不需要让线程陷入休眠,避免了线程切换事件,
但是如果自旋过久,会浪费CPU的资源,这个时候让线程陷入休眠是一种更好的选择。
所以自旋次数的选择就比较重要了,在JDK1.6之前,Synchronized再获取锁时,会先自旋10次,如果不成功就会升级成重量级锁,让线程陷入休眠。
自适应自旋锁
JDK1.6中引入了自适应的自旋锁。自适应意味着自旋的时间不再固定了,而是由前一次在同一个锁上的自旋时间及锁的拥有者的状态来决定。如果在同一个锁对象上,自旋等待刚刚成功获得过锁,并且持有锁的线程正在运行中,那么虚拟机就会认为这次自旋也很有可能再次成功,进而它将允许自旋等待持续相对更长的时间,比如100个循环。另外,如果对于某个锁,自旋很少成功获得过,那在以后要获取这个锁时将可能省略掉自旋过程,以避免浪费处理器资源。有了自适应自旋,随着程序运行和性能监控信息的不断完善,虚拟机对程序锁的状况预测就会越来越准确,虚拟机就会变得越来越“聪明”了。
休眠阻塞锁
休眠阻塞锁,顾名思义,就是在线程获取锁后,让线程陷入休眠,它一般被称为重量级锁。这种锁需要管程的介入,线程的休眠和唤醒也会比较耗费性能,既然都已经是重量级锁了,还有优化的空间吗?还是有的。
细化锁的粒度
我们可以通过减少同步的代码块数量来优化锁的性能,比如将Synchronize锁住整个方法改成只锁住方法内可能会产生线程安全的代码。
粗化锁的粒度
粗化锁的粒度在某些场景也能优化锁的性能,比如某个方法内有好几个锁,我们可以将这些锁都可以一个锁,来减少加锁和释放锁的损耗。JVM虚拟机也会通过粗化锁的粒度来优化锁性能的,比如StringBuffer.append()方法内部是由同步代码快的,如果我们多次连续调用append方法,JVM会将这些append方法内部的锁消除,并在连续append方法间加一把锁。
- 痛点:多次连接在一起的加锁、解锁操作会造成
- 原理:将多次连接在一起的加锁、解锁操作合并为一次,将多个连续的锁扩展成一个范围更大的锁
- 使用:将多个彼此靠近的同步块合同在一个同步块 或 把多个同步方法合并为一个方法
- 补充:在JDK内置的API中,例如StringBuffer、Vector、HashTable都会存在隐性加锁操作,可合并
/**
* StringBuffer是线程安全的字符串处理类
* 每次调用stringBuffer.append方法都需要加锁和解锁,如果虚拟机检测到有一系列连串的对同一个对象加锁和解锁操作,就会将其合并成一次范围更大的加锁和解锁操作,即在第一次append方法时进行加锁,最后一次append方法结束后进行解锁
*/
StringBuffer stringBuffer = new StringBuffer();
public void append(){
stringBuffer.append("kira");
stringBuffer.append("sally");
stringBuffer.append("mengmeng");
}
增加锁的数量
如果大量的并发线程都用同一把锁,那么所有的线程始终同时只有一个线程能访问临界区,其他的线程都在等待,这样也会造成性能的浪费。我们可以通过增加锁的数量,将临界区不同的区域分别加锁,这样就可以让更多的线程对临界区进行访问。
锁优化案例
Synchronized
在JDK1.6版本上,HotSpot虚拟机开发团队花费了很大的的精力去实现和优化各种锁优化技术。
Synchronized就是优化的重点之一,Synchronized会先使用偏向锁加锁,如果访问临界区的线程超过了一个,就会升级成轻量级锁,轻量级锁通过互斥来实现加锁的过程,只要多个线程没有产生竞争条件,就可以通过互斥进行加锁,当有多个线程同时竞争锁时,互斥就没用了,所以Synchronized会从轻量级锁升级成重量级锁,Synchronized的重量级锁在获取锁的过程中,也会先通过自旋的方式获取锁,如果自旋失败,最后才采用管程,将线程陷入休眠。
可以看到,Synchronized是从偏向锁,到轻量级锁,然后到自旋锁,最后休眠阻塞锁这样一个不断升级的过程。
ConcurrentHashMap
ConcurrentHashMap在jdk1.7之前采用分断锁来对锁进行了优化,分断锁是通过增加锁的数量来达到优化的目的。我们看一下它的实现。
1.7版本的ConcurrentHashMap主要由两部分组成:Segments和HashEntry。Segments存方HashEntry,HashEntry存放我们的Key,Value。他们的关系实现如下:
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
implements ConcurrentMap<K, V>, Serializable {
final int segmentMask; //segments的掩码值
final int segmentShift; //segments的偏移量
final Segment<K,V>[] segments;
……
}
static final class Segment<K,V> extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;
transient volatile int count;
transient int modCount;
transient int threshold;
transient volatile HashEntry<K,V>[] table;
final float loadFactor; //扩容负载因子
……
}
我们接着看ConcurrentHashMap的put和get方法是如何保证线程安全的。
put方法的实现
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
//获取key的hash值
int hash = hash(key);
//hash值右移segmentShift位与段掩码进行位运算,定位segment
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject
(segments, (j << SSHIFT) + SBASE)) == null)
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
ConcurrentHashMap的put方法获主要是获取获取key值的哈希函数,然后根据hash获取Segment段,接着调用Segment的put方法,它的实现如下。
V put(K key, int hash, V value, boolean onlyIfAbsent) {
//对segment加锁
lock();
try {
int c = count;
if (c++ > threshold) //如果超过再散列的阈值
rehash(); //执行再散列,table 数组的长度将扩充一倍
HashEntry<K,V>[] tab = table;
//把散列码值与 table 数组的长度减 1 的值相“与”
//得到该散列码对应的 table 数组的下标值
int index = hash & (tab.length - 1);
//找到散列码对应的具体的那个桶
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;
V oldValue;
if (e != null) { //如果键值对以经存在
oldValue = e.value;
if (!onlyIfAbsent)
e.value = value; // 设置 value 值
}
else { //键值对不存在
oldValue = null;
++modCount; //添加新节点到链表中,modCont 要加 1
// 创建新节点,并添加到链表的头部
tab[index] = new HashEntry<K,V>(key, hash, first, value);
count = c; //写 count 变量
}
return oldValue;
} finally {
unlock(); //解锁
}
}
get方法的实现
在接着看get方法的实现
public V get(Object key) {
Segment<K,V> s;
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
//先定位Segment,再定位HashEntry
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
由于get并不会导致线程安全问题,所以直接从Segment取HashEntry就行了,并不用加锁。
在JDK1.8中,已经放弃了分段锁的方式,Segment数组也没有了。所有的HashEntry都存放在Node数组中,并且采用CAS+Synchronize的加锁方式,在put方法中,会先判断所存放的Node的位置是否有值,即是否会产生HASH冲突,如果没值,直接采用CAS加锁,存放HashEntry,如果有,则采用Synchronize加锁后再进行存放逻辑。有兴趣的可以去看JDK1.8中ConcurrentHashMap的实现,这里就不说了。