Condition的await、signal源码分析

本文从源码角度,以AbstractQueuedSynchronizer.ConditionObject的await、signal、signalAll为入口,对基于ReentrantLock的线程间通信的原理进行介绍,其中涉及到的与ReentrantLock相关的方法,在ReentrantLock一文中已经介绍过,这里不再说明。
源码注释中同步队列指ReentrantLock维护的队列,条件队列指ConditionObject维护的队列,对于内部数据结构的组织形式,这里不进行介绍。
由于是多线程并发,所以会有多种不同的情况,源码中针对每个条件,仅给出一两种情形,留作参考。

类名简写说明:

  • CO:ConditionObject
  • AQS:AbstractQueuedSynchronizer

1. AQS.CO.await()

(1)AQS.CO.await()

    public final void await() throws InterruptedException {
        // 先判断当前线程是否被中断,被中断则抛出异常
        if (Thread.interrupted())
            throw new InterruptedException();
        // 将当前线程对应的节点加入条件队列
        Node node = addConditionWaiter();
        // 完全释放锁,因为可重入,所以saveState可能大于1
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        // 若不在同步队列中,被唤醒后会再次进入while中被park
        while (!isOnSyncQueue(node)) {
            // 挂起当前线程
            LockSupport.park(this);
            // checkInterruptWhileWaiting中会判断当前线程是否是被中断唤醒的
            // 返回值非0表示是被中断唤醒的,会通过break跳出while
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // 条件1:
        //   为true的情形:
        //     当前线程在上面while中park时被中断,在acquireQueued中park时再次被中断时条件1为true
        // 条件2:
        //   条件2为true,即interruptMode是0或REINTERRUPT,为
        //   REINTERRUPT的情况详见下面(5)处,为0的情况尚不明确
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            // 从上面举的情形中来看,到这里时interruptMode就是REINTERRUPT,
            // 这里难道是Doug Lea老先生的一个编码缺陷?
            interruptMode = REINTERRUPT;
        // 该条件成立的情形:
        //   假设有三个线程thread0、thread1、thread2,三个线程依次调用AQS.CO.await(),之后三者均
        //   在上面while中被park,此时,外部线程中断thread0,thread0会执行到这里,此时条件为true
        if (node.nextWaiter != null) 
            // 断开条件队列中所有不是Node.CONDITION状态的节点(代码不再贴出)
            unlinkCancelledWaiters();
        // 该条件成立时,说明当前线程被中断,interruptMode为THROW_IE或REINTERRUPT
        if (interruptMode != 0)
            // 方法中会根据interruptMode的值做相应处理(代码不再贴出)
            reportInterruptAfterWait(interruptMode);
    }

(2)AQS.CO.addConditionWaiter

    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // 条件2:
        //   为true时的情形:
        //     情形1:假设有两个线程thread0、thread1,初始时,thread0在未持有锁的情况下调用
        //            AQS.CO.await(),当thread0执行到AQS.fullyRelease中时会将其对应节点
        //            的waitStatus字段设置为取消状态(见下面(4)处),之后持有锁的thread1调用
        //            AQS.CO.await(),会执行到这里,这里t.waitStatus为1
        //     情形2:假设有两个线程thread0、thread1,初始时,thread0持有锁,之后调用AQS.CO.await()
        //            释放锁并阻塞在LockSupport.park(this)处,之后外部线程中断thread0,thread0被唤
        //            醒后会执行到AQS.transferAfterCancelledWait里的if处将t.waitStatus设置为0,之
        //            后thread1获取到锁,执行到这里时,t.waitStatus为0,条件2为true
        if (t != null && t.waitStatus != Node.CONDITION) {
            // 断开条件队列中所有不是Node.CONDITION状态的节点(代码不再贴出)
            unlinkCancelledWaiters();
            // 更新t。经过unlinkCancelledWaiters,lastWaiter可能已经改变
            t = lastWaiter;
        }
        // 创建新的状态为Node.CONDITION的节点,并连接到同步队列中
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }

(3)AQS.fullyRelease

    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            // 获取AQS.state
            // 因为锁可重入,所以saveState可能大于1
            int savedState = getState();
            // 传入saveState而不是1,因此该方法名叫"fully"Release
            // 该方法在ReentrantLock的1.2(2)处已经说明,这里不再介绍
            if (release(savedState)) {
                failed = false;
                // 返回saveState,再次唤醒该线程时会用到
                return savedState;
            } else { // 可以说不会执行到该分支
                throw new IllegalMonitorStateException();
            }
        } finally {
            // 当未持有锁的线程调用await方法时,release(savedState)中会抛异常,此时failed为true
            if (failed)
                // 设置为取消状态
                node.waitStatus = Node.CANCELLED;
        }
    }

(4)AQS.isOnSyncQueue

    final boolean isOnSyncQueue(Node node) {
        // 一般情况下,初次进入该方法,条件1为true,该方法返回false,
        // 下面对初次进入该方法时的一些特殊情形进行介绍:
        // a.情形:
        //   假设有两个线程thread0、thread1,初始时,thread0持有锁,调用了AQS.CO.await(),从
        //   while (!isOnSyncQueue(node))处到这里(还未执行),thread1获取到锁,调用了AQS.CO.signal,
        //   一直调到AQS.transferForSignal,在方法里的第一个if执行后,会将node.waitStatus设置为0,
        //   使得这里的条件1为false,因为thread1还未执行AQS.transferForSignal的Node p=enq(node),
        //   因此这里node.prev为null,条件2为true。
        // b.接着上面的情形继续延伸:
        //   若thread0仍未执行该if语句,thread1执行完AQS.transferForSignal
        //   的Node p=enq(node),则条件2变为false
        // c.继续延伸:
        //   若thread0仍未执行该if语句,线程thread2与来竞争锁,因为thread0此时仍持有锁,
        //   因此,thread2会被加入到同步队列中,经过一些列步骤后,会被park,期间会将
        //   thread1对应节点的waitStatus设置为-1,条件1仍为false
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false; // 返回false,表示node不在同步队列中
        // 条件为false的情形:
        //   上面a、b情形不变,将c处的情形改为:thread2执行到到AQS.addWaiter的pred.next = node;
        //   处时(该语句还未执行),thread0执行到这里,此时该条件为false
        // 条件为true的情形:
        //   thread0执行到这里时,thread2已经执行完AQS.addWaiter的pred.next = node;处,此时该条件成立
        if (node.next != null) // If has successor, it must be on queue
            return true;
        
        // 到这里说明node.waitStatus不为Node.CONDITION且node.prev不为null
        
        // findNodeFromTail中会从tail向前遍历同步队列,寻找node是否已在队列中,存在则返回true,否则返回false。
        // 情形:
        //   上面a情形不变,将b处的情形改为:thread1执行到AQS.enq的if (compareAndSetTail(t, node))
        //   (该语句还未执行),或执行了但由于其他线程的竞争导致执行失败了,findNodeFromTail
        //   中会返回false;若执行了且成功了,则findNodeFromTail会返回true
        //   (多数情况下node都在同步队列的尾部附近)
        return findNodeFromTail(node);
    }

(5)AQS.CO.checkInterruptWhileWaiting

    private int checkInterruptWhileWaiting(Node node) {
        return Thread.interrupted() ?
            // 被中断
            // 返回THROW_IE表示当前线程在其他线程调用signal前被中断
            // 返回REINTERRUPT表示当前线程在其他线程调用signal后被中断
            // 具体的分界点就是node.waitStatus的值,若其值为Node.CONDITION,
            // 则是signal前被中断,否则在signal后被中断
            (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
            0;
    }   
(6)AQS.transferAfterCancelledWait 
    final boolean transferAfterCancelledWait(Node node) {
        // 该条件为true的情形:
        //   假设仅有thread0,thread0持有锁后调用AQS.CO.await()被park,
        //   之后被外部线程中断,会执行到这里,此时条件为true
        // 该条件为false的情形:
        //   假设有两个线程thread0、thread1,thread0,thread0持有锁后调用AQS.CO.await()被park,thread1
        //   获取到锁后调用AQS.CO.signal,之后会执行到AQS.transferForSignal的第一个if处,该if语句执行
        //   完后,这里node.waitStatus被修改为0,所以thread0执行这里的if语句时会失败
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node); // 入队
            return true;
        }
        // 到这里说明其他线程调用了AQS.CO.signal,将当前线程对应的节点加入同步队列,这里自选等待入队完成
        while (!isOnSyncQueue(node))
            Thread.yield(); // 主动让出当前线程的CPU时间片
        return false;
    }   

2. AQS.CO.signal

(1)AQS.CO.signal

    public final void signal() {
        // 持有锁的线程不是当前线程(即外部某个线程调用了该方法)则抛异常
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        // 获取条件队列中的第一个节点
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }

(2)AQS.CO.doSignal

    private void doSignal(Node first) {
        // 遍历条件队列,仅唤醒第一个符合条件的线程
        do {
            // 更新firstWaiter
            // firstWaiter为null,说明已到队列尾部
            if ( (firstWaiter = first.nextWaiter) == null)
                // 将lastWaiter设置为null
                lastWaiter = null;
            // 将first与其后继节点断开   
            first.nextWaiter = null;
            // 条件1:只要一个线程入队成功,transferForSignal返回true,就会终止该循环
            // 条件2:用于判断是否到队列尾部
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }

(3)AQS.transferForSignal

    final boolean transferForSignal(Node node) {

        // 条件为true的情形(注意前面的"!"): 
        //   情形1:假设有两个线程thread0、thread1,初始时,thread0在未持有锁的情况下调用
        //          AQS.CO.await(),当thread0执行到AQS.fullyRelease中时会将其对应节点的
        //          waitStatus字段设置为取消状态(见上面1.(4)处),之后持有锁的thread1调用
        //          AQS.CO.signal就会执行到这里,此时条件为true
        //   情形2:假设有两个线程thread0、thread1,初始时,持有锁的thread0调用AQS.CO.await(),
        //          执行到该方法的while里被park,之后外部线程中断thread0,thread0被唤醒后会执
        //          行到AQS.transferAfterCancelledWait的第一个if处将node.waitStatus设置为0,
        //          之后thread1获取到锁,调用AQS.CO.signal执行到这里,条件就为true
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        // 入队
        // 注意p是node的前驱节点
        Node p = enq(node);
        int ws = p.waitStatus;
        // 条件1:
        //   为true时的情形:
        //     假设有三个线程thread0、thread1、thread2,初始时,持有锁的thread0调用AQS.CO.await(),执行到该方法
        //     的while里被park,之后thread1获取到锁,调用AQS.CO.signal执行到上面Node p = enq(node);处(该行还未执行),
        //     之后thread2通过ReentrantLock.lockInterruptibly获取锁,因为thread1仍持有锁,因此thread2最终会在
        //     AQS.parkAndCheckInterrupt中被park,之后外部线程中断thread2,当thread2执行完AQS.cancelAcquire的
        //     node.waitStatus = Node.CANCELLED;后,thread1从上面Node p = enq(node);处执行到这里,此时ws为1
        // 条件2:
        //   为true时的情形(注意前面的"!"):
        //     对于条件1为true时的情形,在thread2被park后,若thread1先执行完ws > 0(条件2还未执行)(此时ws为0,
        //     条件1不成立),此时thread2,被中断后在AQS.cancelAcquire的node.waitStatus = Node.CANCELLED;处
        //     将p.waitStatus设置为1,此时条件2就为true
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            // node.thread会从AQS.CO.await里的park处被唤醒,因为已经在同步队列中了,因此会跳出while,进
            // 入AQS.acquireQueued中,若此时p前面仍有节点等待获取锁或p前面没有节点了但调用AQS.CO.signal
            // 的线程仍未释放锁,node.thread在尝试几次后最终仍会在AQS.parkAndCheckInterrupt中被park
            LockSupport.unpark(node.thread);
        return true;
    }

3. AQS.CO.signalAll

AQS.CO.signalAll与AQS.CO.signal类似,区别是signalAll会将所有节点加入同步队列,下面仅对AQS.CO.doSignalAll方法进行介绍:

    private void doSignalAll(Node first) {
        // 将条件队列的头节点和尾节点都置为null
        lastWaiter = firstWaiter = null;
        // 遍历条件队列,依次唤醒所有线程
        do {
            Node next = first.nextWaiter;
            first.nextWaiter = null;
            transferForSignal(first);
            first = next;
        } while (first != null);
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,839评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,543评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,116评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,371评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,384评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,111评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,416评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,053评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,558评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,007评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,117评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,756评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,324评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,315评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,539评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,578评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,877评论 2 345

推荐阅读更多精彩内容