深入理解AQS(一)- 独占模式

AQS简介

AbstractQueuedSynchronizer简称AQS,它提供了一个FIFO双向队列,用来实现锁和其他的同步框架,AQS通过子类继承并实现它的方法来管理同步状态,如ReentrantLock、CountDownLatch底层都继承了AQS;

AQS独占锁的实现

AQS数据结构以及属性

AQS内部维护了一个FIFO双向队列,队列中的元素是一个Node类型的节点,Node节点元素如下:

static final class Node {

    static final Node SHARED = new Node();

    static final Node EXCLUSIVE = null;

    static final int CANCELLED =  1;

    static final int SIGNAL    = -1;

    static final int CONDITION = -2;

    static final int PROPAGATE = -3;

    volatile int waitStatus;

    volatile Node prev;

    volatile Node next;
    
    volatile Thread thread;
    
    Node nextWaiter;
    
}
  • SHARED:共享锁标记;
  • EXCLUSIVE:独占锁标记;
  • waitStatus:表示节点的状态、节点状态有:
    • CANCELED:值为1,表示节点被取消;
    • SIGNAL:值为-1,表示表示当前节点的后继节点已经或者将要被阻塞,当前节点释放后,后继节点需要被唤醒;
    • CONDITION:表示节点在等待condition,在condition队列中;
    • PROPAGATE:值为-3,表示releaseShared需要被传播给后续节点(仅在共享模式下使用);
    • 0:waitStatus的初始状态;
  • prev:当前节点的前驱节点;
  • next:当前节点的后继节点;
  • nextWaiter:共享模式下存储在condition队列的后继节点;
  • thread:当前线程;

AQS中维护了一个head节点、一个tail节点,表示队列的头节点和尾节点;AQS同时还维护了一个state变量,该变量主要表示加锁的状态;在ReentranLock中,由于ReentrantLock是一个可重入锁、所以每加一次锁后state的状态加1,释放一次锁后state状态减1,当state为0时表示锁已被释放掉;AQS还维护了headOffset、tailOffset等,用来记录头节点、尾节点、节点状态的内存偏移量,在cas操作时通过对应的offset偏移量用来确定head节点、tail节点等在内存中的位置,来替换相对应的变量;

AQS独占锁的代码实现

1、acquire

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

ReentrantLock的构造方法默认是使用的非公平锁、在NonfairSync非公平锁中、如果通过cas能获取锁,则调用setExclusiveOwnerThread设置exclusiveOwnerThread为当前线程,表示这个线程独占这个锁;否则就调用AQS的acquire方法;

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquire方法主要工作:

  • 1.尝试获取独占锁;
  • 2.获取成功则返回、获取失败则调用addwatier把当前线程作为Node节点添加到队列尾部;
  • 3.调用acquireQueued方法自旋、尝试获取锁直到成功,并返回中断标志位;
  • 4.如果获取锁失败,并且中断标志位为true,然后设置线程中断;

2、tryAcquire

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

tryAcquire方法在AQS内部是没有实现的,需要由子类继承后去实现它的方法。这边为什么不用abstract来修饰?因为在AQS中有2种锁方式:共享锁和独占锁,如果用抽象方法、那子类就必须实现对应的功能,如果子类是共享锁,那它还得出现独占锁的方法,但这对子类来说并不是需要的,所以这边并没有用abstract来修饰。
以下是ReentrantLock中非公平锁对tryAcquire的实现:

final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

3、addWaiter

    private Node addWaiter(Node mode) {
        //创建当前线程的节点
        Node node = new Node(Thread.currentThread(), mode);
        //获取AQS的尾节点
        Node pred = tail;
        //尾节点为空则进行enq
        if (pred != null) {
            //尾节点不为空,则把当前节点的前继节点指向队列的尾节点
            node.prev = pred;
            //通过cas重新把尾节点设置为当前节点,否则enq
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

addWaiter方法主要是根据当前线程创建一个Node节点,并添加到队列的尾部。

    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));

        } catch (Exception ex) { throw new Error(ex); }
    }

    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

在AQS初始化的阶段会调用static代码块获取tailOffset,即tail节点在内存地址的偏移量。compareAndSetTail通过cas操作、根据tailOffset获取内存中的tail节点、如果tail节点与预期的expect节点相同(except也就是tail节点、在并发下tail节点可能会一直改变),则把内存中的tail节点修改为update节点;compareAndSetHead、compareAndSetWaitStatus、compareAndSetNext等方法与之类似;

4、enq

    private Node enq(final Node node) {
        //不断执行for循环,直至成功
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                //如果tail节点为空、则创建一个Node节点作为头节点和尾节点
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                //tail节点不为空、则重新通过cas操作来设置当前节点为队尾、以及设置当前节点的前继节点
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

enq方法主要是不断的循环,是一种自旋操作,去初始化头尾节点、以及设置当前节点的前继节点与尾节点;因为在addWaiter方法中cas操作设置尾节点可能会失败,在并发情况下尾节点可能会不断的更改,所以enq方法自旋去不断的设置当前节点的前继节点和队列的尾节点,以保证节点能添加到队列尾部。

注意这边设置tail节点和设置当前节点并不是原子操作,所以在next节点的指针可能会出现问题,后面为保证遍历到每一个节点,需要从尾部向前递归。

5、acquireQueued

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            //中断标志位
            boolean interrupted = false;
            for (;;) {
                //获取前置节点
                final Node p = node.predecessor();
                //如果前置节点是head,则尝试获取锁
                if (p == head && tryAcquire(arg)) {
                    //设置head节点为当前节点
                    setHead(node);
                    //清除之前到head节点
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //如果当前节点的前继节点不是head或者获取锁失败
                //则判断是否需要park阻塞
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

acquireQueued方法是自旋的获取锁直至成功,最后返回中断标志位。

6、shouldParkAfterFailedAcquire

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
        //如果前继节点状态为CANCELED、则不断向前推找到不为CANCELED的节点
        //把当前节点的前继节点指向不为CANCELED的节点,删除中间的CANCELED节点
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
        //节点状态为0或者为CONDITION时,则把前继节点状态改为SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

shouldParkAfterFailedAcquire方法主要是判断节点是否需要park阻塞,并删除节点前面的CANCELED节点,所以只有当前继节点为SIGNAL时,当前节点才需要阻塞。

7、parkAndCheckInterrupt

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

LockSupport.park方法:判断线程是否中断、如果线程不是中断的则阻塞当前线程,需要调用unPark方法进行唤醒;如果线程是中断的,则直接返回。

interrupted()方法的作用,该方法是获取线程的中断状态,并复位,也就是说,如果当前线程是中断状态,则第一次调用该方法获取的是true,第二次则是false。而isInterrupted()方法则只是返回线程的中断状态,不执行复位操作。

  • 当线程为非中断时,执行park方法进行阻塞;
  • 当线程为中断时,Thread.interrupted返回true,并重新复位中断标志位,再次循环后重新调用park方法进行阻塞;

这边为何需要阻塞呢,假设前面一个线程获取锁之后执行了很长时间,导致for循环一直在执行,如果有很多的线程都在执行,这样就导致cpu使用率飙升。所以这边需要用park进行阻塞,需要时调用unPark方法唤醒。

8、cancelAcquire

private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;
        
        //去掉节点上的线程
        node.thread = null;

        Node pred = node.prev;
        //删除队列中当前节点前面的CANCELED节点
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
            
        Node predNext = pred.next;
        //当前节点的状态置为CANCELED
        node.waitStatus = Node.CANCELLED;
        //如果当前节点为尾节点,则把当前节点的前继节点设为尾节点
        if (node == tail && compareAndSetTail(node, pred)) {
            //把尾节点的next节点置空
            compareAndSetNext(pred, predNext, null);
        } else {
            //当前节点的前继节点不为头节点
            //如果前继节点不为SIGNAL,则设置为SIGNAL
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                //把前继节点的next节点设为当前节点的下个节点
                //相当于删除当前节点
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                //如果是head的后继节点、或者设置状态失效,则唤醒它的后继节点
                unparkSuccessor(node);
            }
            
            node.next = node; // help GC
        }
    }

cancelAcquire 方法删除队列中的当前节点以及当前节点前面的CANCELED节点、如果当前节点是头节点,则唤醒当前节点的后继节点。

9、unparkSuccessor

private void unparkSuccessor(Node node) {

        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        //尾节点向前遍历,找到最靠近当前节点的waitStatus<=0的节点
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            //如果节点不为空,则唤醒节点的线程
            LockSupport.unpark(s.thread);
    }

unparkSuccessor方法主要是唤醒最靠近当前节点,并且waitStatus<=0的节点;这边为何要从尾节点向前遍历到当前节点?因为在addWaiter方法、enq方法中

 if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
    }

设置尾节点和设置前置节点的下一个节点不是原子操作,可能出现next节点没有指向,导致遍历时漏掉新进的尾节点;以及在cancelAcquire中node.next = node,遍历next节点出现环节点;所以next节点并不是安全的,所以相对于用next遍历,prev遍历更加安全。

AQS独占锁的释放

release

    public void unlock() {
        sync.release(1);
    }

在ReentrantLock中unlock是调用AQS的release方法来释放锁。

    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

    public final boolean release(int arg) {
        //释放锁
        if (tryRelease(arg)) {
            Node h = head;
            //释放锁后唤醒后继节点
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

在AQS中tryRelease也和tryAcquire一样,需要子类去实现;
ReentrantLock中truRelease的实现:

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

总结

AQS独占锁通过自旋、标志位、park、unPark以及队列来处理锁的状态,包括获取锁,释放锁;AQS通过大量的cas操作去实现更新操作,以保证状态、队列的原子性,当然cas只能保证一个属性的原子、并不能保证代码块的原子性;

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,968评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,601评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,220评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,416评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,425评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,144评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,432评论 3 401
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,088评论 0 261
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,586评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,028评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,137评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,783评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,343评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,333评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,559评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,595评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,901评论 2 345

推荐阅读更多精彩内容