Condition源码分析

并发源码分析篇:

Condition类似于Object方法中的await和signal,其作用都是将线程挂起和唤醒。但是Condition更加灵活,可以通过newCondition方法获取多个等待队列。

下面这段代码通过Condition实现了一个生产者和消费者模式,同时这段代码也类似于ArrayBlockingQueue阻塞队列的实现,当数组满了,生产者将会阻塞,只有当消费者消费了消息后唤醒生产者生产者才会继续工作,同理消费者也是,当数组中没有数据的话将会被阻塞,直到数组中有数据才会被唤醒继续工作,Condition的使用必须结合Lock锁的使用,需要先获取到锁,否则会抛异常。

package com.ltf.study.concurennt;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionDemo {
    static Lock lock = new ReentrantLock();
    static Condition fullConditon = lock.newCondition();
    static Condition emptyCondition = lock.newCondition();
    static String[] arr = new String[5];
    static int takeindex;
    static int putindex;
    static int count;
    public static void main(String[] args) {
        for (int i = 0; i < 6; i++) {
            provider(i+"");
        }
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        for (int i = 0; i < 10; i++) {
            consumer();
        }
    }
    public static void provider(String str) {
        new Thread(()->{
            try {
                lock.lock();
                while (count >= 5) {
                    fullConditon.await();
                }
                System.out.println(Thread.currentThread().getName()+"==生产了:"+str);
                arr[putindex] = str;
                count++;
                putindex = ++putindex <5 ? putindex:0;
                emptyCondition.signal();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        },"provider").start();
    }

    public static void consumer() {
        new Thread(()->{
            try {
                lock.lock();
                while (count == 0) {
                    emptyCondition.await();
                }
                System.out.println(Thread.currentThread().getName()+"消费:"+arr[takeindex]);
                arr[takeindex] = null;
                takeindex = ++takeindex <5 ? takeindex:0;
                count--;
                fullConditon.signal();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        },"comsumer").start();
    }
}

运行结果

provider==生产了:0
provider==生产了:2
provider==生产了:1
provider==生产了:3
provider==生产了:4
comsumer消费:0
comsumer消费:2
comsumer消费:1
provider==生产了:5
comsumer消费:3
comsumer消费:4
comsumer消费:5

锁的使用就不用多说了,可以看ReentrantLock源码分析

下面是等待队列的结构,为单向链表



当线程获取锁,调用await方法时,我们来看他做了什么

        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 构建等待队列链表
            Node node = addConditionWaiter();
            // 释放锁
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                // 阻塞当前线程
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 竞争锁
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

在这里首先会构建链表,然后释放锁,唤醒同步队列中的下一个节点去获取锁,然后阻塞当前线程。

        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

这里同样假设有A,B,C三个线程分别获取到锁调用await方法,就会构成如下队列并且阻塞,只有等到其他线程调用signal方法将节点加入到同步队列末尾等到前置节点将该节点唤醒才能继续在上次挂起的地方重新执行。



这时我们来看signal方法,该方法就是获取等待队列中的首节点传入doSignal方法中。

        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

doSignal方法做了以下几件事

  1. 如果当前首节点没有下一个节点,将尾节点置空
  2. 将当前节点的下一个节点引用置空,也就是从等待队列中移除了。
  3. 将当前移除的等待节waitState状态设置为0并且加入到同步队列末尾,并且将同步队列中对应的前置节点的waitState状态设置为-1。
    signal做的事情其实就是将等待队列中的节点移到同步队列中

此时的同步队列和等待队列中的状态如下



A节点被移到了同步队列中,当被同步队列中的前置节点唤醒后,A节点继续会在上次挂起的地方运行。

        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

设置中断状态为了后面时直接抛异常还是重新中断,并且继续执行判断isOnSyncQueue

    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        return findNodeFromTail(node);
    }
    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }

此时的node waitState状态为0,并且前置节点不等于null,所以结果会返回true,所以跳出了while循环继续执行acquireQueued方法去抢占锁,继续抢占的原因是这是非公平锁,有可能被同步队列之外的其他线程获取到
至于sigalAll方法就是循环将每个节点移到同步队列中

        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

到这里Condition的基本实现结束了。

总结

Condition我们可以这样理解,就是在同步队列之外多了其他的等待队列,wait方法就是将同步队列中的节点移到等待队列中,而signal方法则是将等待队列中的节点移动到同步队列中,从而达到线程之间的通信

await

signal

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,968评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,601评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,220评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,416评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,425评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,144评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,432评论 3 401
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,088评论 0 261
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,586评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,028评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,137评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,783评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,343评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,333评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,559评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,595评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,901评论 2 345

推荐阅读更多精彩内容