AQS源码分析

一、前言

问题引入

@Controller
public class TradeController {
    @Autowired
    private TradeService tradeService;

    @RequestMapping("/order")
    public String order() {
        tradeService.decStock();
        return "success";
    }
}

@Service
public class TradeService {
    Logger logger = Logger.getLogger(TradeService.class);
    @Autowired
    JdbcTemplate jdbcTemplate;
    /**
     * 扣减库存
     * @return
     */
    public String decStock() {
        Integer stock = jdbcTemplate.queryForObject("select stock from goods_stock where id = 1", Integer.class);
        if (stock <= 0) {
            logger.info("库存不足,下单失败!");
            return "库存不足,下单失败!";
        }
        stock--;
        jdbcTemplate.update("update goods_stock set stock = ? where id = 1", stock);
        logger.info("下单成功,当前剩余库存:" + stock);
        return "下单成功,当前剩余库存:" + stock;
    }
}

如上代码,用Jmeter模拟30个请求同时下单,结果30个请求都下单成功,产生了超卖问题。

自定义AQS解决

下面实现自定义一个同步器来实现自定义锁

/**
 * 2021/7/1
 * 自定义AQS实现
 */
public class MyLock {

    private volatile int state = 0;

    private Thread lockHolder;

    // 要用线程安全的队列作为等待队列,基于CAS实现
    // linkedBlockedQueue基于AQS实现,不能用
    private ConcurrentLinkedDeque<Thread> waiters = new ConcurrentLinkedDeque<>();

    public int getState() {
        return state;
    }
    public void setState(int state) {
        this.state = state;
    }
    public Thread getLockHolder() {
        return lockHolder;
    }
    public void setLockHolder(Thread lockHolder) {
        this.lockHolder = lockHolder;
    }

    public void lock() {
        Thread currentThread = Thread.currentThread();
        if (acquire()) {
            return;
        }
        waiters.add(currentThread);
        // 自旋
        for (; ; ) {
            // 队列里第一个线程才能抢锁
            if (currentThread == waiters.peek() && acquire()) {
                // 队列头线程拿到锁 踢出等待队列
                waiters.poll();
                return;
            }
            // 阻塞当前线程 放弃CPU使用权
            LockSupport.park();
        }
    }

    public void unLock() {
        if (Thread.currentThread() != lockHolder) {
            throw new RuntimeException("lockHolder is not current thread");
        }
        if (compareAndSwapState(getState(), 0)) {
            setLockHolder(null);
            // 唤醒队列里第一个线程
            Thread first = waiters.peek();
            if (first != null) {
                LockSupport.unpark(first);
            }
        }
    }

    // 是否能加锁成功
    private boolean acquire() {
        Thread currentThread = Thread.currentThread();
        if (getState() == 0) { // 同步器尚未被持有
            // 没人排队/自己是队列头,才能去尝试原子操作改变state
            if ((waiters.size() == 0 || currentThread == waiters.peek()) && compareAndSwapState(0, 1)) {
                setLockHolder(currentThread);
                return true;
            }
        }
        return false;
    }


    // 利用Unsafe类实现原子操作改变值
    public final boolean compareAndSwapState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

    private static final Unsafe unsafe = reflectGetUnsafe();
    // 偏移量
    private static final long stateOffset;

    static {
        try {
            stateOffset = unsafe.objectFieldOffset(MyLock.class.getDeclaredField("state"));
        } catch (Exception e) {
            throw new Error();
        }
    }

    // 反射获取Unsafe类
    private static Unsafe reflectGetUnsafe() {
        try {
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            return (Unsafe) field.get(null);
        } catch (Exception e) {
            return null;
        }
    }
}
public class TradeService {
    Logger logger = Logger.getLogger(TradeService.class);
    @Autowired
    JdbcTemplate jdbcTemplate;
    // 单例 创建锁对象
    MyLock myLock = new MyLock();
    public String decStock() {
        myLock.lock();  // 加锁
        Integer stock = jdbcTemplate.queryForObject("select stock from goods_stock where id = 1", Integer.class);
        if (stock <= 0) {
            logger.info("库存不足,下单失败!");
            myLock.unLock();    // 业务失败 释放锁
            return "库存不足,下单失败!";
        }
        stock--;
        jdbcTemplate.update("update goods_stock set stock = ? where id = 1", stock);
        logger.info("下单成功,当前剩余库存:" + stock);
        myLock.unLock();    // 业务成功 释放锁
        return "下单成功,当前剩余库存:" + stock;
    }
}

二、概述

抽象同步框架,可以用来实现一个依赖状态的同步器

1.可以实现独占与共享两种模式:

  • 独占:互斥,资源只能同时被一个线程占有,如ReentrantLock、Mutex

  • 共享:资源不互斥,可以被多个线程占有,如Semaphre、CountDownLatch

    一般来说同时只会实现一种模式,但也有ReentrantReadWriteLock同时实现独占和共享两种方式

2.核心属性:

  • state:同步器的状态,即共享资源。访问方式为:getState()、setState()、compareAndSetState()

  • exclusiveOwnerThread:当前持有共享资源的线程

  • head:同步队列头

  • tail:同步队列尾

3.特性:

  • 可中断

  • 可重入

4.自定义AQS

一般自定义同步器的时候,只需要自定义共享资源的获取和释放方式。至于等待队列的维护,AQS已经定义好了,不需要重写。所以主要重写以下几种方法:

  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。

  • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。

  • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。

  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。

  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

[图片上传失败...(image-ed6b0e-1649076723271)]

三、源码

waitStatus表示Node节点的等待状态

waitStatus 判断结果 说明
0 初始化状态 该节点尚未被初始化完成
1 取消状态(CANCELLED) 说明该线程中断或者等待超时,需要移除该线程,进入该状态后节点不会变化
-1 有效状态(SIGNAL) 下一个节点等着自己唤醒。节点入队会把前继节点状态更新为SIGNAL
-2 有效状态(CONDITION) 结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
-3 有效状态(PROPAGATE) 共享模式下,自己不仅会唤醒后继节点,同时也可能唤醒后继节点的后继节点

AQS中阻塞队列采用双向链表结构,使用prev、next连接;而条件队列采用单向链表,采用nextWaiter连接

nextWaiter状态标志 说明
SHARED(共享模式) = new Node() 立即唤醒下一个节点
EXCLUSIVE(独占模式) = null 等待当前线程执行完再唤醒
其他非空值 根据条件决定怎么唤醒下一个节点

1.acquire()

acquire()是获取共享资源的顶级入口,获取到资源则直接返回,否则入等待队列,直到获取到共享资源,这个过程忽略中断的影响(如果需要可中断,可以调用acquireInterruptibly())。获取到资源之后就可以执行临界区的代码了

   public final void acquire(int arg) {
        // 尝试获取共享资源
        if (!tryAcquire(arg) &&
            // 
            acquireQueued(
                // 加入等待队列尾部
                addWaiter(Node.EXCLUSIVE), arg)
           )
            // 如果在acquireQueued()被中断过 这里自己补一个中断 
            selfInterrupt();
    }

1.1tryAcquire()

AQS中,这个方法不能被直接调用,需要子类重写。这里不定义为抽象方法的好处在于,独占模式下只需重写tryAcquire(),共享模式写重写tryAcquireShared(),如果定义为抽象方法则都需要实现。

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

以ReentrantLock为例,非公平锁中中,会直接CAS尝试获取共享资源,公平锁中,会先检查队列中有没有正在等待的线程,才去获取共享资源

1.2addWaiter()

如果tryAcquire()加锁失败,会addWaiter(Node.EXCLUSIVE), arg),创建一个独占模式的节点加入到队列尾部

   private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);

        // 尝试快速入队
        // 这里直接进行了一次CAS加到尾部的尝试,失败才去自旋 为什么要这样呢?直接调用enq(node)效果似乎也一样?
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
       // 自旋 空队列则CAS地初始化队列,队列有节点就把节点挂到队列尾部
        enq(node);
        return node;
    }

1.3acquireQueued()

把当前节点加入队列尾部之后,acquireQueued()

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 获取自己的前驱节点
                final Node p = node.predecessor();
                // 如果前驱节点是头结点(正持有资源的节点) 自己是第二个节点,那么可能有机会获得资源 
                // 尝试获取资源(非公平锁直接获取,公平锁会先检查队列中有没有线程在等待再获取)
                // 第一次进来就直接看能不能获取资源 也可能是等待后,被唤醒(前驱节点释放资源/中断)循环走到这里
                if (p == head && tryAcquire(arg)) {
                    // 获取到资源 把当前节点设为头结点,前驱节点置空,保存的线程置空(头结点没必要保存线程了)
                    setHead(node);
                    // 去掉之前的头节点
                    p.next = null; // help GC
                    failed = false;
                    
                    // 返回等待过程中有没有被中断过
                    return interrupted;
                }
                // 如果自己不是第二个节点 或者自己虽然是第二个节点,但是由于非公平锁,新来的线程可以不入队列,直接获取锁,所以这里可能被其他线程抢先了
                // 检查是否可以休息,找到可以安心休息的地方。将前驱节点waitStatus改为-1,即后续来唤醒自己
                if (shouldParkAfterFailedAcquire(p, node) &&
                    // park 阻塞等待
                    // 检查中断标志 如果是被中断了而不是被unPark()
                    // 后面tryAcquire方法()获取到资源之后会返回中断标志,acquire()里自行产生一个中断标志
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

尝试获取资源失败后,检查自己是否可以安心休息,如果不能就找到一个可以安心休息的地方

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
         // 前驱节点的状态已经是SIGNAL,到时候会来唤醒自己,所以可以安心休息
        if (ws == Node.SIGNAL)
            return true;
        // 前驱节点状态是被取消 
        if (ws > 0) {
            do {
                // 一直往前找没有取消的节点
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            // 把自己挂到它后面
            pred.next = node;
        } else {
            // 走到这里意味着状态是0/PROPAGATE 则把前驱节点状态置为SIGNAL 
            // 这里就算成功了,也不会直接返回true开始休息,而是再一轮尝试获取资源,获取不到再park
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    private final boolean parkAndCheckInterrupt() {
        // 等待
        LockSupport.park(this);
        // 被唤醒 返回自己是不是因为中断被唤醒  中断信号能打断一个等待中的线程,终止等待
        return Thread.interrupted();
    }

总结:如果当前是第二个节点,就去尝试获取资源,如果成功了直接返回。否则的话,寻找一个合适的休息点,开始等待,直到被前驱节点unPark()或被中断,才继续判断自己是否是第二个节点,去尝试获取资源。这个方法会返回等待过程中是否被中断过,后续用来自行产生一个中断。这也是不响应中断的核心,等待过程不可中断,只有获取到资源之后,才可以被中断

1.4selfInterrupt()

自行产生一个中断

1.5总结

1.先尝试获取资源

2.获取不到则把当前线程加入队列尾,标记独占模式

3.检查自己是否为第二个节点,是则尝试获取锁,不是则进入等待,等轮到自己了,前驱节点会unPark自己,自己再去尝试获取资源。如果被中断过,则返回出去

4.获取资源成功,如果被中断过,则产生一个中断

2.release()

释放资源的顶级入口,释放完资源,会唤醒队列中后一个正在等待的线程

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            // 判断头节点不为空,状态不为0
            if (h != null && h.waitStatus != 0)
                // 唤醒后继节点 找到队列里离head最近的一个没取消的node,unpark恢复其运行
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

2.1tryRelease()

    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

2.2unparkSuccessor()

   private void unparkSuccessor(Node node) {
        
        int ws = node.waitStatus;
        // 当前结点状态为负 则置为0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        // 一般要唤醒的就是后一个节点,但是可能该节点已取消 所以要从尾结点一直往前找,找到真正有效的节点
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

疑问:这里为什么要从后往前找?

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            // 要挂上去的节点 prev是一定能成功指向之前的尾结点的
            node.prev = pred;
            // CAS把尾结点设置为当前节点
            if (compareAndSetTail(pred, node)) {
                // 但这里可能还没有执行到 此时之前的尾结点指向null
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;

        node.thread = null;

        // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        Node predNext = pred.next;

        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        node.waitStatus = Node.CANCELLED;

        // If we are the tail, remove ourselves.
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

3.acquireShared()

共享模式下,尝试获取指定量的资源,获取失败则进入等待队列,直到获取到资源。该过程忽略中断

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

3.1tryAcquireShared()

    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

负值:失败,需要执行doAcquireShared()进入队列

0:成功,但没有剩余资源

正值:成功,还有剩余资源,其他线程还可以获取

3.2doAcquireShared()

    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        // 自己成为head
        setHead(node);
 
        // 如果还有剩余量 继续唤醒下一个线程
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

3.3总结

先尝试获取资源,没有获取到则进入队列等待,直到获取到资源。与独占模式相比,自己拿到资源之后,还会继续唤醒下一个线程

4.releaseShared()

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {//尝试释放资源
        doReleaseShared();//唤醒后继结点
        return true;
    }
    return false;
}

4.1tryReleaseShared()

    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

4.2doReleaseShared()

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                unparkSuccessor(h);//唤醒后继
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        if (h == head)// head发生变化
            break;
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,406评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,732评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,711评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,380评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,432评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,301评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,145评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,008评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,443评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,649评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,795评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,501评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,119评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,731评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,865评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,899评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,724评论 2 354

推荐阅读更多精彩内容