问题引入
在java体系中,为保证并发安全,我们通常会采用显示锁或者cas无锁编程。使用显示锁(包括sychorized,lock)来保证临界区的资源安全,是一种常用方式。而cas无锁编程则使用最原始的方式来保证临界资源的安全。如果对lock有过了解的同学,应该知道,lock锁本质就是通过对一个中间变量进行cas操作,如果cas操作成功,则表示加锁成功,如果cas失败,则进入aqs队列等待通知继续cas。不难发现,java中的lock实现本质上也是通过cas操作来获取锁,获取锁成功后,再操作临界资源。那么,我们当然可以直接对临界资源进行cas操作,省去中间加锁这一步,这就像如果我们租房子,我们可以找中介帮我们租,当然,我们也可以直接找到房东。那有同学就要问了,既然如此,jdk为啥还费这么大劲开发lock接口,这就要说到cas操作本身的劣势了。cas全名是 compare and swap,比较并交换,高并发情况下,可能会产生如下几个问题
1、aba问题
2、竞争过于激烈带来大量空自旋,导致cpu的急剧上升。
问题解决思路
aba问题相对来说好解决,加一个版本号就可以。但是cas导致大量空自旋的问题,就不那么好解决了。解决这个问题的思路有两个。
1、为什么会大量空自旋,是因为对临界资源的竞争太过激烈,我们可以采用分散临界资源的方式去减少对临界资源的竞争,从而降低空自旋,这也是计算机科学中最常用的方法论之一:以空间换时间。
2、既然自旋次数太多,我们就让其等待,不再自旋。
基于第二种想法,就产生了Lock锁的方式。我们对临界资源修改的时候,如果并发太高,竞争修改临界资源的线程太多,我们就将cas失败的线程放入到一个队列中,等到修改成功的线程完成后再通知队列中的后继线程去修改,而不是再无脑去不断自旋尝试cas。这样,就避免了直接cas带来的恶性自旋问题。但是采用这种方式,也有坏处,就是高并发下处理性能会有所降低,因为要去争抢锁,并且线程间的通信切换也会带来一定的消耗。这就像租房子的时候找中介,虽然可以省心一点,但是你要付中介费用。
基于此,我们再看第一种方式,jdk中的Atomic原子类,就可以直接通过cas操作保证临界资源的安全,但是存在我们开头所说的问题,所以jdk在1.8中就开发了一个LongAdder 来解决这个问题,用的方式就是我们所说的第一种方式分离热点,下面我们来具体对比一下AtomicLong 和 LongAdder 的性能表现,然后再分析LongAdder的设计思想、源码解析。
AtomicLong 与LongAdder的对比实验
public class CASTest {
//线程数量:10
private final int THREAD_COUNT = 10;
//累加次数:分别测试 1000,10000,100000,1000000,10000000,100000000
private final int TURNS = 1000;
//cpu密集型线程池
ExecutorService pool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(),3, TimeUnit.SECONDS,new LinkedBlockingDeque<>());
//原子对象
private AtomicLong atomicLong = new AtomicLong(0);
//LongAdder原子对象
LongAdder longAdder = new LongAdder();
/**
* AtomicLong性能测试
*/
public void testAtomicLong() throws InterruptedException {
CountDownLatch doneSignal = new CountDownLatch(THREAD_COUNT);
long start = System.currentTimeMillis();
for (int i=0 ;i< THREAD_COUNT; i++) {
pool.submit(() -> {
for (int j=0; j<TURNS; j++) {
atomicLong.incrementAndGet();
}
doneSignal.countDown();;
});
}
doneSignal.await();
float time = (System.currentTimeMillis() - start)/1000F;
System.out.println("AtomicLong 测试累加结果: "+atomicLong.get()+ " 耗费时间:"+time );
}
/**
* LongAdder测试类
*/
public void testLongAdder() throws InterruptedException {
CountDownLatch doneSignal = new CountDownLatch(THREAD_COUNT);
long start = System.currentTimeMillis();
for (int i=0 ;i< THREAD_COUNT; i++) {
pool.submit(() -> {
for (int j=0; j<TURNS; j++) {
longAdder.increment();
}
doneSignal.countDown();;
});
}
doneSignal.await();
float time = (System.currentTimeMillis() - start)/1000F;
System.out.println("LongAdder 测试累加结果: "+longAdder.longValue()+ " 耗费时间:"+time );
}
public static void main(String[] args) throws InterruptedException {
CASTest casTest = new CASTest();
casTest.testAtomicLong();
casTest.testLongAdder();
}
}
以上代码是对AtomicLong和LongAdder的性能比较,在启用10个线程的情况下,我们分别对临界资源进行1000,1W,10w,100w,1000w,一亿次累计操作,然后比较AtomicLong和LongAdder所耗费的时间。通过分析下图这个结果,我们可以明显看出,在cas竞争不大的情况下,AtomicLong性能是要高一些的,但是随着cas竞争越来越激烈,LongAdder的优势会越来越明显,在每个线程累加1亿次的情况下,LongAdder的性能接近是AtomicLong 的7倍(而且这个优势可能随着硬件资源的升高会逐渐放大)
LongAdder的原理与分析
接下来我们来看一下LongAdder的原理
原理概述:
AtomicLong 使用内部变量 value 保存着实际的 long 值,所有的操作都是针对该 value 变量进行。也就是说,高并发环境下,value 变量其实是一个热点,也就是 N 个线程竞争一个热点。重试线程越多,意味着 CAS 的失败几率更高,CAS 失败几率就越高,从而进入恶性 CAS 空自旋状态。
LongAdder 的基本思路就是分散热点,将 value 值分散到一个数组中,不同线程会命中到数组的不同槽(元素)中,各个线程只对自己槽中的那个值进行 CAS 操作。这样热点就被分散了,冲突的概率就小很多。当需要获取元素值时,只需要将所有数组元素值累加即可。
原理详解
LongAdder 继承于 Striped64 类,base 值和 cells 数组都在 Striped64 类定义。基类 Striped64 内部三个重要的成员如下:
/**
* 成员一:存放 Cell 的 hash 表,大小为 2 的幂。
*/
transient volatile Cell[] cells;
/**
* 成员二:基础值,
* 1. 在没有竞争时会更新这个值;
* 2. 在 cells 初始化时 cells 不可用,也会尝试将通过 cas 操作值累加到 base。
*/
transient volatile long base;
/**
* 自旋锁,通过 CAS 操作加锁,为 0 表示 cells 数组没有处于创建、扩容阶段
* 为 1 用于表示正在创建或者扩展 Cell 数组,不能进行新 Cell 元素的设置操作。
*/
transient volatile int cellsBusy;
1。在cas竞争不激烈,没有发生cas失败的情况下,线程会直接更新case的值。这种情况下本质上和AtmicLong是一样的。
2、但是一旦竞争加剧,发生了cas失败的情况情况下,线程首先会尝试修改cellsBuzy值,修改成功表示可以创建或者扩容数组
3、修改失败的话,说明正在有其他线程去创建或者扩容数组,则此时去直接修改base值。
4、在创建了cell数组的情况下,会根据线程id对数组的长度取模,然后映射到数组下标某一具体的cell,再去修改这个cell的值,这样就把原来所有线程对base值的竞争修改,分散到了对数组中的每一个cell数组元素值的修改,大大减少了并发竞争,从而降低cas恶性自旋。
5、同时需要注意的一个点是,cell数组的扩容时机以及它的上限大小。在cell数组已经被创建并且对应的数组下标位置已经有值的情况下,某个线程去修改cell元素的值,如果修改失败的话,说明当前cell数组长度可能不够,尽管已经分散了,但是映射到具体某一位置后修改依然失败,这时就需要去扩容数组的长度,扩容一次是原来的两倍。当然,也不可能无限扩容,上限是达到cpu的核心数,因为可以并发的线程数最多就是cpu的核心数。
6、在获取元素值时,直接累加base值和cell数组中所有元素值即可。
总结:
高并发情况下对临界资源的分离热点(分段)是最常用的手段之一,类似的案例还有很多种,比如jdk1.8之前的版本concurrentHashMap也是通过分段锁来降低竞争,比如本地缓存组件caffeine中的StripedBuffer并发写入队列中的场景。具体业务应用层面,比如我们在优化redis大key时,常用的方法也是将key分散;比如商品库存如果是存在redis中,在并发更新redis库存时,如果redis达到性能瓶颈,我们也可以将redis库存key分散成多个库存key 等等...掌握这一思想,在应对高并发的场景时,大有裨益。
附录LongAdder源码解析
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
/**
* :cells 数组不为 null,说明存在争用;在不存在争用的时候,cells 数组
* 一定为 null,一旦对 base 的 cas 操作失败,才会初始化 cells 数组
*/
/**
* 如果 cells 数组为 null,表示之前不存在争用,并且此次 casBase 执行
* 成功,则表示基于 base 成员累加成功,add 方法直接返回;如果 casBase 方法执行失败,
* 说明产生了第一次争用冲突,需要对 cells 数组初始化,此时,即将进入内层 if 块
*/
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
/**
* cells 没有初始化
*/
if (as == null || (m = as.length - 1) < 0 ||
/**
* 指当前线程 hash 到的 cells 数组的元素位置的 Cell 对象为空,意思是
* 还没有其他线程在同一个位置做过累加操作
*/
(a = as[getProbe() & m]) == null ||
/**
* 指当前线程在 cells 数组的被 hash 到的元素位置的 Cell 对象不为空,
* 然后在该 Cell 对象上进行 CAS 设置其值为 v+x(x 为该 Cell 需要累加的值),但是 CAS
* 操作失败,表示存在争用
*/
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
//扩容意向,collide=true 可以扩容,collide=false 不可扩容
boolean collide = false; // True if last slot nonempty
//自旋,一直到操作成功
for (;;) {
Cell[] as; Cell a; int n; long v;
//表示 cells 已经初始化了,当前线程应该将数据写入到对应的 cell 中
if ((as = cells) != null && (n = as.length) > 0) {
//true 表示下标位置的 cell 为 null,需要创建 new Cell
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
/**
* 当前下标位置的cell已经有值,需要进行cas修改cell的值
*(如果修改失败了,说明数组当前的容量可能太小,导致cas失败,后续有扩容意向)
*/
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
//如果数组长度大于cpu核心数,则不再扩容
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
//设置扩容意向为true
collide = true;
//真正扩容逻辑,容量设置为原谅的两倍
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
//cells 还未初始化(as 为 null),并且 cellsBusy 加锁成功
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
//初始化长度为2
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
/**
* CASE3:当前线程 cellsBusy 加锁失败,表示其他线程正在初始化 cells
* 所以当前线程将值累加到 base,
*/
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
/**
* LongAdder sum的值等于base值 加上所有cell中的值
* @return
*/
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}