AQS同步并发框架查缺补漏-互斥锁、共享锁、公平锁、非公平锁、信号量

本文适合AQS有一定基础的伙伴进行阅读,对其中比较重点的内容做一个简单的总结,本文不会对AQS基础框架和源码进行很详细的分析,网上有很多这种资源,大家可以先深入了解一下,直接上干货。

一、互斥锁与共享锁
AQS是锁实现的基础框架,AQS区别了互斥锁与共享锁的实现方式,互斥锁就是只有一个线程同一时刻可以获得锁,共享锁是同一时刻可以有多个线程可以获得锁(这里不要与读与写进行关联,我之前有一段时间也有这种误区),但是这两种锁是否能够获得到锁,是实现锁功能的类实现的如(ReentrantLock、ReentrantReadWriteLock),一般都需要实现如下几个方法。

1.互斥锁

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

首先调用实现锁的tryAcquire来判断是否能获得锁,若不能获得锁则初始化一个EXCLUSIVE类型的Node节点,并以CAS配合自旋的方式放入同步队列尾部,然后再看该节点是否在同步队列的首节点后,如果是再次调用tryAcquire,如果不能获得到锁
该线程就会阻塞( LockSupport.park实现)。

二、共享锁

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

调用实现锁的tryAcquireShared方法,如果<0代表没有获得锁,直接调用doAcquireShared方法,该方法的大部分与互斥锁很相似,但是其中的不同是setHeadAndPropagate方法,该方法是在获得锁后实现了一个"传播行为",依次唤醒同步队列中的SHARE节点。

二、公平锁与非公平锁
公平锁与非公平锁是针对ASQ框架所实现的锁(不是AQS自身实现,这点很重要),我个人的理解是公平锁是按照申请锁的顺序获取锁,而非公平锁就没有这样严格的要求,那么我们知道要实现锁,都需要实现AQS的核心方法(源码可以查看ReentrantLock中的NonfairSync和FairSync),那我们看一下公平锁与非公平锁的具体实现。

1.非公平锁

  final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

根据state==0的条件(状态)来尝试上锁,通过CAS的方式成功更改了state则获得到锁,且支持重入,如果没有获得到锁返回false,后面的操作和ASQ的互斥锁的流程一致,这里不做过多的介绍了。

2.公平锁

        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

整段方法大家可以看到处理过程中多了一个hasQueuedPredecessors方法,其他大部分都很相似,这也是公平锁与非公平锁的控制的关键,主要实现的就是判断当前是否已经存在等待的线程,如果存在则不能获得锁,老老实实的在AQS的同步队列中进行排队。

三、读写锁
在ReentrantReadWriteLock中有两个锁,一个是ReadLock、一个是WriteLock,一般适用于多读少写的一些场景,之前我提过所有的锁都是基于AQS进行的实现,所以读写锁也一样,只是针对于这种场景(可以同一时间并发读,但是只要有写就阻塞)进行了处理。
1.读锁

protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

在读写锁中state32位,高16位为读锁获取的次数,低16位是互斥锁获取的次数,如果获取读锁时已经有其他线程获得了写锁那就直接返回获得锁失败。如果没有人获得写锁,那就直接修改state的值,代表本线程获得到了读锁。如果r==0说明没有人获得读锁,则给firstReader设置为当前线程,重入次数为1,如果firstReader已经是自身线程,则重入次数加1(支持重入),如果都不是,则从缓存中获得重入次数并自增,如果本方法没有获得锁那还是根据AQS共享锁的原理在同步队列等待。
读锁的释放这里不做过多的描述,大概的思路是如果当前线程是firstReader且重入次数为1则设置firstReader为空,如果不是则获取缓存中的重入数量减1,如果减完后为0则返回true,进行同步队列中线程的唤醒,其他时候返回false。
2.写锁

        protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

如果state!=0且w==0,说明有读锁的获取次不为0,getExclusiveOwnerThread说明其他线程获得了写锁,以上两种情况都返回获取锁失败,如果该线程通过重入获取锁成功则重新设置state的值,返回true。如果在上一步state为0则直接尝试获得写锁,获得到返回true,获取不到锁返回false,后续流程就与AQS是互斥锁一致了。
释放锁的过程也类似,修改state的值(减releases),如果重入为0则设置OwnerThread为null,返回true,唤醒同步队列中的线程。

四、CountDownLatch
在CountDownLatch进行初始化的时候会设置state的个数,每当执行countDown方式的时候就会调用tryReleaseShared方法将state减1,如果最后state不为0则一直都返回false,一旦state为0,则会唤醒等待队列中的线程。

 protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

上面简述了唤醒等待线程的过程,那这些进程是如何进入到队列中的,其实是调用了CountDownLatch的await方法,await方法内部调用了acquireSharedInterruptibly方法,如果state不为0也就是还有线程没有运行,则所有的await线程要进入到等待队列中。

 public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

  public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

  protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

五、Semaphore
该锁也成为信号量,同一时刻最多只有N个线程可以同时工作,可以实现简单的限流,底层的实现也是基于AQS,在该锁中也可以实现公平锁与非公平锁两种方式。

1.获得许可(以非公平锁为例)

 public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

 protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }

final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

当线程想获得一个锁的时候,该锁实现了可中断和非终端两种方式,nonfairTryAcquireShared方法是一个自旋,根据state获得现在还剩下的许可数,然后减想获得的许可数,如果剩余的许可小于0则返回一个负数,该线程进入到AQS是同步队列中。如果剩余的许可数大于0,则会更新state的许可数,然后返回一个正数,该线程获得锁。

2.释放许可

 public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }

  protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

释放一个许可也很简单,获得当前的许可数state,将释放的许可数叠在state上(可以提供这种方式动态的增加许可的数量),然后对state进行更新,如果更新成功则返回true,依次唤醒同步队列中的线程获取锁。

小结:AQS的核心就是互斥锁和共享锁的实现流程,至于其他出现的锁,都是在AQS的基础上实现的,比如ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphore等,我看源码的时候最后就是充分的理解了这句话才对整个AQS理解更加的深刻。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,490评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,581评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,830评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,957评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,974评论 6 393
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,754评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,464评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,357评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,847评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,995评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,137评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,819评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,482评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,023评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,149评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,409评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,086评论 2 355