JAVA 并发容器和阻塞队列
JAVA 并发容器
ConcurrentHashMapjdk7 vs jdk8 异同和优缺点
-
数据结构
JDK7 采用segment 分段锁的思想 ,jdk8 中是使用 数组+链表+红黑树 实现。
image.png
-
并发度
JDK7 每个segment 独立加锁, 最大并发个数是 segment 个数,默认是16。
JDK8中 ,锁的粒度更细,理想情况下数组长度就是就是支持并发的最大个数,并发度提高很多。
-
保证并发安全的原理
JDK7 采用继承自 ReentrantLock 的segment 分段锁的思想,来保证线程安全。
JDK8 放弃了Segment 的设计, 采用 node+ CAS + synchronizad 保证线程安全。
-
查询时间复杂度
JDK7 遍历链表的事件复杂度是 O(n), 【n 为链表的长度】。
JDK8 如果变成遍历红黑树,那么时间复杂度为 O(logn) 【n 为红黑树节点个数】。
ConcurrentHashMap 与 HashTable 的区别
-
出现的版本不同
HashTable 是在 JDK1.0 的时候存在,在JDK1.2 的版本实现了Map 接口,成为了集合框架中的一员。
ConcurrentHashMap 则是在JDK1.5 中出现的。出现的年代不同,所以在实现方式和性能上都有很大的不同。
-
实现线程安全的方式不同
HashTable 中的每个方法都被 synchronized 关键字修饰了, 这也就保证了线程的安全。但是这种方式在高并发的情况下性能并不是非常良好的。
ConcurrentHashMap 是通过 node+CAS+synchronized 的方式实现的
-
性能不同
当线程数量急剧增多的时候,因为每一次修改都需要锁住整个对象,其他线程再此时是不能操作的,导致性能会急剧下降,不仅如此还会带来额外的上下文切换开销, 所以它的吞吐量甚至没有单线程的情况下要好。
ConcurrentHashMap 上锁也仅仅是**对一部分上锁 ** ,多线程的性能带来的是增幅的效果。
-
迭代时元素内容修改不同
HashTable 在迭代期间不能修改内容,否则会因为modCount != expectedModeCount 抛出 ConcurrentModificationException 异常, 而 ConcurrentHashMap 在迭代期间修改内容**也不会抛出ConcurrentModificationException **异常。
有关Map 的一些问题
为什么Map 桶中超过8 个才转为红黑树?
- 采用拉链法在相同的hash 位的值, 当链表的长度大于等于阈值 (TREEIFY_THRESHOLD=8) 同时还满足容量大于等于( MIN_TREEIFY_CAPACITY = 64) ** 的要求, 就会把链表转化为红黑树, 后续如果链表的容量小于等于(UNTREEIFY_THRESHOLD = 6)时,又会恢复为链表**的形式。
为什么不一开始就采用红黑树的结构呢?
- 单个TreeNode 节点占用的空间大约是普通node 的两倍, 当node 足够多时才转化为 TreeNode,
- 如果hashCode 分布良好,hash 计算离散性比较好的情况下,那么红黑树这种数据结构是很少会用到的,在理想情况下,链表的长度符合泊松分布,各个链表长度的命中概率依次递减,当长度为8 时 概率仅仅为 0.00000006是一个非常小的概率,,通常情况下并不会发生链表向红黑树的转换。
- 如果发现 hashMap 或 ConcurrrentHashMap 内部出现了 红黑树的结构,这个时候说明我们的hash算法出现了问题,(hashCode 的实现逻辑)
CopyOnWriteList 有什么特点?
适用场景
- 读操作可以尽可能的快,而写操作即使慢一下也没有关系。
- 读多写少
读写规则
-
读写锁的思想
读写锁的思想是 读读共享 ,其他都是互斥的(读写,写写,写读),并发读并不会修改数据,不会带来线程安全问题,写操作会修改原来的数据,因此写操作的时候,不允许读线程和写线程的加入。
-
对读写锁规则的升级
copyOnWriteArrayList 读取是不用加锁的,,写入也不会阻塞读取操作,即 写读不互斥。但是写写是不能线程共享的,写写互斥
特点
- 当容器内的内容被个修改时,copy 出一个新的容器, 然后在新的容器中进行操作,修改完成 再将原来容器的引用指向新的容器
- 迭代期间允许修改集合内容
缺点
- 内存占用问题
- 在元素较多或者复杂的情况下。复制的开销很大
- 数据一致性问题
阻塞队列与非阻塞队列
常见的阻塞队列
- ArrayBlockingQueue
- LinkedBlockingQueue
- SynchronousQueue
- PriorityBlockingQueue
- DelayQueue
ArrayBlockingQueue
-
是典型的 有界队列 ,其内部是用 数组 结构存储元素的, 利用 ReentrantLock 实现线程安全。内部排序方式是 先进先出(FIFO)的。
/** The queued items */ final Object[] items; /** Main lock guarding all access */ final ReentrantLock lock;
/** * Inserts the specified element at the tail of this queue if it is * possible to do so immediately without exceeding the queue's capacity, * returning {@code true} upon success and {@code false} if this queue * is full. This method is generally preferable to method {@link #add}, * which can fail to insert an element only by throwing an exception. * * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } }
-
在创建时 指定容量和排队任务是否公平 , (通常非公平的吞吐量要优于公平的场景)
/** * Creates an {@code ArrayBlockingQueue} with the given (fixed) * capacity and the specified access policy. * * @param capacity the capacity of this queue * @param fair if {@code true} then queue accesses for threads blocked * on insertion or removal, are processed in FIFO order; * if {@code false} the access order is unspecified. * @throws IllegalArgumentException if {@code capacity < 1} */ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
LinkedBlockingQueue
-
是一个内部用 链表 实现的BlockingQueue ,如果不指定 初始容量,那么他的默认容量为 Integer.MAX_VALUE。 会成为一个无界队列, 内部排序方式是 先进先出(FIFO)的。
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }
-
同样内部也是通过ReentrantLock 来保证线程安全。
public E peek() { if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { Node<E> first = head.next; if (first == null) return null; else return first.item; } finally { takeLock.unlock(); } }
SynchronousQueue
-
最大的不同之处在于 它的容量是 0 ,因此每次取数据都要先阻塞,直到有数据被放入,每次放入数据也会阻塞,直到有消费者来取出。
/** * Inserts the specified element into this queue, waiting if necessary * up to the specified wait time for another thread to receive it. * * @return {@code true} if successful, or {@code false} if the * specified waiting time elapses before a consumer appears * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); if (transferer.transfer(e, true, unit.toNanos(timeout)) != null) return true; if (!Thread.interrupted()) return false; throw new InterruptedException(); }
他会把数据直接从生产者传给消费者(direct handoff), 不需要存储。
PriorityBlockingQueue
-
是一个支持 优先级的无界阻塞队列 ,可以通过自定义类实现 compareTo() 方法,或初始化是通过构造函数传入Comparator 来指定元素排序规则。
public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; }
-
他的默认初始化容量为 11,最大容量是 Integer.MAX_VALUE - 8 内部是 用 **数组 **的数据结构存储元素,(transient : 阻止属性被序列化,尽在内存中,不会序列化到磁盘)
/** * Default array capacity. */ private static final int DEFAULT_INITIAL_CAPACITY = 11; /** * The maximum size of array to allocate. * Some VMs reserve some header words in an array. * Attempts to allocate larger arrays may result in * OutOfMemoryError: Requested array size exceeds VM limit */ private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; private transient Object[] queue;
插入队列的对象必须是可以比较大小的, 否则会抛出 ClassCastException。
-
消费数据在队列为空的是时候会阻塞 (take, pool),放入数据的时候不会被阻塞(因为队列是无界的)。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { while ( (result = dequeue()) == null) notEmpty.await(); } finally { lock.unlock(); } return result; }
DelayQueue
- 是一个具有延迟功能的队列, 可以设定让队列中的任务延迟多久后再执行。
- 它是无界队列,放入的元素必须实现了 Delayed 接口。(Delayed 继承了 Comparable 接口, 拥有了比较和排序)
- 元素会根据延迟时间的长短被放到队列的不同位置, 越靠近队列头则早过期。
- DelayQueue 内部使用了 PriorityQueue 的能力来排序。
阻塞队列常用的方法
操作失败抛出异常
方法 | 含义 | 特点 |
---|---|---|
add | 添加一个元素 | 如果队列满了,抛出IllegalStateException |
remove | 返回并删除队列的头元素 | 如果队列为空,抛出NoSuchElementException |
element | 返回队列的头元素 | 如果队列为空,抛出NoSuchElementException |
返回操作结果,不抛出异常
方法 | 含义 | 特点 |
---|---|---|
offer | 添加一个元素 | 队列满,返回false ,添加成功 返回true |
poll | 返回并删除队列的头元素 | 队列空,删除失败,返回null |
peeK | 返回队列的头元素 | 队列空,删除失败,返回null |
陷入阻塞
方法 | 含义 | 特点 |
---|---|---|
put | 添加一个元素 | 如果队列满,则阻塞,直至队列有空闲 |
take | 返回并删除队列的头元素 | 如果队列空,则阻塞。直至队列有元素 |
非阻塞队列
ConCurrentLinkedQueue
- 使用CAS 非阻塞算法 + 重试 来实现线程安全。
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}