高并发下的设计思想--分离热点

问题引入

在java体系中,为保证并发安全,我们通常会采用显示锁或者cas无锁编程。使用显示锁(包括sychorized,lock)来保证临界区的资源安全,是一种常用方式。而cas无锁编程则使用最原始的方式来保证临界资源的安全。如果对lock有过了解的同学,应该知道,lock锁本质就是通过对一个中间变量进行cas操作,如果cas操作成功,则表示加锁成功,如果cas失败,则进入aqs队列等待通知继续cas。不难发现,java中的lock实现本质上也是通过cas操作来获取锁,获取锁成功后,再操作临界资源。那么,我们当然可以直接对临界资源进行cas操作,省去中间加锁这一步,这就像如果我们租房子,我们可以找中介帮我们租,当然,我们也可以直接找到房东。那有同学就要问了,既然如此,jdk为啥还费这么大劲开发lock接口,这就要说到cas操作本身的劣势了。cas全名是 compare and swap,比较并交换,高并发情况下,可能会产生如下几个问题
1、aba问题
2、竞争过于激烈带来大量空自旋,导致cpu的急剧上升。

问题解决思路

aba问题相对来说好解决,加一个版本号就可以。但是cas导致大量空自旋的问题,就不那么好解决了。解决这个问题的思路有两个。
1、为什么会大量空自旋,是因为对临界资源的竞争太过激烈,我们可以采用分散临界资源的方式去减少对临界资源的竞争,从而降低空自旋,这也是计算机科学中最常用的方法论之一:以空间换时间。
2、既然自旋次数太多,我们就让其等待,不再自旋。
基于第二种想法,就产生了Lock锁的方式。我们对临界资源修改的时候,如果并发太高,竞争修改临界资源的线程太多,我们就将cas失败的线程放入到一个队列中,等到修改成功的线程完成后再通知队列中的后继线程去修改,而不是再无脑去不断自旋尝试cas。这样,就避免了直接cas带来的恶性自旋问题。但是采用这种方式,也有坏处,就是高并发下处理性能会有所降低,因为要去争抢锁,并且线程间的通信切换也会带来一定的消耗。这就像租房子的时候找中介,虽然可以省心一点,但是你要付中介费用。
基于此,我们再看第一种方式,jdk中的Atomic原子类,就可以直接通过cas操作保证临界资源的安全,但是存在我们开头所说的问题,所以jdk在1.8中就开发了一个LongAdder 来解决这个问题,用的方式就是我们所说的第一种方式分离热点,下面我们来具体对比一下AtomicLong 和 LongAdder 的性能表现,然后再分析LongAdder的设计思想、源码解析。

AtomicLong 与LongAdder的对比实验

public class CASTest {

    //线程数量:10
    private final int THREAD_COUNT = 10;

    //累加次数:分别测试 1000,10000,100000,1000000,10000000,100000000
    private final int TURNS = 1000;

    //cpu密集型线程池
    ExecutorService pool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors(),3, TimeUnit.SECONDS,new LinkedBlockingDeque<>());

    //原子对象
    private AtomicLong atomicLong = new AtomicLong(0);

    //LongAdder原子对象
    LongAdder longAdder = new LongAdder();

    /**
     * AtomicLong性能测试
     */
    public void testAtomicLong() throws InterruptedException {

        CountDownLatch doneSignal = new CountDownLatch(THREAD_COUNT);
        long start = System.currentTimeMillis();
        for (int i=0 ;i< THREAD_COUNT; i++) {
            pool.submit(() -> {
                for (int j=0; j<TURNS; j++) {
                    atomicLong.incrementAndGet();
                }
                doneSignal.countDown();;
            });
        }
        doneSignal.await();
        float time = (System.currentTimeMillis() - start)/1000F;
        System.out.println("AtomicLong 测试累加结果: "+atomicLong.get()+ " 耗费时间:"+time );
    }


    /**
     * LongAdder测试类
     */
    public void testLongAdder() throws InterruptedException {
        CountDownLatch doneSignal = new CountDownLatch(THREAD_COUNT);
        long start = System.currentTimeMillis();
        for (int i=0 ;i< THREAD_COUNT; i++) {
            pool.submit(() -> {
                for (int j=0; j<TURNS; j++) {
                    longAdder.increment();
                }
                doneSignal.countDown();;
            });
        }
        doneSignal.await();
        float time = (System.currentTimeMillis() - start)/1000F;
        System.out.println("LongAdder 测试累加结果: "+longAdder.longValue()+ " 耗费时间:"+time );
    }

    public static void main(String[] args) throws InterruptedException {

        CASTest casTest = new CASTest();
        casTest.testAtomicLong();
        casTest.testLongAdder();
    }
}

以上代码是对AtomicLong和LongAdder的性能比较,在启用10个线程的情况下,我们分别对临界资源进行1000,1W,10w,100w,1000w,一亿次累计操作,然后比较AtomicLong和LongAdder所耗费的时间。通过分析下图这个结果,我们可以明显看出,在cas竞争不大的情况下,AtomicLong性能是要高一些的,但是随着cas竞争越来越激烈,LongAdder的优势会越来越明显,在每个线程累加1亿次的情况下,LongAdder的性能接近是AtomicLong 的7倍(而且这个优势可能随着硬件资源的升高会逐渐放大)


AtomicLong和LongAdder的性能比较

LongAdder的原理与分析

接下来我们来看一下LongAdder的原理

原理概述:

AtomicLong 使用内部变量 value 保存着实际的 long 值,所有的操作都是针对该 value 变量进行。也就是说,高并发环境下,value 变量其实是一个热点,也就是 N 个线程竞争一个热点。重试线程越多,意味着 CAS 的失败几率更高,CAS 失败几率就越高,从而进入恶性 CAS 空自旋状态。
LongAdder 的基本思路就是分散热点,将 value 值分散到一个数组中,不同线程会命中到数组的不同槽(元素)中,各个线程只对自己槽中的那个值进行 CAS 操作。这样热点就被分散了,冲突的概率就小很多。当需要获取元素值时,只需要将所有数组元素值累加即可。

原理详解

LongAdder 继承于 Striped64 类,base 值和 cells 数组都在 Striped64 类定义。基类 Striped64 内部三个重要的成员如下:

/**
* 成员一:存放 Cell 的 hash 表,大小为 2 的幂。
*/
transient volatile Cell[] cells;
/**
* 成员二:基础值,
* 1. 在没有竞争时会更新这个值;
* 2. 在 cells 初始化时 cells 不可用,也会尝试将通过 cas 操作值累加到 base。
*/
transient volatile long base;
/**
* 自旋锁,通过 CAS 操作加锁,为 0 表示 cells 数组没有处于创建、扩容阶段
* 为 1 用于表示正在创建或者扩展 Cell 数组,不能进行新 Cell 元素的设置操作。
*/
transient volatile int cellsBusy;

1。在cas竞争不激烈,没有发生cas失败的情况下,线程会直接更新case的值。这种情况下本质上和AtmicLong是一样的。
2、但是一旦竞争加剧,发生了cas失败的情况情况下,线程首先会尝试修改cellsBuzy值,修改成功表示可以创建或者扩容数组
3、修改失败的话,说明正在有其他线程去创建或者扩容数组,则此时去直接修改base值。
4、在创建了cell数组的情况下,会根据线程id对数组的长度取模,然后映射到数组下标某一具体的cell,再去修改这个cell的值,这样就把原来所有线程对base值的竞争修改,分散到了对数组中的每一个cell数组元素值的修改,大大减少了并发竞争,从而降低cas恶性自旋。
5、同时需要注意的一个点是,cell数组的扩容时机以及它的上限大小。在cell数组已经被创建并且对应的数组下标位置已经有值的情况下,某个线程去修改cell元素的值,如果修改失败的话,说明当前cell数组长度可能不够,尽管已经分散了,但是映射到具体某一位置后修改依然失败,这时就需要去扩容数组的长度,扩容一次是原来的两倍。当然,也不可能无限扩容,上限是达到cpu的核心数,因为可以并发的线程数最多就是cpu的核心数。
6、在获取元素值时,直接累加base值和cell数组中所有元素值即可。

总结:

高并发情况下对临界资源的分离热点(分段)是最常用的手段之一,类似的案例还有很多种,比如jdk1.8之前的版本concurrentHashMap也是通过分段锁来降低竞争,比如本地缓存组件caffeine中的StripedBuffer并发写入队列中的场景。具体业务应用层面,比如我们在优化redis大key时,常用的方法也是将key分散;比如商品库存如果是存在redis中,在并发更新redis库存时,如果redis达到性能瓶颈,我们也可以将redis库存key分散成多个库存key 等等...掌握这一思想,在应对高并发的场景时,大有裨益。

附录LongAdder源码解析

public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        /**
         * :cells 数组不为 null,说明存在争用;在不存在争用的时候,cells 数组
         * 一定为 null,一旦对 base 的 cas 操作失败,才会初始化 cells 数组
         */
        /**
         * 如果 cells 数组为 null,表示之前不存在争用,并且此次 casBase 执行
         * 成功,则表示基于 base 成员累加成功,add 方法直接返回;如果 casBase 方法执行失败,
         * 说明产生了第一次争用冲突,需要对 cells 数组初始化,此时,即将进入内层 if 块
         */
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;

            /**
             * cells 没有初始化
             */
            if (as == null || (m = as.length - 1) < 0 ||
                    /**
                     * 指当前线程 hash 到的 cells 数组的元素位置的 Cell 对象为空,意思是
                     * 还没有其他线程在同一个位置做过累加操作
                     */
                (a = as[getProbe() & m]) == null ||
                    /**
                     * 指当前线程在 cells 数组的被 hash 到的元素位置的 Cell 对象不为空,
                     * 然后在该 Cell 对象上进行 CAS 设置其值为 v+x(x 为该 Cell 需要累加的值),但是 CAS
                     * 操作失败,表示存在争用
                     */
                !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }




final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        int h;
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            wasUncontended = true;
        }
        //扩容意向,collide=true 可以扩容,collide=false 不可扩容
        boolean collide = false;                // True if last slot nonempty
        //自旋,一直到操作成功
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            //表示 cells 已经初始化了,当前线程应该将数据写入到对应的 cell 中
            if ((as = cells) != null && (n = as.length) > 0) {
                //true 表示下标位置的 cell 为 null,需要创建 new Cell
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                /**
                 * 当前下标位置的cell已经有值,需要进行cas修改cell的值
                 *(如果修改失败了,说明数组当前的容量可能太小,导致cas失败,后续有扩容意向)
                 */
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                //如果数组长度大于cpu核心数,则不再扩容
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                else if (!collide)
                    //设置扩容意向为true
                    collide = true;
                //真正扩容逻辑,容量设置为原谅的两倍
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = advanceProbe(h);
            }
            //cells 还未初始化(as 为 null),并且 cellsBusy 加锁成功
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        //初始化长度为2
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            /**
             * CASE3:当前线程 cellsBusy 加锁失败,表示其他线程正在初始化 cells
             * 所以当前线程将值累加到 base,
             */
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }



    /**
     * LongAdder sum的值等于base值 加上所有cell中的值
     * @return
     */
    public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,496评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,407评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,632评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,180评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,198评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,165评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,052评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,910评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,324评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,542评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,711评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,424评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,017评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,668评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,823评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,722评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,611评论 2 353

推荐阅读更多精彩内容