LongAdder (下) 代码实现原理篇

LongAdder代码分析

为了解决高并发下多线程对一个变量CAS争夺失败后进行自旋而造成的降低并发性能的问题,LongAdder在内部维护多个Cell元素(一个动态的Cell数组)来分担单个变量进行争夺开销。下面围绕以下话题从源码角度来分析LongAdder的实现!

  1. LongAdder的结构是怎样的?
  2. 当前线程应该访问Cell数组里面哪一个Cell元素?
  3. 如何初始化Cell数组?
  4. Cell数组如何扩容?
  5. 线程访问分配的Cell元素有冲突后如何处理?
  6. 如何保证线程操作被分配的Cell元素的原子性?

解决问题1,首先看下LongAdder的类结构图,如图所示:

image-20200411181035970.png

由该图可知,LongAdder继承自Striped64类,在Striped64内部维护着三个变量。

LongAdder的真实值其实是base的值与Cell数组里面所有Cell元素中的value值的累加,base是个基础值,默认为0。

cellsBusy用来实现自旋锁,状态值只有0和1,当创建Cell元素,扩容Cell数组或者初始化Cell数组时,使用CAS操作该变量来保证同时只有一个线程可以进行其中之一的操作。

解决问题6,下面看Cell的构造!

@sun.misc.Contended static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

可以看到,Cell的构造很简单,其内部维护了一个被声明为volatile的变量,这里生命为volatile是因为线程操作value变量时没有使用锁,为了保证变量的内存可见性这里将其声明为volatile的。另外cas方法通过CAS操作,保证了当前线程更新时被分配的Cell元素中Value值的原子性。另外,Cell类使用@sun.misc.Contended修饰是为了避免伪共享。

下面先说一说LongAdder常用函数,然后在里边寻找答案。

  • long sum()

    返回当前的值,内部操作是累加所有Cell内部的value值后再累加base。例如下面代码,由于计算总和时没有对Cell数组进行加锁,所以在累加过程中可能有其他线程对Cell中的值进行了修改,也有可能对数组进行了扩容,所以sum返回的值并不是非常准确的,其返回值并不是一个调用sum方法是的原子快照值。

    public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
    
  • void reset()

    为重置操作,如下代码把base置为0,如果Cell数组有元素,则元素值被重置为0。

    public void reset() {
        Cell[] as = cells; Cell a;
        base = 0L;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    a.value = 0L;
            }
        }
    }
    
  • long sumThenReset()

    是sum的改造版本,如下代码在使用sum累加对应的Cell值后,把当前Cell的值重置为0,base重置为0。这样,当多线程调用该方法时会有问题,比如考虑第一个线程清空Cell的值,则后一个线程调用时累加的都是0值。

    public long sumThenReset() {
        Cell[] as = cells; Cell a;
        long sum = base;
        base = 0L;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null) {
                    sum += a.value;
                    a.value = 0L;
                }
            }
        }
        return sum;
    }
    
  • long longValue()等价于sum()

    下面主要看add方法的实现,从这个方法里面就可以找到其他问题的答案。

    public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        // (1)~~~
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            // (2)~~~
            if (as == null || (m = as.length - 1) < 0 ||
                // (3) ~~~
                (a = as[getProbe() & m]) == null ||
                // (4)~~~
                !(uncontended = a.cas(v = a.value, v + x)))
                // (5)
                longAccumulate(x, null, uncontended);
        }
    }
    
    final boolean casBase(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
    }
    

    代码(1)首先看cells是否为null,如果为nul则当前在基础变量base上进行累加,这时候就类似AtomicLong的操作。

    如果cells不为null或者线程执行代码(1)的CAS操作失败了,则会去执行代码(2)。

    代码(2) (3) 决定当前线程应该访问cells数组里面的哪一个Cell元素,如果当前线程映射的元素存在执行代码(4),使用CAS操作去更新分配的Cell元素的value值,如果当前线程映射元素不存在或者存在但是CAS操作执行失败执行代码(5)。

    其实将代码(2) (3) (4) 合起来看就是获取当前线程应该访问的cells数组的Cell元素,然后进行CAS更新操作,只是获取期间如果有些条件不满足则会跳转到代码(5)。执行。另外当前线程应该访问cells数组的哪一个Cell元素是通过getProbe() & m 进行计算的,其中m是当前cells数组元素个数-1,getProbe() 则用于获取当前线程中变量 threadLocalRandomProbe的值,这个值一开始为0,在代码(5) 里面会对其进行初始化。并且当前线程通过分配的Cell元素的cas函数来保证对Cell元素value值更新的原子性,到这里再一次完善了问题6并且回答了问题2

  • longAccumulate()

    这里重点研究longAccumulate的代码逻辑,这是cells数组被初始化和扩容的地方。

    final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        // (6) 初始化当前线程的变量threadLocalRandomProde的值
        int h;
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); 
            h = getProbe();
            wasUncontended = true;
        }
        boolean collide = false;                
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            if ((as = cells) != null && (n = as.length) > 0) { // (7)
                if ((a = as[(n - 1) & h]) == null) { // (8)
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true; 
                // (9) 当前Cell存在,则执行CAS设置
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                // (10) 当前Cell数组元素个数大于CPU个数
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                // (11) 是否有冲突
                else if (!collide)
                    collide = true;
                // (12) 如果当前元素个数没有达到CPU个数并且有冲突则扩容
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {      // Expand table unless stale
                            // (12.1)
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        // (12.2)
                        cellsBusy = 0;
                    }
                    // (12.3)
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                // (13) 为了能够找到一个空闲的Cell,重新计算hash,xorshift算法生成随机数
                h = advanceProbe(h);
            }
            // (14) 初始化Cell数组
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        // (14.1)
                        Cell[] rs = new Cell[2];
                        // (14.2)
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    // (14.3)
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }
    

    上面比较复杂,我们只关注问题3,4,5部分。

    当每个线程第一次执行到代码(6)时,会初始化当前线程变量threadLocalRandomProde的值,上面也说了,这个变量在计算当前线程被分配到cells数组的哪一个Cell元素时会用到。

    问题3

    cells数组的初始化是在代码(14) 中进行的,其中cellsBusy是一个标示,为0说明当前cells数组没有在被初始化或者扩容,也没有在新建Cell元素,为1则说明cells数组在被初始化或者扩容,或者当前在创建新的Cell元素,通过CAS操作来进行0或1状态的切换,这里使用casCellsBusy方法。假设当前线程通过CAS设置cellsBusy为1,则当前线程开始初始化操作,那么这时候其他线程就不能进行扩容了。如代码(14.1)初始化cells数组元素个数为2,然后使用h&1计算当前线程应该访问cell数组的哪个位置,也就是当前线程的threadLocalRandomProbe变量&(cells数组元素个数-1),然后标示cells数组已经被初始化了,最后代码(14.3)重置了cellsBusy标记。显然这里没有使用CAS操作,却是线程安全的,原因是cellsBusy是volatile修饰的。这保证了变量的内存可见性,另外此时其他地方的代码没有机会修改cellsBusy的值。在这里初始化的cells数组里面的两个元素的值目前还是null。这里回答了问题3,知道了cells数组如何被初始化的。

    问题4

    cells数组的扩容实在代码(12)中进行的,对cells扩容是有条件的,也就是代码(10)(11)的条件都不满足的时候。具体就是当前cells的元素个数小于当前机器CPU个数并且当前多个线程访问了cells中同一个元素,从而导致冲突使其中一个线程CAS失败时才会进行扩容操作。这里为何要涉及CPU个数呢?因为只有当每个CPU都运行一个线程时才会使多线程的效果最佳,也就是当cells数组元素个数与CPU个数一致时,每个Cell都使用一个CPU进行处理,这时性能才是最佳的。代码(12)中的扩容操作也是先通过CAS设置cellsBusy为1,然后才能进行扩容。假设CAS成功则执行代码(12.1)将容量扩充为之前的2倍,并复制Cell元素到扩容后数组。另外,扩容后cells数组里面除了包含复制过来的元素外,还包含其他新元素,这些元素值目前还是null。这里就解决了问题4。

    在代码(7)(8)中,当前线程调用add方法并根据当前线程的随机数threadLocalRandomProbe和cells元素计算个数计算访问的Cell元素下标,然后如果发现对应下标值为null,则新增一个Cell元素到cells数组,并且将其添加到cells数组之前要竞争设置cellsBusy为1。

    问题5

    代码(13)对CAS失败的线程重新计算当前线程的随机值threadLocalRandomProbe,以减少下次访问cells元素时的冲突机会。这里回答了问题5。

小结

介绍完了JDK1.8中新增的LongAdder原子性操作类,该类通过内部cells数组分担了高并发下多线程同时对一个原子变量进行更新时的竞争两,让多个线程可以同时对cells数组里面的元素进行并行的更新操作。另外,数组元素Cell使用@sun.misc.Contended注解进行修饰,这避免了cells数组内多个原子变量被放入同一个缓存行,也是避免了伪共享,这对性能也是一个提升。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,968评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,601评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,220评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,416评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,425评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,144评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,432评论 3 401
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,088评论 0 261
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,586评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,028评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,137评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,783评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,343评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,333评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,559评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,595评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,901评论 2 345

推荐阅读更多精彩内容

  • 今天来讲一讲原子操作类,JUC包提供了一系列的原子性操作类,这些操作类使用的是CAS非阻塞算法实现的,相比于锁...
    Demo_zfs阅读 455评论 0 1
  • 基本原理和思想  Java有很多并发控制机制,比如说以AQS为基础的锁或者以CAS为原理的自旋锁。一般来说,CAS...
    Java耕耘者阅读 1,127评论 0 0
  • 2018年7月6日 星期五 晴 第「028」次感恩。 红茶,加柠檬水,加点蜂蜜。 泡一大壶,分成小罐,放进冰箱里,...
    子不能说阅读 110评论 0 0
  • 前一篇文章,其实并没有写什么违规或敏感的话题,还是被简书“和谐”了。也罢,我也觉得不该写私事,不符合引导“主流价值...
    伶俐姑娘阅读 353评论 3 16
  • 2019 1.15 奶奶走了 没有见上雪梅最后一面 奶奶啊 我也很痛啊 那么好的奶奶 就像外婆一样 慈祥的笑在天堂...
    TD1900娜娜阅读 222评论 0 0