高并发之并发容器详解(从入门到超神)

原文链接:blog.ouyangsihai.cn >> 高并发之并发容器详解(从入门到超神)

一、ConcurrentHashMap

在上面已经提到过ConcurrentHashMapConcurrentHashMap相比Hashtable能够进一步提高并发性,其原理图如下:

ConcurrentHashMap原理

HashMap,Hashtable与ConcurrentHashMap都是实现的哈希表数据结构,在随机读取的时候效率很高。Hashtable实现同步是利用synchronized关键字进行锁定的,其是针对整张哈希表进行锁定的,即每次锁住整张表让线程独占,在线程安全的背后是巨大的浪费。ConcurrentHashMap和Hashtable主要区别就是围绕着锁的粒度进行区别以及如何区锁定。

上图中,左边是Hashtable的实现方式,可以看到锁住整个哈希表;而右边则是ConcurrentHashMap的实现方式,单独锁住每一个桶(segment).ConcurrentHashMap将哈希表分为16个桶(默认值),诸如get(),put(),remove()等常用操作只锁当前需要用到的桶,而size()才锁定整张表。原来只能一个线程进入,现在却能同时接受16个写线程并发进入(写线程需要锁定,而读线程几乎不受限制),并发性的提升是显而易见的。

而在迭代时,ConcurrentHashMap使用了不同于传统集合的快速失败迭代器(fast-fail iterator)的另一种迭代方式,称为弱一致迭代器。在这种迭代方式中,当iterator被创建后集合再发生改变就不再是抛出ConcurrentModificationException,取而代之的是在改变时实例化出新的数据从而不影响原有的数据,iterator完成后再将头指针替换为新的数据,这样iterator线程可以使用原来老的数据,而写线程也可以并发的完成改变,更重要的,这保证了多个线程并发执行的连续性和扩展性,是性能提升的关键。

我们在上面阐述了ConcurrentHashMap的使用特点和原理,分别在同样的一个高并发场景下,测试不同的方式产生的延时(ms):

Map<String, String> map = new ConcurrentHashMap<>();//483
Map<String, String> map = new ConcurrentSkipListMap<>(); //高并发并且排序 559
Map<String, String> map = new Hashtable<>(); //499 
Map<String, String> map =Collections.synchronizedMap(new HashMap<>()); // 530
Map<String, String> map =Collections.synchronizedMap(new TreeMap()); //905

以ConcurrentLinkedQueue为例,他实现了Queue接口,实例化方式如下:

Queue<String> strs = new ConcurrentLinkedQueue<>();

添加元素的方法:offer()
取出队头的方法:poll()
判断队列长度:size()
对于双端队列,使用ConcurrentLinkedDeque类型来实现.

下面我们再看一个具体的实例:

public class T01_ConcurrentMap {

    public static void main(String[] args) {
        Map<String, String> map = new ConcurrentHashMap<String, String>();
        //Map<String, String> map = new ConcurrentSkipListMap<String, String>(); //高并发并且排序

        //Map<String, String> map = new Hashtable<>();
        //Map<String, String> map = new HashMap<String, String>();

        Random random = new Random();
        Thread[] threads = new Thread[100];
        CountDownLatch latch = new CountDownLatch(threads.length);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(()->{
                for(int j=0; j<10000;j++) map.put("a" + random.nextInt(100000), "a" + random.nextInt(100000));
                latch.countDown();
            });
        }

        Arrays.asList(threads).forEach(t->t.start());
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        long end = System.currentTimeMillis();
        System.out.println(end-start);
    }
}

启动100个线程,向图中添加100000个元素,分别使用Hashtable,HashMap,ConcurrentHashMap,ConcurrentSkipListMap定义map,判断程序完成的时间。最终发现,ConcurrentHashMap要比HashMap效率高,ConcurrentHashMap是将大锁分成若干小锁,实现多个线程共同运行,所以,效率有很大差距。ConcurrentSkipListMap较ConcurrentHashMap除了实现高并发外还能够排序。

参考:

http://blog.csdn.net/sunxianghuang/article/details/52221913
http://www.educity.cn/java/498061.html

二、ConcurrentQueue

与ConcurrentHashMap相同,ConcurrentQueue也是通过同样的方式来提高并发性能的。

我们在同步容器中提到过火车票问题:

有N张火车票,每张票都有一个编号,同时有10个窗口对外售票,写一个模拟程序。

在上述问题中,也可以使用ConcurrentQueue进一步提高并发性:

static Queue<String> tickets = new ConcurrentLinkedQueue<>();

具体的代码是这样的:

public class TicketSeller4 {
        static Queue<String> tickets = new ConcurrentLinkedQueue<>();
        static {
                for(int i=0; i<1000; i++) tickets.add("票编号:" + i);
        }
        public static void main(String[] args) {
                for(int i=0; i<10; i++) {
                        new Thread(()->{
                                while(true) {
                                        String s = tickets.poll();
                                        if(s == null) break;
                                        else System.out.println("销售了--" + s);
                                }
                        }).start();
                }
        }
}

这里面通过ConcurrentLinkedQueue的poll()方法来实现获取容器成员的。用这个类型可以进一步提高并发性。

具体基本操作实例

public class T04_ConcurrentQueue {

    public static void main(String[] args) {
        Queue<String> strings = new ConcurrentLinkedQueue<String>();

        for (int i = 0; i < 10; i++) {
            strings.offer("a" + i); //相当于add, 放进队列
        }

        System.out.println(strings);

        System.out.println(strings.size());

        System.out.println(strings.poll()); //取出并移除掉
        System.out.println(strings.size());

        System.out.println(strings.peek()); //取出,不会移除。相当于get(0)
        System.out.println(strings.size());
    }
}

三、CopyOnWriteArrayList

写时复制容器,即copy-on-write,在多线程环境下,写时效率低,读时效率高,适合写少读多的环境。对比测试几种情况:

List<String> lists = new ArrayList<>();
//这个会出并发问题!报错:ArrayIndexOutOfBoundsException
List<String> lists = new Vector();//111 ms
List<String> lists = new CopyOnWriteArrayList<>();//5230 ms
//测试核心代码:
Runnable task = new Runnable() {
@Override
public void run() {
     for(int i=0; i<1000; i++) lists.add("a" +     r.nextInt(10000));
}
};
//多线程向该容器中不断加入数据。

从JDK 5开始Java并发包里提供了两个使用CopyOnWrite机制实现的并发容器,它们是CopyOnWriteArrayListCopyOnWriteArraySet

当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后向新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为在当前读的容器中不会添加任何元素。所以CopyOnWrite容器是一种读写分离的思想,读和写对应不同的容器。

四、BlockingQueue

这种并发容器,会自动实现阻塞式的生产者/消费者模式。使用队列解耦合,在实现异步事物的时候很有用。下面的例子,实现了阻塞队列:

LinkedBlockingQueue
static BlockingQueue<String> strs = new LinkedBlockingQueue<>(10);
strs.put("a" + i); //加入队列,如果满了,就会等待
strs.take(); //取出队列元素,如果空了,就会等待

在实例化时,可以指定具体的队列容量。
在加入成员的时候,除了使用put方法还可以使用其他方法:

Str.add(“aaa”);
/* add如果在队列满了之后,再加入成员会抛出异常,而这种情况下,put方法会一直等待被消费掉。
*/
Str.offer(“aaa”);
/* offer添加成员的时候,会有boolean类型的返回值,如果添加成功,会返回true,如果添加失败,会返回false.除此之外,offer还可以按时段进行添加,例如:
*/
strs.offer("aaa", 1, TimeUnit.SECONDS);
/*
如果队列满了,等待1秒,再进行成员的添加,如果添加失败了,则返回false.
*/

五、ArrayBlockingQueue

static BlockingQueue<String> strs = new ArrayBlockingQueue<>(10);

对象的方法和上面的BlockingQueue是一样的,用法也是一样的。

二者的区别主要是:

  1. LinkedBlockingQueue是一个单向链表实现的阻塞队列,在链表一头加入元素,如果队列满,就会阻塞,另一头取出元素,如果队列为空,就会阻塞。
  2. LinkedBlockingQueue内部使用ReentrantLock实现插入锁(putLock)和取出锁(takeLock)。

相比于数组实现的ArrayBlockingQueue的有界情况,我们称之为有界队列LinkedBlockingQueue可认为是无界队列。当然,也可以向上面那样指定队列容量,但是这个参数常常是省略的,多用于任务队列。

六、LinkedBlockingQueue实例

public class T05_LinkedBlockingQueue {

    private static BlockingQueue<String> strings = new LinkedBlockingQueue<String>();
    private static Random r = new Random();

    public static void main(String[] args) {
        new Thread(()->{
            for (int i = 0; i < 100; i++) {
                try {
                    strings.put("a" + i); //如果满了,就会等待
                    TimeUnit.SECONDS.sleep(r.nextInt(10));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "p1").start();

        for (int i = 0; i < 5; i++) {
            new Thread(()->{
                for(;;){
                    try {
                        System.out.println(Thread.currentThread().getName() + "take -" + strings.take()); //如果空了,就会等待
                    } catch (Exception e) {
                        e.printStackTrace();
                    } 
                }
            },"c" + i).start();
        }
    }
}

LinkedBlockingQueue是使用链表是实现的阻塞式容器。

七、DelayQueue

DelayQueue也是一个BlockingQueue,其特化的参数是Delayed
Delayed扩展了Comparable接口,比较的基准为延时的时间值,Delayed接口的实现类getDelay()的返回值应为固定值(final).DelayQueue内部是使用PriorityQueue实现的,即:

    DelayQueue = BlockingQueue + PriorityQueue + Delayed

可以说,DelayQueue是一个使用优先队列(PriorityQueue)实现的BlockingQueue,优先队列的比较基准值是时间。这是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。但是要注意的是,不能将null元素放置到这种队列中。
Delayed,一种混合风格的接口,用来标记那些应该在给定延迟时间之后执行的对象。此接口的实现类必须重写一个 compareTo() 方法,该方法提供与此接口的 getDelay()方法一致的排序。
DelayQueue存储的对象是实现了Delayed接口的对象,在这个对象中,需要重写compareTo()getDelay()方法,例如:

static class MyTask implements Delayed {
        long runningTime;
        MyTask(long rt) {
        this.runningTime = rt;
}
@Override
public int compareTo(Delayed o) {
        if(this.getDelay(TimeUnit.MILLISECONDS) <      o.getDelay(TimeUnit.MILLISECONDS))
        return -1;
       else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS))
        return 1;
else 
        return 0;
}
@Override
public long getDelay(TimeUnit unit) {
        return unit.convert(runningTime -   System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
}

因此,当我们在main()函数中,向该队列加入元素后再取出元素的过程,就会存在延时,可以这样验证:

long now = System.currentTimeMillis();
MyTask t1 = new MyTask(now + 1000);
MyTask t2 = new MyTask(now + 2000);
MyTask t3 = new MyTask(now + 1500);
MyTask t4 = new MyTask(now + 2500);
MyTask t5 = new MyTask(now + 500);
tasks.put(t1);
tasks.put(t2);
tasks.put(t3);
tasks.put(t4);
tasks.put(t5);
System.out.println(tasks);
for(int i=0; i<5; i++) {
System.out.println(tasks.take());
}

注意:为了方便查看到效果,可以重写toString()函数,来保证打印出来的结果有意义:
例如:

@Override
public String toString() {
return "" + runningTime;
}

DelayQueue可以用在诸如用监控线程来轮询是否有超时任务出现,来处理某些具有等待时延的情况,这样,可以避免由于数量巨大造成的轮询效率差的问题。例如:

  1. 关闭空闲连接:服务器中,有很多客户端的连接,空闲一段时间之后需要关闭他们。
  2. 缓存:缓存中的对象,超过了空闲时间,需要从缓存中移出。
  3. 任务超时处理:在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。

实例:

public class T07_DelayQueue {

    private static BlockingQueue<MyTask> tasks = new DelayQueue<>();
    private static Random r = new Random();

    static class MyTask implements Delayed{

        long runningTime;

        public MyTask(long rt) {
            this.runningTime = rt;
        }

        @Override
        public int compareTo(Delayed o) {
            if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MICROSECONDS)) {
                return -1;
            }else if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
                return 1;
            }else {
                return 0;
            }
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public String toString() {
            return "" + runningTime;
        }

        public static void main(String[] args) throws InterruptedException {
            long now = System.currentTimeMillis();
            MyTask t1 = new MyTask(now + 1000);
            MyTask t2 = new MyTask(now + 2000);
            MyTask t3 = new MyTask(now + 1500);
            MyTask t4 = new MyTask(now + 2500);
            MyTask t5 = new MyTask(now + 500);

            tasks.put(t1);
            tasks.put(t2);
            tasks.put(t3);
            tasks.put(t4);
            tasks.put(t5);

            System.out.println(tasks);

            for (int i = 0; i < 5; i++) {
                System.out.println(tasks.take());
            }
        }
    }
}

八、LinkedTransferQueue

TransferQueue是一个继承了BlockingQueue的接口,并且增加若干新的方法。LinkedTransferQueue是TransferQueue接口的实现类,其定义为一个无界的队列,具有先进先出(FIFO)的特性。
TransferQueue接口含有下面几个重要方法:

  1. transfer(E e)
    若当前存在一个正在等待获取的消费者线程,即立刻移交之;否则,会插入当前元素e到队列尾部,并且等待进入阻塞状态,到有消费者线程取走该元素。

  2. tryTransfer(E e)
    若当前存在一个正在等待获取的消费者线程(使用take()或者poll()函数),使用该方法会即刻转移/传输对象元素e;若不存在,则返回false,并且不进入队列。这是一个不阻塞的操作。

  3. tryTransfer(E e,long timeout,TimeUnit unit)
    若当前存在一个正在等待获取的消费者线程,会立即传输给它;否则将插入元素e到队列尾部,并且等待被消费者线程获取消费掉;若在指定的时间内元素e无法被消费者线程获取,则返回false,同时该元素被移除。

  4. hasWaitingConsumer()
    判断是否存在消费者线程。

  5. getWaitingConsumerCount()
    获取所有等待获取元素的消费线程数量。

  6. size()
    因为队列的异步特性,检测当前队列的元素个数需要逐一迭代,无法保证原子性,可能会得到一个不太准确的结果,尤其是在遍历时有可能队列发生更改。

使用方法:

LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();//实例化

如果当前没有消费者线程(存在take方法的线程):

strs.transfer("aaa");

该方法会一直阻塞在这里,知道有消费者线程存在。
而如果使用传统的put()方法来加入元素的话,则不会发生阻塞现象。

strs.take()

同样,获取队列中元素的方法take()也是阻塞在这里等待获取新的元素的。

九、SynchronousQueue

SynchronousQueue也是一种BlockingQueue,是一种无缓冲的等待队列。所以,在某次添加元素后必须等待其他线程取走后才能继续添加;可以认为SynchronousQueue是一个缓存值为0的阻塞队列(也可以认为是1),它的isEmpty()方法永远返回是true,remainingCapacity()方法永远返回是0.
remove()removeAll()方法永远返回是false,iterator()方法永远返回空,peek()方法永远返回null.
在使用put()方法时,会一直阻塞在这里,等待被消费:

BlockingQueue strs = new SynchronousQueue<>();//实例化
strs.put(“aaa”); //阻塞等待消费者消费
strs.add(“aaa”);//会产生异常,提示队列满了
strs.take();//该方法可以取出元素,同样是阻塞的,需要在线程中去实现他,作为消费者.

实例:

public class T09_Synchronized {

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> strings = new SynchronousQueue<String>();

        new Thread(()->{
            try {
                System.out.println(strings.take());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();

        strings.put("aaa"); //阻塞等待消费者消费
        //strings.add("aaa");
        System.out.println(strings.size());
    }
}
参考资料
https://blog.csdn.net/qq_34707744/article/details/79746622
https://blog.csdn.net/wang7807564/article/details/80048576
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350

推荐阅读更多精彩内容

  • 本文是我自己在秋招复习时的读书笔记,整理的知识点,也是为了防止忘记,尊重劳动成果,转载注明出处哦!如果你也喜欢,那...
    波波波先森阅读 11,244评论 4 56
  • Java平台类库包含了丰富的并发基础构建模块,例如线程安全的容器类以及各种用于协调多个相互协作的线程控制流的同步工...
    Steven1997阅读 555评论 0 0
  • 学习山川网关于各个省份的GDP介绍,可以增广自己的知识面,在地理、文华、经济层面上了解不同地方的存在与发展,培养自...
    人言白一阅读 95评论 0 0
  • 女人成长记 下午去燕郊拜访了我的好友,好久没见,一见面依然感到很亲切,她又瘦了很多,而脸上洋溢着浓浓的自信满...
    素骊阅读 311评论 1 3
  • 近几年,随着智能手机的普及,健身软件,跑步软件,计步软件层出不穷,极大的调动了普通老百姓的运动积极性,每天看到微...
    归园田居其一阅读 377评论 0 1