13. 同步工具类的使用

[TOC]

同步工具类,通常利用他们的特性来构建并发安全的类.

常用的同步工具类有:

  • 信号量
  • 闭锁
  • 栅栏
  • FutureTask

信号量(Semaphore)

Java的信号量实际上就是基于操作系统的信号量来实现的.

信号量的原理可以参考《互斥的底层实现》关于信号量的描述。简而言之,信号量是用来控制同时访问某个特定资源的操作数量,或者同时执行某个操作的数量。如果数量固定为1,也就是互斥了。

计数信号量相当于若干数量的许可证,持有许可证的线程或者操作可以执行某个方法或者获取某个资源(也是执行操作),当许可证未发放完时,前来的线程都可以领到,当许可证发放完,再来的线程只能等之前的线程执行完成返还了许可证,这个期间它们只能阻塞等待(在Java信号量的实现是阻塞等待)。

计数信号量可以实现某种资源池,分配一个资源计数值减一,返回一个资源计数值加一;或者可以实现对容器施加边界(添加一个元素消耗一个计数值,删除一个元素返回一个计数值)。

Java中的信号量是java.util.concurrent.Semaphore类。

  • 使用信号量给容器加上边界:
public class BoundArrayList<T>{
     
    // ArrayList实际上如果不加限制,在加入新的元素时会自动扩容
    private final ArrayList<T> list;
    private final Semaphore sem;
    
    public BoundArrayList(int bound){
        this.list = =  new ArrayList<>(bound);
        sem = new Semaphore(bound);
    }

    public void add(T e) throws InterruptedException{
        sem.acquire();
        boolean isAdded = true;
        try{
            isAdded= this.list.add(e);
        }finally{
            // 如果add失败,就释放添加许可
            if(!isAdded)
                sem.release();
        }

    }
    public void remove(T e){
        boolean isRemoved = true;
        try{
            isRemoved = this.list.remove(e);
        }finally{
            if(isRemoved)
                sem.release();
        }
    }
}
  • 使用锁来实现信号量
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 使用锁来实现信号量Semaphore
 *
 */
public class SemaphoreWithLock {
    private final Lock lock = new ReentrantLock();
    private final Condition permitAvaliable = lock.newCondition();
    private volatile int permitCount;

    public SemaphoreWithLock(int count) {
        this.permitCount = count;
    }

    public void acquire() throws InterruptedException {
        lock.lock();
        try {
            if (permitCount > 0) {
                permitCount--;
            } else {
                while (permitCount <= 0)
                    permitAvaliable.await();
            }
        } finally {
            lock.unlock();
        }
    }

    public void release() {
        lock.lock();
        try {
            permitCount++;
            permitAvaliable.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

闭锁(CountDownLatch)

A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.

A CountDownLatch is initialized with a given count. The await methods block until the current count reaches zero due to invocations of the countDown() method, after which all waiting threads are released and any subsequent invocations of await return immediately. This is a one-shot phenomenon -- the count cannot be reset. If you need a version that resets the count, consider using a CyclicBarrier.

当闭锁减为0之后,在闭锁上阻塞的线程全部被唤醒,此时一个闭锁就失效了,不可循环使用。如果需要再次使用,可以使用CyclicBarrier.

  • CountLatch sample:
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.Test;

/**
 * CountDownLatch是一个倒数闭锁,在此latch上await的线程,会在计数值降为0的时候被唤醒,其他时候阻塞。
 */
public class CountDownLatchTest {
    /**
     * 模拟join
     * 
     * @throws InterruptedException
     */
    @Test
    public void simulateJoin() throws InterruptedException {
        CountDownLatch end = new CountDownLatch(1);
        new Thread(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println(threadName + " start!");
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println(threadName + " finish!");
            end.countDown();

        }).start();
        end.await();
        System.out.println("main end");
    }

    /**
          * 子线程等待main给出开始信号,main线程等待所有的子线程的结束。
     * @throws InterruptedException
     * 
     */
    @Test
    public void startWithSingal() throws InterruptedException {
        CountDownLatch start = new CountDownLatch(1);
        int threadCOunt = 100;
        CountDownLatch end = new CountDownLatch(threadCOunt);
        for (int i = 0; i < threadCOunt; i++) {
            new Thread(() -> {
                try {
                    String threadName = Thread.currentThread().getName();
                    System.out.println(threadName + " wait start!");
                    start.await();
                    System.out.println(threadName + " start.....end!");
                    end.countDown();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
        System.out.println("Main start count down");
        start.countDown();
        end.await();
        System.out.println("all end");

    }
}
  • 使用锁来实现闭锁
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class CountDownLatchOnLock {

    private int count;
    private final Lock lock;
    private final Condition down2zero;

    public CountDownLatchOnLock(int count) {
        this.count = count;
        this.lock = new ReentrantLock();
        down2zero = lock.newCondition();
    }

    public void countDown() {
        lock.lock();
        try {
            if (--count <= 0) {
                down2zero.signalAll();
            }
        } finally {
            lock.unlock();
        }

    }

    public void await() throws InterruptedException {
        lock.lock();
        try {
            down2zero.await();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatchOnLock countdown = new CountDownLatchOnLock(10);
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + " start");
                countdown.countDown();
            }).start();
        }
        countdown.await();
        System.out.println("all end");

    }
}

栅栏(Barrier)或者屏障

栅栏类似闭锁,它能阻塞一组线程直到某个事件发生。

栅栏与闭锁的关键区别在于,所有线程必须到达栅栏的位置,才能继续执行。闭锁用于等待事件(计数值降为0的事件),而栅栏用于等待其他线程。

CyclicBarrier可以用于多线程计算数据,最后合并计算结果的应用场景。

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released.

A CyclicBarrier supports an optional Runnable command that is run once per barrier point, after the last thread in the party arrives, but before any threads are released. This barrier action is useful for updating shared-state before any of the parties continue.

  • Test
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {

    @Test
    public void testCyclicBarrier() throws InterruptedException {
        int count = 5;
        CyclicBarrier barrier = new CyclicBarrier(count);
        CountDownLatch countDownLatch = new CountDownLatch(count);
        for (int i = 0; i < count; i++) {
            Runnable r = () -> {
                try {
                    String name = Thread.currentThread().getName();
                    System.out.println(name + " start wait");
                    barrier.await();
                    System.out.println(name + " finish wait");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            };
            new Thread(r).start();
        }
        // 等待子线程结束
        countDownLatch.await();
    }

    @Test
    public void testCyclicBarrierWithAction() throws InterruptedException {
        int count = 5;
        CyclicBarrier barrier = new CyclicBarrier(count, () -> {
            System.out.println("all arrive");
        });
        // ...和上面方法内容一致,区别在于所有线程到达await()处时,会触发action,打印“all arrive”。
    }

}

栅栏可以使用闭锁来实现,每个线程达到栅栏状态时CountDown,所有的线程到达栅栏即CountDown递减到了0,唤醒闭锁上的await线程。但是闭锁是一次性对象,栅栏可以重复利用。看一下栅栏时如何实现的。

栅栏是基于锁和条件对象,以及一个计数值,在执行await方法时,会判断计数值,如果非0,就阻塞在条件对象上,否则就唤醒该条件对象上的所有线程。

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
        // Generation是为了reset服务的,内部只有一个isbroken状态。
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            int index = --count;
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    // 如果设定了到达栅栏的触发动作
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                // 阻塞在条件对象上,调用await会释放锁
                    if (!timed)
                    
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

Barrier是在await()方法内判断count值来决定是否需要继续阻塞。类似把countDown的值放在了await逻辑中,当指定count的线程到达await()位置时,阻塞状态解除,除此之外,还可以配置阻塞解除时的action(Runnable对象)。且和CountDownLatch不同的是,countdownLatch是一次性的,barrier支持reset来复用。

FutureTask

FutureTask实现了Future的语义(基于回调的“未来式”),表示一直抽象的可生成结果的计算。FutureTask表示的计算是通过Callable实现的,相当于一种可生成结果的Runnable,并且可以处于以下三种状态:等待运行、正在运行和运行完成,运行完成即可以获取计算结果,否在获取结果的操作会阻塞。

FutureTask在任务执行框架(线程池)中代表一种异步任务。在单个线程中也可使用,作为一种异步任务。

A cancellable asynchronous computation. This class provides a base implementation of Future, with methods to start and cancel a computation, query to see if the computation is complete, and retrieve the result of the computation. The result can only be retrieved when the computation has completed; the get methods will block if the computation has not yet completed. Once the computation has completed, the computation cannot be restarted or cancelled (unless the computation is invoked using runAndReset()).

A FutureTask can be used to wrap a Callable or Runnable object. Because FutureTask implements Runnable, a FutureTask can be submitted to an Executor for execution.

In addition to serving as a standalone class, this class provides protected functionality that may be useful when creating customized task classes.

  • Sample
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureTaskTest {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Callable<Integer> task = () -> {
            Thread.sleep(1500);
            return new Random().nextInt(1000);
        };
        FutureTask<Integer> future = new FutureTask<>(task);
        new Thread(future).start();
        System.out.println(future.isDone());
        System.out.println("future result= " + future.get());
    }
}

实际上典型使用是Runnable的有返回结果的封装。

同步工具类实现原理

以上工具类,实际上都可以使用Lock来实现,且在底层,都是基于AbstractQueuedSynchroinzer类。

  • ReentrantLock && Semaphore && CountDownLatch

查看源码不难发现,它们同步机制的实现,是委托给这个内部类实现的:

 abstract static class Sync extends AbstractQueuedSynchronizer {
     // ...
 }
  • CyclicBarrier

CyclicBarrier是基于ReentrantLock实现的。

所以这几个同步工具的基础,都是AQS。关于AQS的介绍,在笔记《14.AbstractQueuedSynchronizer源码剖析》中有详细说明,可以说AQS是Concurrent包的基石,而AQS又是站在CAS的基础上的同步工具基类。

  • FutureTask

FutureTask没有使用AQS作为委托来实现阻塞和触发,它内部通过run方法的运行结束(正常或异常)来维护一个state值,并且会唤醒阻塞的线程,get方法判断state值来决定是阻塞还是返回值。
部分FutureTask源码:

    /**
     * The run state of this task, initially NEW.  The run state
     * transitions to a terminal state only in methods set,
     * setException, and cancel.  During completion, state may take on
     * transient values of COMPLETING (while outcome is being set) or
     * INTERRUPTING (only while interrupting the runner to satisfy a
     * cancel(true)). Transitions from these intermediate to final
     * states use cheaper ordered/lazy writes because values are unique
     * and cannot be further modified.
     *
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;
    
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
    
    // 阻塞直到任务结束
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }
    // 任务
    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                // 任务执行完成,set方法会修改状态以及唤醒线程
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

参考资料

[1] Java并发编程实战

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,922评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,591评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,546评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,467评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,553评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,580评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,588评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,334评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,780评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,092评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,270评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,925评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,573评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,194评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,437评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,154评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352

推荐阅读更多精彩内容

  • 过往如云烟,消散于轮回中,残缺的记忆,也无法找回。 有时候,还是会努力回忆过往,但都会被眼泪阻拦,人世间除了情情爱...
    山枳阅读 250评论 0 0
  • 不可否认,或多或少的,我们都在羡慕着别人。 小时候,别人一块好看的橡皮擦就会让我们羡慕不已,求着父母给自己买;别人...
    卿山啊阅读 1,084评论 2 1
  • 在现在的大数据年代,大企业也抱团的年代,作为我们业务来说,创业者来说,是一个很冷很冷的冬天。因为大企业,利用大数据...
    善谋传媒阅读 243评论 0 0
  • 初夏的时节,由于加班,走出公司已是夜色笼罩四周。在人行道上急促地向公车站行进,而路的两侧,各色店铺从透明的门窗望进...
    幸好遇见你阅读 135评论 0 0
  • list.h list.cpp
    WalkZeRo阅读 1,229评论 0 0