死磕 java并发包之LongAdder源码分析

(1)java8中为什么要新增LongAdder?

(2)LongAdder的实现方式?

(3)LongAdder与AtomicLong的对比?

简介

LongAdder是java8中新增的原子类,在多线程环境中,它比AtomicLong性能要高出不少,特别是写多的场景。

它是怎么实现的呢?让我们一起来学习吧。

原理

LongAdder的原理是,在最初无竞争时,只更新base的值,当有多线程竞争时通过分段的思想,让不同的线程更新不同的段,最后把这些段相加就得到了完整的LongAdder存储的值。


// Striped64中的内部类,使用@sun.misc.Contended注解,说明里面的值消除伪共享 @sun.misc.Contended static final class Cell { // 存储元素的值,使用volatile修饰保证可见性 volatile long value; Cell(long x) { value = x; } // CAS更新value的值 final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } // Unsafe实例 private static final sun.misc.Unsafe UNSAFE; // value字段的偏移量 private static final long valueOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> ak = Cell.class; valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value")); } catch (Exception e) { throw new Error(e); } } }

Cell类使用@sun.misc.Contended注解,说明是要避免伪共享的。

使用Unsafe的CAS更新value的值,其中value的值使用volatile修饰,保证可见性。

关于Unsafe的介绍请查看【死磕 java魔法类之Unsafe解析】。

关于伪共享的介绍请查看【杂谈 什么是伪共享(false sharing)?】。

主要属性

// 这三个属性都在Striped64中 // cells数组,存储各个段的值 transient volatile Cell[] cells; // 最初无竞争时使用的,也算一个特殊的段 transient volatile long base; // 标记当前是否有线程在创建或扩容cells,或者在创建Cell // 通过CAS更新该值,相当于是一个锁 transient volatile int cellsBusy;

最初无竞争或有其它线程在创建cells数组时使用base更新值,有过竞争时使用cells更新值。

最初无竞争是指一开始没有线程之间的竞争,但也有可能是多线程在操作,只是这些线程没有同时去更新base的值。

有过竞争是指只要出现过竞争不管后面有没有竞争都使用cells更新值,规则是不同的线程hash到不同的cell上去更新,减少竞争。

add(x)方法

add(x)方法是LongAdder的主要方法,使用它可以使LongAdder中存储的值增加x,x可为正可为负。

public void add(long x) { // as是Striped64中的cells属性 // b是Striped64中的base属性 // v是当前线程hash到的Cell中存储的值 // m是cells的长度减1,hash时作为掩码使用 // a是当前线程hash到的Cell Cell[] as; long b, v; int m; Cell a; // 条件1:cells不为空,说明出现过竞争,cells已经创建 // 条件2:cas操作base失败,说明其它线程先一步修改了base,正在出现竞争 if ((as = cells) != null || !casBase(b = base, b + x)) { // true表示当前竞争还不激烈 // false表示竞争激烈,多个线程hash到同一个Cell,可能要扩容 boolean uncontended = true; // 条件1:cells为空,说明正在出现竞争,上面是从条件2过来的 // 条件2:应该不会出现 // 条件3:当前线程所在的Cell为空,说明当前线程还没有更新过Cell,应初始化一个Cell // 条件4:更新当前线程所在的Cell失败,说明现在竞争很激烈,多个线程hash到了同一个Cell,应扩容 if (as == null || (m = as.length - 1) < 0 || // getProbe()方法返回的是线程中的threadLocalRandomProbe字段 // 它是通过随机数生成的一个值,对于一个确定的线程这个值是固定的 // 除非刻意修改它 (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) // 调用Striped64中的方法处理 longAccumulate(x, null, uncontended); } }

(1)最初无竞争时只更新base;

(2)直到更新base失败时,创建cells数组;

(3)当多个线程竞争同一个Cell比较激烈时,可能要扩容;

longAccumulate()方法

final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { // 存储线程的probe值 int h; // 如果getProbe()方法返回0,说明随机数未初始化 if ((h = getProbe()) == 0) { // 强制初始化 ThreadLocalRandom.current(); // force initialization // 重新获取probe值 h = getProbe(); // 都未初始化,肯定还不存在竞争激烈 wasUncontended = true; } // 是否发生碰撞 boolean collide = false; // True if last slot nonempty for (;;) { Cell[] as; Cell a; int n; long v; // cells已经初始化过 if ((as = cells) != null && (n = as.length) > 0) { // 当前线程所在的Cell未初始化 if ((a = as[(n - 1) & h]) == null) { // 当前无其它线程在创建或扩容cells,也没有线程在创建Cell if (cellsBusy == 0) { // Try to attach new Cell // 新建一个Cell,值为当前需要增加的值 Cell r = new Cell(x); // Optimistically create // 再次检测cellsBusy,并尝试更新它为1 // 相当于当前线程加锁 if (cellsBusy == 0 && casCellsBusy()) { // 是否创建成功 boolean created = false; try { // Recheck under lock Cell[] rs; int m, j; // 重新获取cells,并找到当前线程hash到cells数组中的位置 // 这里一定要重新获取cells,因为as并不在锁定范围内 // 有可能已经扩容了,这里要重新获取 if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { // 把上面新建的Cell放在cells的j位置处 rs[j] = r; // 创建成功 created = true; } } finally { // 相当于释放锁 cellsBusy = 0; } // 创建成功了就返回 // 值已经放在新建的Cell里面了 if (created) break; continue; // Slot is now non-empty } } // 标记当前未出现冲突 collide = false; } // 当前线程所在的Cell不为空,且更新失败了 // 这里简单地设为true,相当于简单地自旋一次 // 通过下面的语句修改线程的probe再重新尝试 else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash // 再次尝试CAS更新当前线程所在Cell的值,如果成功了就返回 else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // 如果cells数组的长度达到了CPU核心数,或者cells扩容了 // 设置collide为false并通过下面的语句修改线程的probe再重新尝试 else if (n >= NCPU || cells != as) collide = false; // At max size or stale // 上上个elseif都更新失败了,且上个条件不成立,说明出现冲突了 else if (!collide) collide = true; // 明确出现冲突了,尝试占有锁,并扩容 else if (cellsBusy == 0 && casCellsBusy()) { try { // 检查是否有其它线程已经扩容过了 if (cells == as) { // Expand table unless stale // 新数组为原数组的两倍 Cell[] rs = new Cell[n << 1]; // 把旧数组元素拷贝到新数组中 for (int i = 0; i < n; ++i) rs[i] = as[i]; // 重新赋值cells为新数组 cells = rs; } } finally { // 释放锁 cellsBusy = 0; } // 已解决冲突 collide = false; // 使用扩容后的新数组重新尝试 continue; // Retry with expanded table } // 更新失败或者达到了CPU核心数,重新生成probe,并重试 h = advanceProbe(h); } // 未初始化过cells数组,尝试占有锁并初始化cells数组 else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // 是否初始化成功 boolean init = false; try { // Initialize table // 检测是否有其它线程初始化过 if (cells == as) { // 新建一个大小为2的Cell数组 Cell[] rs = new Cell[2]; // 找到当前线程hash到数组中的位置并创建其对应的Cell rs[h & 1] = new Cell(x); // 赋值给cells数组 cells = rs; // 初始化成功 init = true; } } finally { // 释放锁 cellsBusy = 0; } // 初始化成功直接返回 // 因为增加的值已经同时创建到Cell中了 if (init) break; } // 如果有其它线程在初始化cells数组中,就尝试更新base // 如果成功了就返回 else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // Fall back on using base } }

(1)如果cells数组未初始化,当前线程会尝试占有cellsBusy锁并创建cells数组;

(2)如果当前线程尝试创建cells数组时,发现有其它线程已经在创建了,就尝试更新base,如果成功就返回;

(3)通过线程的probe值找到当前线程应该更新cells数组中的哪个Cell;

(4)如果当前线程所在的Cell未初始化,就占有占有cellsBusy锁并在相应的位置创建一个Cell;

(5)尝试CAS更新当前线程所在的Cell,如果成功就返回,如果失败说明出现冲突;

(5)当前线程更新Cell失败后并不是立即扩容,而是尝试更新probe值后再重试一次;

(6)如果在重试的时候还是更新失败,就扩容;

(7)扩容时当前线程占有cellsBusy锁,并把数组容量扩大到两倍,再迁移原cells数组中元素到新数组中;

(8)cellsBusy在创建cells数组、创建Cell、扩容cells数组三个地方用到;

sum()方法

sum()方法是获取LongAdder中真正存储的值的大小,通过把base和所有段相加得到。

public long sum() { Cell[] as = cells; Cell a; // sum初始等于base long sum = base; // 如果cells不为空 if (as != null) { // 遍历所有的Cell for (int i = 0; i < as.length; ++i) { // 如果所在的Cell不为空,就把它的value累加到sum中 if ((a = as[i]) != null) sum += a.value; } } // 返回sum return sum; }


可以看到sum()方法是把base和所有段的值相加得到,那么,这里有一个问题,如果前面已经累加到sum上的Cell的value有修改,不是就没法计算到了么?

答案确实如此,所以LongAdder可以说不是强一致性的,它是最终一致性的。

LongAdder VS AtomicLong

直接上代码:

public class LongAdderVSAtomicLongTest { public static void main(String[] args){ testAtomicLongVSLongAdder(1, 10000000); testAtomicLongVSLongAdder(10, 10000000); testAtomicLongVSLongAdder(20, 10000000); testAtomicLongVSLongAdder(40, 10000000); testAtomicLongVSLongAdder(80, 10000000); } static void testAtomicLongVSLongAdder(final int threadCount, final int times){ try { System.out.println("threadCount:" + threadCount + ", times:" + times); long start = System.currentTimeMillis(); testLongAdder(threadCount, times); System.out.println("LongAdder elapse:" + (System.currentTimeMillis() - start) + "ms"); long start2 = System.currentTimeMillis(); testAtomicLong(threadCount, times); System.out.println("AtomicLong elapse:" + (System.currentTimeMillis() - start2) + "ms"); } catch (InterruptedException e) { e.printStackTrace(); } } static void testAtomicLong(final int threadCount, final int times) throws InterruptedException { AtomicLong atomicLong = new AtomicLong(); List<Thread> list = new ArrayList<>(); for (int i=0;i<threadCount;i++){ list.add(new Thread(() -> { for (int j = 0; j<times; j++){ atomicLong.incrementAndGet(); } })); } for (Thread thread : list){ thread.start(); } for (Thread thread : list){ thread.join(); } } static void testLongAdder(final int threadCount, final int times) throws InterruptedException { LongAdder longAdder = new LongAdder(); List<Thread> list = new ArrayList<>(); for (int i=0;i<threadCount;i++){ list.add(new Thread(() -> { for (int j = 0; j<times; j++){ longAdder.add(1); } })); } for (Thread thread : list){ thread.start(); } for (Thread thread : list){ thread.join(); } } }

运行结果如下:

threadCount:1, times:10000000 LongAdder elapse:158ms AtomicLong elapse:64ms threadCount:10, times:10000000 LongAdder elapse:206ms AtomicLong elapse:2449ms threadCount:20, times:10000000 LongAdder elapse:429ms AtomicLong elapse:5142ms threadCount:40, times:10000000 LongAdder elapse:840ms AtomicLong elapse:10506ms threadCount:80, times:10000000 LongAdder elapse:1369ms AtomicLong elapse:20482ms

可以看到当只有一个线程的时候,AtomicLong反而性能更高,随着线程越来越多,AtomicLong的性能急剧下降,而LongAdder的性能影响很小。

总结

(1)LongAdder通过base和cells数组来存储值;

(2)不同的线程会hash到不同的cell上去更新,减少了竞争;

(3)LongAdder的性能非常高,最终会达到一种无竞争的状态;

彩蛋

在longAccumulate()方法中有个条件是n >= NCPU就不会走到扩容逻辑了,而n是2的倍数,那是不是代表cells数组最大只能达到大于等于NCPU的最小2次方?

答案是明确的。因为同一个CPU核心同时只会运行一个线程,而更新失败了说明有两个不同的核心更新了同一个Cell,这时会重新设置更新失败的那个线程的probe值,这样下一次它所在的Cell很大概率会发生改变,如果运行的时间足够长,最终会出现同一个核心的所有线程都会hash到同一个Cell(大概率,但不一定全在一个Cell上)上去更新,所以,这里cells数组中长度并不需要太长,达到CPU核心数足够了。

比如,笔者的电脑是8核的,所以这里cells的数组最大只会到8,达到8就不会扩容了。



©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,968评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,601评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,220评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,416评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,425评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,144评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,432评论 3 401
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,088评论 0 261
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,586评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,028评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,137评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,783评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,343评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,333评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,559评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,595评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,901评论 2 345

推荐阅读更多精彩内容

  • 基本原理和思想  Java有很多并发控制机制,比如说以AQS为基础的锁或者以CAS为原理的自旋锁。一般来说,CAS...
    Java耕耘者阅读 1,127评论 0 0
  • 一、简介  在之前的《ConcurrentHashMap深入剖析(JDK8)》文章中,我们看到了CounterCe...
    SunnyMore阅读 1,834评论 1 6
  • 作者: 一字马胡 转载标志 【2017-11-03】 更新日志 Java Striped64 Striped64...
    一字马胡阅读 8,781评论 11 22
  • 30立什么?立身,确立自己的品格和修养,思想修养,道德涵养,能力培养。自强是立身之本,别把自己的需求寄托在父母的资...
    梅溪湖肖老师520阅读 182评论 0 0
  • 管理最主要的是以人为本,优秀的管理者要学会带领自己的团队,做好客户导向、成果导向。管理者要做好对自己团队的监督...
    孙倩阅读 119评论 0 0