J.U.C锁之 Semaphore

Semaphore 简介

Semaphore 名为"信号量"。

Semaphore用来管理内部许可证,当多个线程要访问竞争资源时可以通过Semaphore来控制并发访问竞争资源的线程数。当线程需要访问竞争资源时需要首先获取一个许可证,执行完毕在返还,如果许可证用完则,线程进入同步队列并阻塞。等待许可证返回唤醒。

主要特性

公平性:支持公平性和非公平性。所谓公平表示在获取锁时逻辑是否要考虑当前正在排队等待线程。按照大白话来说就时公平表示不能插入强占资源。

应用场景

常用函数

获取许可

/** 获取一个许可,获取失败将线程添加到同步队列,并阻塞,等待归还唤醒  **/
    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }

    /** 和acquireUninterruptibly功能,同时能响应中断  **/
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /** 和acquireUninterruptibly功能,同时增加阻塞超时  **/
    public boolean tryAcquire(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /** 尝试获取许可,如果信号量中不存在许可之际返回false,成功返回true **/
    public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }

    /** 获取指定数量的许可,获取失败将线程添加到同步队列,并阻塞,等待归还唤醒  **/
    public void acquireUninterruptibly(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireShared(permits);
    }

    /** 和acquireUninterruptibly(int )功能,同时能响应中断  **/
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

    /** 和acquire(int)功能,同时增加阻塞超时  **/
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
            throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
    }


    /** 尝试指定数量的许可,如果信号量中不存在许可之际返回false,成功返回true **/
    public boolean tryAcquire(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.nonfairTryAcquireShared(permits) >= 0;
    }

释放许可

/** 释放一个许可, **/
public void release() {
    sync.releaseShared(1);
}


/** 释放指定数量许可, **/
public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}

其他方法

 /**
     * 获取许可的数量
     */
    public int availablePermits() {
        return sync.getPermits();
    }


    /**
     * 清空许可
     */
    public int drainPermits() {
        return sync.drainPermits();
    }


    /**
     * 根据指定的缩减量减小可用许可的数目
     */
    protected void reducePermits(int reduction) {
        if (reduction < 0) throw new IllegalArgumentException();
        sync.reducePermits(reduction);
    }

    /**
     * 判断当前对象是否是公平信号量
     */
    public boolean isFair() {
        return sync instanceof FairSync;
    }


    /**
     * 判断是否有线程在同步队列等待许可
     */
    public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }


    /**
     * 获取等待许可的线程数量
     */
    public final int getQueueLength() {
        return sync.getQueueLength();
    }


    /**
     * 获取等待许可的线程集合
     */
    protected Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }

2 实现原理

Semaphore 使用AQS实现锁机制,AQS是AbstractQueuedSynchronizer的缩写,翻译过来就是"同步器",,它实现了Java函数中锁同步(synchronized),锁等待(wait,notify)功能。

AbstractQueuedSynchronizer是一个抽象类,我们可以编写自己类继承AQS重写获取独占式或共享式同步状态模板方法,实现锁锁同步(synchronized),锁等待(wait,notify)功能

2.1 AQS 实现原理

AQS核心是一个同步状态,两个队列。它们实现了Java函数中锁同步(synchronized),锁等待(wait,notify),并在其基础上实现了独占式同步,共享式同步2中方式锁的实现。

无论独占式还时共享式获取同步状态成功则直接返回,失败则进入CLH同步队列并阻塞当前线程。当获取同步状态线程释放同步状态,AQS会选择从CLH队列head头部节点的第一个节点释放阻塞,尝试重写竞争获取同步状态,如果成功则将当前节点出队。如果失败则继续阻塞。

获取同步状态的线程也可以使用condition对象释放同步状态进入等待队列。只有等待其他线程使用condition.signal或condition.signAll()唤醒被从阻塞状态中释放重新竞争获取同步状态成功后从原来指令位置继续运行。

2.1.1 同步状态

AQS实现了锁,必然存在一个竞争资源。AQS存在从一个int类型的成员变量state,我们把它称为同步状态,同步状态通常用做判断线程能否获取锁的依据

2.1.2 同步队列

AQS 实现了锁那么总需要一个队列将无法获取锁的线程保存起来,方便在锁释放时通知队列中线程去重新竞争锁。

实现原理
同步队列又被称为CLH同步队列,CLH队列是通过链式方式实现FIFO双向队列。当线程获取同步状态失败时,AQS则会将当前线程构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态被释放时,会把首节点后第一个节点的线程从阻塞状态下唤醒,唤醒的线程会尝试竞争同步状态,如果能获取同步状态成功,则从同步队列中出队。

image
2.1.3 Condition & 等待队列
  • Java 传统的监视器有如下函数 wait、notify、notifyAll。它们可以实现当一个线程获取锁时,它可以主动放弃锁进入一个条件队列中。只有其他线程通知时才从条件队列中出队,重新获取锁成功后继续执行之前的未完成代码逻辑。

  • AQS内部存在一个内部类实现了Condition接口,其内部维护着一条链式实现单向等待队列。我们可以使用AQS获取内部实现Condition接口对象,调用await(),signal(),signalAll()函数实现Java中wait、notify、notifyAll同样功能。

实现原理

  • 当获取同步状态的线程调用condition.await(),则会阻塞,并进入一个等待队列,释放同步状态.
  • 当其他线程调用了condition.signal()方法,会从等待队列firstWaiter开始选择第一个等待状态不是取消的节点.添加到同步队列尾部.
  • 当其他线程调用了condition.signalAll()方法,会从等待队列firstWaiter开始选择所有等待状态不是取消的节点.添加到同步队列尾部.

这里取消节点表示当前节点的线程不在参与排队获取锁。

image
2.1.4 独占式同步

从概念上来说独占式对应只存在一个资源,且只能被一个线程或者说竞争者占用.

2.1.5 共享式同步

从概念上来说共享式对应存在多个资源的是有多个线程或者竞争者能够获取占用.

2.2 模板方法

我们可以编写自己类继承AQS选择重写独占式或共享式模板方法,从而定义如何获取同步状态和释放同步状态的逻辑。

2.2.1 独占式

tryAcquire:尝试独占式获取同步状态,返回值为true则表示获取成功,否则获取失败。

tryRelease
尝试独占式释放同步状态,返回值为true则表示获取成功,否则获取失败。

2.2.2 共享式

tryAcquireShared:尝试共享式获取同步状态,当返回值为大于等于0的时获得同步状态成功,否则获取失败。

tryReleaseShared:尝试共享式释放同步状态,返回值为true则表示获取成功,否则获取失败。

2.3 如何基于AQS实现Semaphore

由于多个线程可以同时许可同时执行,当然我们选择使用共享同步,Sync需要重写 tryAcquire 获取同步状态条件逻辑,tryRelease释放同步条件逻辑。其核心点在于使用同步状态做判断。当同状态为0时,许可被使用完了,同步状态大于0,许可被还可用,每次调用tryAcquire同步状态-1,每次调用tryRelease同步状态+1

2.4 类结构

内部存在有三个内部类 Sync、NonfairSync 和 FairSync 类。

  • Sync 继承 AbstractQueuedSynchronizer 抽象类。
  • NonfairSync(非公平锁) 继承 Sync 抽象类。
  • FairSync(公平锁) 继承 Sync 抽象类。

Semaphore 很多方法都通过代理内部类的方法实现。

2.5 核心方法

公平信号量获取许可

/**
 * 公平信号量获取同步状态逻辑
 */
protected int tryAcquireShared(int acquires) {
    for (;;) {
        /** 只有同步队列中不存在线程。且同步状态可以获取才能获取锁 **/
        if (hasQueuedPredecessors())
            return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

非公平信号量获取许可

/**
 * 非公平信号量获取同步状态逻辑
 * 返回true 表示获取同步状态成功
 * 返回false 表示获取同步状态失败
 */
final int nonfairTryAcquireShared(int acquires) {
    /** 循环+CAS **/
    for (;;) {
        /** 获取父类同步状态 **/
        int available = getState();
        /** 计算同步状态 -acquires **/
        int remaining = available - acquires;
        /** 使用CAS设置同步状态, **/
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

释放许可

 /**
 * 释放同步状态
 */
protected final boolean tryReleaseShared(int releases) {
    /** 循环+CAS **/
    for (;;) {
        /** 获取父类同步状态 **/
        int current = getState();
        /** 计算同步状态 +releases **/
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        /** 使用CAS设置同步状态, **/
        if (compareAndSetState(current, next))
            return true;
    }
}

完整源码

public class Semaphore implements java.io.Serializable {
    private static final long serialVersionUID = -3222578661600680210L;
    private final Sync sync;

    /**
     * Sync 继承 AbstractQueuedSynchronizer 抽象类
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        /**
         * 初始化同步状态
         */
        Sync(int permits) {
            setState(permits);
        }

        /**
         * 返回同步状态
         */
        final int getPermits() {
            return getState();
        }

        /**
         * 非公平信号量获取同步状态逻辑
         * 返回true 表示获取同步状态成功
         * 返回false 表示获取同步状态失败
         */
        final int nonfairTryAcquireShared(int acquires) {
            /** 循环+CAS **/
            for (;;) {
                /** 获取父类同步状态 **/
                int available = getState();
                /** 计算同步状态 -acquires **/
                int remaining = available - acquires;
                /** 使用CAS设置同步状态, **/
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        /**
         * 释放同步状态
         */
        protected final boolean tryReleaseShared(int releases) {
            /** 循环+CAS **/
            for (;;) {
                /** 获取父类同步状态 **/
                int current = getState();
                /** 计算同步状态 +releases **/
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                /** 使用CAS设置同步状态, **/
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        /**
         * 清理指定数量许可,修改同步状态
         */
        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }

        /**
         * 清空许可,修改同步状态
         */
        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }


    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        /**
         * 获取同步状态
         */
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }


    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        /**
         * 公平信号量获取同步状态逻辑
         */
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                /** 只有同步队列中不存在线程。且同步状态可以获取才能获取锁 **/
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

    /**  创建具有给定的许可数和非公平的公平设置的 Semaphore。 **/
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    /**  创建具有给定的许可数和给定的公平设置的 Semaphore。 **/
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }



    /** 获取一个许可,获取失败将线程添加到同步队列,并阻塞,等待归还唤醒  **/
    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }

    /** 和acquireUninterruptibly功能,同时能响应中断  **/
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /** 和acquireUninterruptibly功能,同时增加阻塞超时  **/
    public boolean tryAcquire(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /** 尝试获取许可,如果信号量中不存在许可之际返回false,成功返回true **/
    public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }

    /** 获取指定数量的许可,获取失败将线程添加到同步队列,并阻塞,等待归还唤醒  **/
    public void acquireUninterruptibly(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireShared(permits);
    }

    /** 和acquireUninterruptibly(int )功能,同时能响应中断  **/
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

    /** 和acquire(int)功能,同时增加阻塞超时  **/
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
            throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
    }


    /** 尝试指定数量的许可,如果信号量中不存在许可之际返回false,成功返回true **/
    public boolean tryAcquire(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.nonfairTryAcquireShared(permits) >= 0;
    }


    /** 释放一个许可, **/
    public void release() {
        sync.releaseShared(1);
    }


    /** 释放指定数量许可, **/
    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }


    /**
     * 获取许可的数量
     */
    public int availablePermits() {
        return sync.getPermits();
    }


    /**
     * 清空许可
     */
    public int drainPermits() {
        return sync.drainPermits();
    }


    /**
     * 根据指定的缩减量减小可用许可的数目
     */
    protected void reducePermits(int reduction) {
        if (reduction < 0) throw new IllegalArgumentException();
        sync.reducePermits(reduction);
    }

    /**
     * 判断当前对象是否是公平信号量
     */
    public boolean isFair() {
        return sync instanceof FairSync;
    }


    /**
     * 判断是否有线程在同步队列等待许可
     */
    public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }


    /**
     * 获取等待许可的线程数量
     */
    public final int getQueueLength() {
        return sync.getQueueLength();
    }


    /**
     * 获取等待许可的线程集合
     */
    protected Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }


    public String toString() {
        return super.toString() + "[Permits = " + sync.getPermits() + "]";
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,366评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,521评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,689评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,925评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,942评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,727评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,447评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,349评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,820评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,990评论 3 337
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,127评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,812评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,471评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,017评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,142评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,388评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,066评论 2 355

推荐阅读更多精彩内容