ReentrantLock源码分析(三)

synchronized有个重要的功能,可以通过object中的wait()和 notify()方法实现生产者/消费者。ReentrantLock基于Condition也同样可以实现,而且相对于synchronized的无差别通知,ReentrantLock可以选择性的通知,减少了很多无用的线程竞争。本文主要就是分析下ReentrantLock是怎么通过Condition实现生产者/消费者模式的。

public class ProducerCustomerWithLock {
    Executor pool = Executors.newFixedThreadPool(5);
    private List<String> storeList = new LinkedList<>();//仓库
    //仓库容量
    private int MAX_VALUE = 5;
    //仓库为空
    private int MIN_VALUE = 0;
    // 线程锁
    private Lock lock = new ReentrantLock();
    //仓库满了,绑定生产者线程
    private Condition full = lock.newCondition();

    //仓库为空,绑定消费者线程
    private Condition empty = lock.newCondition();
    //生产者
    private class producer implements Runnable {

        @Override
        public void run() {
            while (true) {
                produce();
            }
        }

        private void produce() {
            System.out.println(Thread.currentThread().getName() + "进入仓库,准备生产!");

            try {
                lock.lock();
                if (storeList.size() == MAX_VALUE) {
                    System.out.println("仓库已满,等待消费");
                    Thread.sleep(1000);
                    full.await(); //当前线程等待,让其他线程继续执行,可以看出wait是释放锁的
                }
                if (storeList.size() < MAX_VALUE) {
                    String product = "产品" + new Random().nextInt();
                    storeList.add(product);
                    System.out.println(Thread.currentThread().getName() + "往仓库中生产了一个产品!" + product);
                }
                Thread.sleep(1000);
                empty.signalAll();//唤醒消费者线程
            } catch (InterruptedException e) {
                System.out.println("中断异常");
//                    e.printStackTrace();
            }
            finally {
                lock.unlock();
            }
        }
    }

    private class consumer implements Runnable {

        @Override
        public void run() {
            while (true) {
                consume();
            }
        }

        private void consume() {
            System.out.println(Thread.currentThread().getName() + "进入仓库,准备消费!");

                try {
                    lock.lock();
                    if (storeList.size() == MIN_VALUE) {
                        System.out.println("仓库已空,等待生产");
                        Thread.sleep(1000);
                        empty.await(); //当前线程等待,让其他线程继续执行,可以看出wait是释放锁的
                    }
                    if (storeList.size() > MIN_VALUE) {
                        System.out.println(Thread.currentThread().getName() + "从仓库取得产品:" + storeList.remove(0));
                    }
                    Thread.sleep(1000);
                    full.signalAll();//唤醒生产者线程
                } catch (InterruptedException e) {
                    System.out.println("中断异常");
//                    e.printStackTrace();
                }
                finally {
                    lock.unlock();
                }
            }

    }

    //启动生产者和消费者线程
    public void start() {
        for (int i = 1; i < 5; i++) {
            pool.execute(new producer());
            pool.execute(new consumer());
        }

    }

    public static void main(String[] args) {
        ProducerCustomerWithLock pc = new ProducerCustomerWithLock();
        pc.start();
    }
}

打印结果如下,生产者消费者交替运行:

pool-1-thread-1进入仓库,准备生产!
pool-1-thread-2进入仓库,准备消费!
pool-1-thread-3进入仓库,准备生产!
pool-1-thread-1往仓库中生产了一个产品!产品-885144207
pool-1-thread-4进入仓库,准备消费!
pool-1-thread-5进入仓库,准备生产!
pool-1-thread-1进入仓库,准备生产!
pool-1-thread-2从仓库取得产品:产品-885144207
pool-1-thread-2进入仓库,准备消费!
pool-1-thread-3往仓库中生产了一个产品!产品-1433422526
pool-1-thread-3进入仓库,准备生产!
pool-1-thread-4从仓库取得产品:产品-1433422526
pool-1-thread-4进入仓库,准备消费!
pool-1-thread-5往仓库中生产了一个产品!产品-137356193
pool-1-thread-5进入仓库,准备生产!
pool-1-thread-1往仓库中生产了一个产品!产品831540660
...

接下来就直接进入正题吧,看下ConditionObject.await()方法。

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

先判断线程状态是否被中断,如果被中断则抛出异常。然后开始构建Condition队列,看下addConditionWaiter()方法。

private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

先判断condition尾节点的waitStatus是否是condition状态。如果不是只能是cancel状态,则把队列中是cancel状态的节点移除。找到第一个是condition状态的尾节点。

新建一个Node节点,模式是CONDITION,如果之前不存在尾节点。则把当前节点作为头结点和尾节点,否则把当前节点置为尾节点。然后执行fullyRelease(node)方法。

final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

首先进入到await的方法的前提条件是获取到锁,所以这一步是释放锁并且唤醒AQS队列头结点的后置节点。这里为什么要释放锁呢,很简单await方法会阻塞当前线程。当前线程如果不释放锁就会导致后面的线程获取不到锁从而阻塞。极大的影响性能还可能造成死锁。所以await方法是会释放锁的。

再看isOnSyncQueue(node)是判断当前节点是否在AQS队列中,如果不在则阻塞当前节点所在线程。直到signal方法唤醒该线程。那么先看下signal():

public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }
 private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

首先把当前节点waitStatus状态CAS操作为0。如果设置失败,且该节点为condition头结点,则把该节点排除出队列。
如果设置成功,则把该节点插入到AQS队列中。也就是说Signal()唤醒后不是立即执行的。而是进入到AQS队列中排队。
如果该节点被取消了或者已经被设置成了SIGNAL,则取消阻塞该节点所在线程。其他情况由AQS头节点唤醒。

再回过头看await方法。

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

之前睡眠的线程被唤醒了,改节点已经加入到了AQS队列可以退出循环了,然后主要就是去获取锁并返回线程的中断状态。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,406评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,732评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,711评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,380评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,432评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,301评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,145评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,008评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,443评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,649评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,795评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,501评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,119评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,731评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,865评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,899评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,724评论 2 354

推荐阅读更多精彩内容