03_Condition_2_实现类

一:类ConditionObject,基础操作和内部结构

public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    /** First node of condition queue. 
     * 这里也是用的AQS中的Node,这里只是用到Node中的nextWaiter,不是pre和next,所以是一个单项链表。
     * */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;

    /**
     * Creates a new {@code ConditionObject} instance.
     */
    public ConditionObject() { }

    /**
     * Adds a new waiter to wait queue.
     * 往等待队列中添加一个新的等待者。
     * @return its new wait node
     */
    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // If lastWaiter is cancelled, clean out. 如果最后一个等待着已经被取消,则清除掉。
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        //这里就将waitStatus中的CONDITION状态给用到了。
       // Node.CONDITION的注释中有说明,该状态表示节点在条件队列中。
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }

    /**
     * Unlinks cancelled waiter nodes from condition queue.
     * Called only while holding lock. This is called when
     * cancellation occurred during condition wait, and upon
     * insertion of a new waiter when lastWaiter is seen to have
     * been cancelled. This method is needed to avoid garbage
     * retention in the absence of signals. So even though it may
     * require a full traversal, it comes into play only when
     * timeouts or cancellations occur in the absence of
     * signals. It traverses all nodes rather than stopping at a
     * particular target to unlink all pointers to garbage nodes
     * without requiring many re-traversals during cancellation
     * storms.
     * 从条件队列中将已取消的等待节点移除(取消链接)。只有在持有锁的时候调用。
     * 当在条件等待期间发生取消时,以及在插入新的等待者期间发现lastWaiter是已取消的时候,调用此函数。
     * 需要这种方法来避免在没有信号的情况下垃圾保留。
     * 因此,即使它可能需要一个完整的遍历,它也只有在没有信号的情况下发生超时或取消才会起作用。
     * 它遍历所有节点,而不是在特定目标处停止,以取消所有指向垃圾节点的指针的链接,而无需在取消风暴期间多次重新遍历。
     * 就是从头到位遍历,将所有取消的节点剔除。
     */
    private void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null;
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != Node.CONDITION) {
                t.nextWaiter = null;
                if (trail == null)
                    firstWaiter = next;
                else
                    trail.nextWaiter = next;
                if (next == null)
                    lastWaiter = trail;
            }
            else
                trail = t;
            t = next;
        }
    }
}

总结:

  1. 此类中也是使用Node来维护了一个单向链表,维护所有等待该Condition的线程队列,和AQS中的同步队列不是一个队列,
    这两个队列是会交互的。
  2. 此类中的Node的waitStatus都是Condition,表示等待状态,表示该节点处于等待队列中。
  3. 两个基本的操作,添加新节点和删除取消的节点。

二:await逻辑

public class ConditionObject implements Condition, java.io.Serializable {
    /**
     * Implements interruptible condition wait.
     * 实现可中断的条件等待。(下面的注释已经大体描述清楚每一步的逻辑了)
     * <ol>
     * <li> If current thread is interrupted, throw InterruptedException.
     *      如果当前线程已经被中断,则抛出InterruptedException。
     *      
     * <li> Save lock state returned by {@link #getState}.
     *      getState方法返回保存的锁状态。
     *      
     * <li> Invoke {@link #release} with saved state as argument,
     *      throwing IllegalMonitorStateException if it fails.
     *      通过保存状态作为参数来调用release方法,如果失败则抛出IllegalMonitorStateException。
     *      
     * <li> Block until signalled or interrupted.
     *      阻塞至被信号通知或被中断。
     *      
     * <li> Reacquire by invoking specialized version of
     *      {@link #acquire} with saved state as argument.
     *      通过调用acquire的专用版本并将保存的状态作为参数来重新获取
     *      
     * <li> If interrupted while blocked in step 4, throw InterruptedException.
     *      如果在步骤4中阻塞时被中断,则抛出InterruptedException。
     * </ol>
     */
    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter(); //创建一个condition类型的Node并入链
        int savedState = fullyRelease(node);//完全释放并返回当时的state
        int interruptMode = 0;
        //判断是否在AQS的同步队列中,不在表示还未signal,在了表示已经被signal了
        while (!isOnSyncQueue(node)) {
            //1. fullyRelease后node不在AQS的同步队列中了,所以会立即进入该方法,后续该线程被park
            //其他线程调用signal时会将此线程unpark,又会将此线程添加到AQS的同步队列中去,结束wile循环。后面在看signal。
            //2. 会不会此时已经在AQS的同步队列中了呢,应该也是会的,就是此线程刚fullyRelease,
            // 其它线程signal的时候又将该node给加到了同步队列中,此时就不park了,直接往后执行就可以了。
            //3. 如果park之前被中断了,那么此处的park会不起作用,直接往下继续执行。
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                //阻塞的过程中被中断了,就直接break出来
                //checkInterruptWhileWaiting用来检测中断的时机并将node加到AQS队列中,后面再细看。
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            //acquireQueued需要再次获取锁资源
            // 这里会存在获取不到锁资源的情况吗?不会,即使上面不是因为signal唤醒而unpark,而是因为中断而唤醒,从而执行到这里,
            // acquireQueued也必须获取到锁,获取不要会继续给park住,然后排队等待获取,
            // 而且acquireQueued方法是不支持中断的,只能等到获取成功才会继续执行。而且await之前执行了lock操作,之后还会执行unlock操作,
            //如果说获取不到那不就出错了嘛。
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled 清理被取消的节点
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);//将中断状态给报告出去
    }

    /**
     * Invokes release with current state value; returns saved state.
     * Cancels node and throws exception on failure.
     * 使用当前state值调用release;返回保存的state。失败时取消节点并抛出异常。
     * 
     * @param node the condition node for this wait
     * @return previous sync state
     */
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            //为什么不是release(1)呢?因为如果是可重入的,比如说thread1锁了2次,这个saveState为2,
            //release(1)不会完全释放当前锁,所以这个方法叫做fullyRelease,完全释放。
            //为什么又要返回这个savedState呢? thread1锁了2次,await的时候会通过该方法释放掉,其它线程会再次获得锁,
            //那么等thread1被signal时,会重新acquire(savedState)获取锁,并将state的设置到正确状态,
            // thread1 lock了两次,肯定会unlock两次,如果state为1,第二次unlock时就会报错。
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

    /** Mode meaning to reinterrupt on exit from wait */
    //该模式意味着退出等待时重新中断,发出信号后被中断
    private static final int REINTERRUPT =  1;
    /** Mode meaning to throw InterruptedException on exit from wait */
    //模式意味着退出等待时抛出InterruptedException,发出信号前被中断
    private static final int THROW_IE    = -1;
    
    /**
     * Checks for interrupt, returning THROW_IE if interrupted
     * before signalled, REINTERRUPT if after signalled, or
     * 0 if not interrupted.
     * 检查中断,
     *  如果在发出信号前被中断,则返回THROW_IE,
     *  发出信号后被中断则返回REINTERRUPT,
     *  没有被中断则返回0.
     */
    private int checkInterruptWhileWaiting(Node node) {
        return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
    }

    /**
     * Transfers node, if necessary, to sync queue after a cancelled wait.
     * Returns true if thread was cancelled before being signalled.
     * 如有必要,在取消等待后将节点传输到同步队列。如果线程在发出信号之前被取消,则返回true。
     *
     * @param node the node
     * @return true if cancelled before the node was signalled
     */
    final boolean transferAfterCancelledWait(Node node) {
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            //场景:一个线程在await,其它线程一直没有对其进行signal,然后该线程又被interrupt了。
            // await进行park前,已经将锁给释放掉了,中断后会从park中醒过来继续acquireQueued获取锁,
            //acquireQueued操作的是同步队列中的node,所以需要将node加到AQS的同步队列中去。
            enq(node);
            return true;
        }
        /*
         * If we lost out to a signal(), then we can't proceed
         * until it finishes its enq().  Cancelling during an
         * incomplete transfer is both rare and transient, so just
         * spin.
         * 如果我们输给了一个signal(),直到它完成enq()否则我们无法继续。
         * 在不完全传输过程中的取消既罕见又短暂,所以只需旋转即可。
         * 
         * 啥意思呢? signal的第一步就会将node的状态从CONDITION改为0,那么该方法上面的CAS操作将会失败,
         * 出现的原因是因为interrupt和signal方法几乎同时执行,但是signal的CAS操作比此方法的CAS操作快了一步。
         */
        while (!isOnSyncQueue(node))
            // 循环一下等待node在signal中被enq到同步队列中去。
            Thread.yield();
        return false;
    }

    /**
     * Throws InterruptedException, reinterrupts current thread, or
     * does nothing, depending on mode.
     * 基于mode 来抛出InterruptedException、重新中断当前线程或者啥都不做。
     */
    private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
        if (interruptMode == THROW_IE)
            throw new InterruptedException();
        else if (interruptMode == REINTERRUPT)
            selfInterrupt();
    }
}

总结:

  1. 正常的等待逻辑比较直观,分为以下几步(一个线程await,一个线程signal):

    • 1.0 lock()获取锁、await()等待。
    • 1.1 addConditionWaiter创建一个Condition状态的节点并加入到队列。
    • 1.2 fullyRelease 将锁释放,并保存好AQS中的状态(savedState)。
    • 1.3 park线程,等待其它线程signal通知。
    • 1.4 其它线程调用signal,以及unlock,等待线程从park中唤醒
    • 1.5 acquireQueued 重新获取锁,并将1.2中的savedState状态从新设置到AQS的state字段。
    • 1.6 执行业务逻辑,最后释放锁 unlock()。
  2. 中断发生,并且发生在signal之前(一个线程await,一个线程signal):

    • 2.1 发生在park之前,那么park不起作用,直接往后执行。
    • 2.2 发生在park之后,线程从park中唤醒。
    • 2.3 transferAfterCancelledWait 将node的状态成功的从CONDITION改为0,并且将node添加到AQS的同步队列。
    • 2.4 上一步可能将node的状态从CONDITION改为0失败,表示signal已经发生了,这时算作中断发生在signal之后。
    • 2.5 acquireQueued 重新获取锁,并将savedState状态重新设置到AQS的state字段。
    • 2.6 reportInterruptAfterWait 将抛出InterruptedException。
  3. 中断发生,并且发生在signal之后

    • 3.1 其它线程调用signal,以及unlock,等待线程从park中唤醒
    • 3.2 acquireQueued 重新获取锁,并将savedState状态重新设置到AQS的state字段。
    • 3.3 reportInterruptAfterWait 将重新执行Thread.interrupt().
  4. 多个线程await,被多次signal,每次signal则唤醒一个await的线程。

  5. 多个线程await,被signalAll同时唤醒,其实应该说是将等待队列中的线程逐个唤醒。

三:signal逻辑

public class ConditionObject implements Condition, java.io.Serializable {
    /**
     * Moves the longest-waiting thread, if one exists, from the
     * wait queue for this condition to the wait queue for the
     * owning lock.
     * 将等待时间最长的线程(如果存在)从该条件的等待队列移动到拥有锁的等待队列。
     * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
     *         returns {@code false}
     */
    public final void signal() {
        if (!isHeldExclusively())
            //如果不是当前线程获得的锁,则抛出异常
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;//firstWaiter表示等待时间最长的节点。
        if (first != null)
            doSignal(first);
    }

    /**
     * Removes and transfers nodes until hit non-cancelled one or
     * null. Split out from signal in part to encourage compilers
     * to inline the case of no waiters.
     * 删除和传输节点,直到命中未取消的一个node或null。
     * 从signal中分离出来,部分是为了鼓励编译器在没有等待者的情况下inline。
     * @param first (non-null) the first node on condition queue
     */
    private void doSignal(Node first) {
        do {
            //将first的下个节点作为新的first
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                (first = firstWaiter) != null);//如果转换失败,继续循环,继续转换下一个节点
    }

    /**
     * Transfers a node from a condition queue onto sync queue.
     * Returns true if successful.
     * 将节点从等待队列传输到同步队列。成功则返回true。
     * @param node the node
     * @return true if successfully transferred (else the node was
     * cancelled before signal)
     */
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         * 如果不能改变waitStatus,则该node已经被取消了。
         * 因为await的时候,如果发生interrupt,则会将此状态设置为0,表示已经取消等待。
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         * 拼接到队列上,并尝试设置前置线程的waitStatus,以指示线程(可能)正在等待。
         * 如果已取消或尝试设置waitStatus失败,则唤醒以重新同步(在这种情况下,waitStatus可能会暂时错误,且不会造成伤害)
         * 通过AQS的enq方法将节点传输到AQS的同步队列上
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            //被取消了或者状态修改失败,直接unpark
            LockSupport.unpark(node.thread);
        return true;
    }

   /**
    * Moves all threads from the wait queue for this condition to
    * the wait queue for the owning lock.
    * 将此条件的所有线程从等待队列移动到拥有锁的等待队列。
    */
   public final void signalAll() {
      if (!isHeldExclusively())
         throw new IllegalMonitorStateException();
      Node first = firstWaiter;
      if (first != null)
         doSignalAll(first);
   }

   /**
    * Removes and transfers all nodes.
    * @param first (non-null) the first node on condition queue
    */
   private void doSignalAll(Node first) {
      lastWaiter = firstWaiter = null;
      do {
          //循环所有的等待节点,依次transferForSignal
         Node next = first.nextWaiter;
         first.nextWaiter = null;
         transferForSignal(first);
         first = next;
      } while (first != null);
   }
}

总结:

  1. signal 单个唤醒,则将第一个节点firstWaiter状态从CONDITION改为0,并加等待队列转移到同步队列中。
  2. signalAll 则是从第一个循环到最后一个,都处理一遍。
  3. 这里的signal并不会直接将等待节点的线程unpark,而是加到了同步队列,等通知者线程unlock了,release逻辑会将等待线程unpark。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,284评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,115评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,614评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,671评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,699评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,562评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,309评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,223评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,668评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,859评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,981评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,705评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,310评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,904评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,023评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,146评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,933评论 2 355

推荐阅读更多精彩内容