J.U.C|AQS共享式源码分析

一、写在前面

上篇给大家聊了独占式的源码,具体参见《J.U.C|AQS独占式源码分析》

这一章我们继续在AQS的源码世界中遨游,解读共享式同步状态的获取和释放。

二、什么是共享式

共享式与独占式唯一的区别是在于同一时刻可以有多个线程获取到同步状态。

我们以读写锁为例来看两者,一个线程在对一个资源文件进行读操作时,那么这一时刻对于文件的写操作均被阻塞,而其它线程的读操作可以同时进行。
当写操作要求对资源独占操作,而读操作可以是共享的,两种不同的操作对同一资源进行操作会是什么样的?看下图

共享式-读写锁-读.jpg

共享式访问资源,其他共享时均被允许,而独占式被阻塞。


共享式-读写锁-写.jpg

独占式访问资源时,其它访问均被阻塞。

通过读写锁给大家一起温故下独占式和共享式概念,上一节我们已经聊过独占式,本章我们主要聊共享式。

主要讲解方法

  • protected int tryAcquireShared(int arg);共享式获取同步状态,返回值 >= 0 表示获取成功,反之则失败。
  • protected boolean tryReleaseShared(int arg): 共享式释放同步状态。

三、核心方法分析


3.1 同步状态的获取

public final void acquireShared(int arg)

共享式获取同步状态的顶级入口,如果当前线程为获取到同步状态,将会加入到同步队列中等待,与独占式唯一的区别是在于同一时刻可以有多个线程获取到同步状态

方法源码

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

方法函数解析

  • tryAcquireShared(arg):获取同步状态,返回值大于等于0表示获取成功,否则失败。
  • doAcquireShared(arg):共享式获取共享状态,包含构建节点,加入队列等待,唤醒节点等操作。

源码分析

同步器的 acquireShared 和 doAcquireShared 方法

//请求共享锁的入口
public final void acquireShared(int arg) {
        // 当state != 0 并且tryAcquireShared(arg) < 0 时才去才获取资源
        if (tryAcquireShared(arg) < 0)
            // 获取锁
            doAcquireShared(arg);
    }
// 以共享不可中断模式获取锁
private void doAcquireShared(int arg) {
        // 将当前线程一共享方式构建成 node 节点并将其加入到同步队列的尾部。这里addWaiter(Node.SHARED)操作和独占式基本一样,
        final Node node = addWaiter(Node.SHARED);
        // 是否成功标记
        boolean failed = true;
        try {
            // 等待过程是否被中断标记
            boolean interrupted = false;
            自旋
            for (;;) {
                // 获取当前节点的前驱节点
                final Node p = node.predecessor();
                // 判断前驱节点是否是head节点,也就是看自己是不是老二节点
                if (p == head) {
                    // 如果自己是老二节点,尝试获取资源锁,返回三种状态
                    // state < 0 : 表示获取资源失败
                    // state = 0: 表示当前正好线程获取到资源, 此时不需要进行向后继节点传播。
                    // state > 0: 表示当前线程获取资源锁后,还有多余的资源,需要向后继节点继续传播,获取资源。 
                    int r = tryAcquireShared(arg);
                    // 获取资源成功
                    if (r >= 0) {
                        // 当前节点线程获取资源成功后,对后继节点进行逻辑操作
                        setHeadAndPropagate(node, r);
                        // setHeadAndPropagate(node, r) 已经对node.prev = null,在这有对p.next = null; 等待GC进行垃圾收集。
                        p.next = null; // help GC
                        // 如果等待过程被中断了, 将中断给补上。
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                // 判断状态,寻找安全点,进入waiting状态,等着被unpark()或interrupt()
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

在acquireShared(int arg)方法中,同步器调用tryAcquireShared(arg)方法获取同步状态,返回同步状态有两种。

当同步状态大于等于0时: 表示可以获取到同步状态,退出自旋,在doAcquireShared(int arg)方法中可以看到节点获取资源退出自旋的条件就是大于等于0

小于0会加入同步队列中等待被唤醒。

addWaiter和enq方法

// 创建节点,并将节点加入到同步队列尾部中。
 private Node addWaiter(Node mode) {
        // 以共享方式为线程构建Node节点
        Node node = new Node(Thread.currentThread(), mode);
        // 尝试快速加入到队列尾部
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            // CAS保证原子操作,将node节点加入到队列尾部
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 快速加入失败,走 enq(node)方法
        enq(node);
        return node;
}
//以自旋的方式,将node节点加入到队列的尾部
private Node enq(final Node node) {
        // 自旋
        for (;;) {
            // 获取尾部节点
            Node t = tail;
            // 如果tail节点为空, 说明同步队列还没初始化,必须先进行初始化
            if (t == null) { // Must initialize
                // CAS保证原子操作, 新建一个空 node 节点并将其设置为head节点
                if (compareAndSetHead(new Node()))
                    // 设置成功并将tail也指向该节点
                    tail = head;
            } else {
                // 将node节点加入到队列尾部
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

这两个方法和独占式的基本相同,注释中都表明了,在这就不多做解释了。

获取资源成功后对后继节点的操作setHeadAndPropagate方法

private void setHeadAndPropagate(Node node, int propagate) {
        // 记录老的head节点,以便核对
        Node h = head; // Record old head for check below
        // 将node 设置成head节点
        setHead(node);
        // 这里表示: 如果资源足够(propagate > 0)或者旧头节点为空(h == null)或者旧节点的waitStatus为 SIGNAL(-1) 或者 PROPAGATE(-3)(h.waitStatus < 0)
        // 或者当前head节点不为空或者waitStatus为SIGNAL(-1) 或者 PROPAGATE(-3),此时需要继续唤醒后继节点来尝试获取资源。
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            // 当前node节点的后继节点
            Node s = node.next;
            //如果后节点为空或者属于共享节点
            if (s == null || s.isShared())
                // 继续尝试获取资源
                doReleaseShared();
        }
    }

首先将当前节点设置为head节点 setHead(node), 其次根据条件看是否对后继节点继续唤醒。

获取资源失败进行阻塞等待unpark

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 获取前驱节点的等待状态
        int ws = pred.waitStatus;
        // 如果等待状态已经为SIGNAL(表示当前当前节点的后继节点处于等待状态,如果当前节点释放了同步状态或者被中断, 则会唤醒后继节点)
        if (ws == Node.SIGNAL)
            // 直接返回,表示可以安心的去休息了
            return true;
        // 如果前驱的节点的状态 ws > 0(表示该节点已经被取消或者中断,也就是成无效节点,需要从同步队列中取消的)
        if (ws > 0) {
            // 循环往前需寻找,知道寻找到一个有效的安全点(一个等待状态<= 0 的节点,排在它后面)
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            // 注意这一波操作后,获奖取消的节点全部变成GC可回收的废弃链。
            pred.next = node;
        } else {
            //如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它获取资源后通知自己一下。有可能失败,人家说不定刚刚释放完呢!
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

 private final boolean parkAndCheckInterrupt() {
        // 调用park方法使当前节点的线程进入waiting
        LockSupport.park(this);    
        //返回线程中断状态
        return Thread.interrupted();
    }

这两个方法和独占式基本相同。

接着看doReleaseShared 这个比较复杂

private void doReleaseShared() {
        //注意,这里的头结点已经是上面新设定的头结点了,从这里可以看出,如果propagate=0,
        //不会进入doReleaseShared方法里面,那就有共享式变成了独占式
        for (;;) { // 死循环以防在执行此操作时添加新节点:退出条件 h == head
            Node h = head;
            // 前提条件,当前的头节点不为空,并且不是尾节点
            if (h != null && h != tail) {
                // 当前头节点的等待状态
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    // 如果当前节点的状态为SIGNAL,则利用CAS将其状态设置为0(也就是初始状态)
                    //这里不直接设为Node.PROPAGATE,是因为unparkSuccessor(h)中,如果ws < 0会设置为0,所以ws先设置为0,再设置为PROPAGATE
                    //这里需要控制并发,因为入口有setHeadAndPropagate跟release两个,避免两次unpark
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases 设置失败,重新循环
                    // 唤醒后继节点
                    unparkSuccessor(h);
                }
                // 如果等待状态不为0 则利用CAS将其状态设置为PROPAGATE ,以确保在释放资源时能够继续通知后继节点。
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed 如果head 期间发生了改变,则需要从新循坏
                break;
        }
    }
private void unparkSuccessor(Node node) {
        
        int ws = node.waitStatus;
        // 在此再次判断当前头节点的的状态,如果小于0 将设置为0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        //获取后继节点
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            //如果后继节点为空或者等待状态大于0 直接放弃。
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                // 循环从尾部往前寻找下一个等待状态不大于0的节点
                if (t.waitStatus <= 0)
                    s = t;
        }
        // 唤醒该节点的线程
        if (s != null)
            LockSupport.unpark(s.thread);
    }

最后一步释放资源就比较简单了。

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

总结

在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程会加入到队列中并进行自旋,出列的(或者停止自旋)的条件时前驱节点为头节点并且成功获取了同步状态。在释放同步状态时,调用Release方法释放同步状态,然后唤醒头节点的后继节点。

共享式方式在唤醒后继节点获得资源后会判断当前资源是否还有多余的,如果有会继续唤醒下一个节点。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,839评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,543评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,116评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,371评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,384评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,111评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,416评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,053评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,558评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,007评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,117评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,756评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,324评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,315评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,539评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,578评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,877评论 2 345