Java中线程安全的容器主要包括两类:
-
Vector
、Hashtable
,以及封装器类Collections.synchronizedList
和Collections.synchronizedMap
; - Java 5.0引入的
java.util.concurrent
包,其中包含并发队列、并发HashMap以及写入时复制容器。
依笔者看,早期使用的同步容器主要有两方面的问题:1)通过对方法添加synchronized关键字实现同步,这种粗粒度的加锁操作在synchronized关键字本身未充分优化之前,效率偏低;2)同步容器虽然是线程安全的,但在某些外部复合操作(例:若没有则添加)时,依然需要客户端加锁保证数据安全。因此,从Java 5.0以后,并发编程偏向于使用java.util.concurrent
包(作者:Doug Lea)中的容器类,本文也将着重介绍该包中的容器类,主要包括:
- 阻塞队列
- ConcurrentHashMap
- 写入时复制容器
一、阻塞队列
在并发环境下,阻塞队列是常用的数据结构,它能确保数据高效安全的传输,为快速搭建高质量的多线程应用带来极大的便利,比如MQ的原理就是基于阻塞队列的。java.util.concurrent
中包含丰富的队列实现,它们之间的关系如下图所示:
- BlockingQueue、Deque(双向队列)继承自Queue接口;
- BlockingDeque同时继承自BlockingQueue、Deque接口,提供阻塞的双向队列属性;
- LinkedBlockingQueue和LinkedBlockingDeque分别实现了BlockingQueue和BlockingDeque接口;
- DelayQueue实现了BlockingQueue接口,提供任务延迟功能;
- TransferQueue是Java 7引入的,用于替代BlockingQueue,LinkedTransferQueue是其实现类。
下面对这些队列进行详细的介绍:
1.1 BlockingQueue与BlockingDeque
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:
- 在队列为空时,获取元素的线程会等待队列变为非空。
- 当队列满时,存储元素的线程会等待队列可用。
阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
阻塞队列提供了四种处理方法:
方法 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
检查方法 | element() | peek() | 不可用 | 不可用 |
- 抛出异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException("Queue full")异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。
- 返回特殊值:插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,如果没有则返回null
- 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。
- 超时退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。
BlockingDeque在BlockingQueue的基础上,增加了支持双向队列的属性。如下图所示,相比于BlockingQueue的插入和移除方法,变为XxxFirst
,XxxLast
方法,分别对应队列的两端,既可以在头部添加或移除,也可以在尾部添加或移除。
1.2 LinkedBlockingQueue与LinkedBlockingDeque
LinkedBlockingQueue
是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE
,按照先进先出的原则对元素进行排序。
首先看下LinkedBlockingQueue
中核心的域:
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
private final int capacity;
private final AtomicInteger count = new AtomicInteger();
transient Node<E> head;
private transient Node<E> last;
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
-
LinkedBlockingQueue
和LinkedList
类似,通过静态内部类Node<E>
进行元素的存储; -
capacity
表示阻塞队列所能存储的最大容量,在创建时可以手动指定最大容量,默认的最大容量为Integer.MAX_VALUE
; -
count
表示当前队列中的元素数量,LinkedBlockingQueue
的入队列和出队列使用了两个不同的lock对象,因此无论是在入队列还是出队列,都会涉及对元素数量的并发修改,因此这里使用了一个原子操作类来解决对同一个变量进行并发修改的线程安全问题。 -
head
和last
分别表示链表的头部和尾部; -
takeLock
表示元素出队列时线程所获取的锁,当执行take
、poll
等操作时线程获取;notEmpty
当队列为空时,通过该Condition
让获取元素的线程处于等待状态; -
putLock
表示元素入队列时线程所获取的锁,当执行put
、offer
等操作时获取;notFull
当队列容量达到capacity
时,通过该Condition
让加入元素的线程处于等待状态。
其次,LinkedBlockingQueue
有三个构造方法,分别如下:
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
默认构造函数直接调用LinkedBlockingQueue(int capacity)
,LinkedBlockingQueue(int capacity)
会初始化首尾节点,并置位null。LinkedBlockingQueue(Collection<? extends E> c)
在初始化队列的同时,将一个集合的全部元素加入队列。
最后分析下put
和take
的过程,这里重点关注:LinkedBlockingQueue
如何实现添加/移除并行的?
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
之所以把put
和take
放在一起,是因为它们是一对互逆的过程:
-
put
在插入元素前首先获得putLock
和当前队列的元素数量,take
在去除元素前首先获得takeLock
和当前队列的元素数量; -
put
时需要判断当前队列是否已满,已满时当前线程进行等待,take
时需要判断队列是否已空,队列为空时当前线程进行等待; -
put
调用enqueue
在队尾插入元素,并修改尾指针,take
调用dequeue
将head
指向原来first
的位置,并将first的数据域置位null,实现删除原first
指针,并产生新的head
,同时,切断原head
节点的引用,便于垃圾回收。
private void enqueue(Node<E> node) {
last = last.next = node;
}
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
- 最后,
put
根据count
决定是否触发队列未满和队列空;take
根据count
决定是否触发队列未空和队列满。
回到刚才的问题:LinkedBlockingQueue
如何实现添加/移除并行的?
LinkedBlockingQueue
在入队列和出队列时使用的是不同的Lock,这也意味着它们之间的操作不会存在互斥。在多个CPU的情况下,可以做到在同一时刻既消费、又生产,做到并行处理。
同样的,LinkedBlockingDeque
在LinkedBlockingQueue
的基础上,增加了双向操作的属性。继续以put
和take
为例,LinkedBlockingDeque
增加了putFirst
/putLast
、takeFirst
/takeLast
方法,分别用于在队列头、尾进行添加和删除。与LinkedBlockingQueue
不同的是,LinkedBlockingDeque
的入队列和出队列不再使用不同的Lock。
final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
其中,lock表示读写的主锁,notEmpty和notFull依然表示相应的控制线程状态条件量。以putFirst
和takeFirst
为例:
public void putFirst(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkFirst(node))
notFull.await();
} finally {
lock.unlock();
}
}
putFirst
不支持插入null元素,首先新建一个Node
对象,然后调用ReentrantLock
的lock
方法获取锁,插入操作通过boolean linkFirst(Node<E> node)
实现,如果当前队列头已满,那么该线程等待(linkFirst
方法在写入元素成功后会释放该锁信号),最后,在finally块中释放锁(ReentrantLock
的使用)。
public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkFirst()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
与putFirst
类似,takeFirst
首先获取锁,然后在try中解除尾元素对象的引用,如果unlinkFirst
为空,表示队列为空,没有元素可删,那么该线程等待。同样,最后在finally块中释放锁。
那么问题来了,LinkedBlockingDeque
为什么不使用LinkedBlockingQueue
读写锁分离的方式呢?LinkedBlockingDeque
与LinkedBlockingQueue
的使用场景有什么区别呢?
1.3 DelayQueue
DelayQueue
主要用于实现延时任务,比如:等待一段时间之后关闭连接,缓存对象过期删除,任务超时处理等等,这些任务的共同特点是等待一段时间之后执行(类似于TimerTask)。DelayQueue
的实现包括三个核心特征:
- 延时任务:
DelayQueue
的泛型类需要继承自Delayed
接口,而Delayed
接口继承自Comparable<Delayed>
,用于队列中优先排序的比较; - 优先队列:
DelayQueue
的实现采用了优先队列PriorityQueue
,即延迟时间越短的任务越优先(回忆下优先队列中二叉堆的实现)。 - 阻塞队列:支持并发读写,采用
ReentrantLock
来实现读写的锁操作。
因此,DelayQueue
= Delayed
+ PriorityQueue
+ BlockingQueue
。
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader = null;
private final Condition available = lock.newCondition();
}
接下来看下DelayQueue
的读写操作如何实现延时任务的?
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
首先执行加锁操作,然后往优先队列中插入元素e,优先队列会调用泛型E的compareTo
方法进行比较(具体关于二叉堆的操作,这里不再赘述,请参考数据结构部分相关分析),将延迟时间最短的任务添加到队头。最后检查下元素是否为队头,如果是队头的话,设置leader为空,唤醒所有等待的队列,释放锁。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
- 首先执行加锁操作,然后取出优先队列的队头,如果对头为空,则该线程阻塞;
- 获得对头元素的延迟时间,如果延迟时间小于等于0,说明该元素已经到了可以使用的时间,调用poll方法弹出该元素;
- 在延迟时间大于0时,首先释放元素first的引用(避免内存泄露),其次判断如果leader线程不为空,则该线程阻塞(表示已有线程在等待)。否则,把当前线程赋值给leader元素,然后阻塞delay的时间,即等待队头到达延迟时间,在finally块中释放leader元素的引用。循环后,取出对头元素,退出for循环。
- 最后,如果leader为空并且优先级队列不为空的情况下(判断还有没有其他后续节点),调用signal通知其他的线程,并执行解锁操作。
1.4 TransferQueue与LinkedTransferQueue
TransferQueue
是一个继承了BlockingQueue
的接口,并且增加若干新的方法。LinkedTransferQueue
是TransferQueue
接口的实现类,其定义为一个无界的队列,具有先进先出(FIFO)的特性。
TransferQueue
接口主要包含以下方法:
public interface TransferQueue<E> extends BlockingQueue<E> {
boolean tryTransfer(E e);
void transfer(E e) throws InterruptedException;
boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
boolean hasWaitingConsumer();
int getWaitingConsumerCount();
}
- transfer(E e):若当前存在一个正在等待获取的消费者线程,即立刻移交之;否则,会插入当前元素e到队列尾部,并且等待进入阻塞状态,到有消费者线程取走该元素。
- tryTransfer(E e):若当前存在一个正在等待获取的消费者线程(使用take()或者poll()函数),使用该方法会即刻转移/传输对象元素e;若不存在,则返回false,并且不进入队列。这是一个不阻塞的操作。
- tryTransfer(E e, long timeout, TimeUnit unit):若当前存在一个正在等待获取的消费者线程,会立即传输给它;否则将插入元素e到队列尾部,并且等待被消费者线程获取消费掉;若在指定的时间内元素e无法被消费者线程获取,则返回false,同时该元素被移除。
- hasWaitingConsumer():判断是否存在消费者线程。
- getWaitingConsumerCount():获取所有等待获取元素的消费线程数量。
LinkedTransferQueue
实现了上述方法,较之于LinkedBlockingQueue
在队列满时,入队操作会被阻塞的特性,LinkedTransferQueue
在队列不满时也可以阻塞,只要没有消费者使用元素。下面来看下LinkedTransferQueue
的入队和和出队操作:transfer
和take
方法。
public void transfer(E e) throws InterruptedException {
if (xfer(e, true, SYNC, 0) != null) {
Thread.interrupted(); // failure possible only due to interrupt
throw new InterruptedException();
}
}
public E take() throws InterruptedException {
E e = xfer(null, false, SYNC, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
LinkedTransferQueue
入队和和出队都使用了一个关键方法:
private E xfer(E e, boolean haveData, int how, long nanos) {}
其中,E
表示被操作的元素,haveData
为true
表示添加数据,false
表示移除数据;how
有四种取值:NOW
, ASYNC
, SYNC
, 或者TIMED
,分别表示执行的时机;nanos
表示how
为TIMED
时的时间限制。
(xfer
方法具体流程较为复杂,这里不再展开。另外,LinkedTransferQueue
采用了CAS非阻塞同步机制,后面会具体讲到)