LinkedBlockingQueue基于链表实现,未指定容量时默认容量为Integer.MAX_VALUE,即无界阻塞队列,节点动态创建,节点出队后可被GC,伸缩性较好;如果消费者速度慢于生产者速度,可能造成内存空间不足,建议手动设置队列大小。
采用“two lock queue”算法变体,双锁(ReentrantLock):takeLock、putLock,允许读写并行,remove(e)和迭代器iterators需要获取2个锁。
LinkedBlockingQueue同步机制:
ArrayBlockingQueue底层基于数组,创建时必须指定队列大小,节点数量一开始就固定,“有界”
ArrayBlockingQueue入队和出队使用同一个lock(但数据读写操作已非常简洁),读取和写入操作无法并行。
ArrayBlockingQueue 同步机制:
LinkedBlockingQueue使用双锁可并行读写,其吞吐量更高。
ArrayBlockingQueue在插入或删除元素时直接放入数组指定位置(putIndex、takeIndex),不会产生或销毁任何额外的对象实例;而LinkedBlockingQueue则会生成一个额外的Node对象,在高效并发处理大量数据时,对GC的影响存在一定的区别。
在大部分并发场景下,LinkedBlockingQueue的吞吐量比ArrayBlockingQueue更好。
// linkedblockingqueue.java
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// 这里为什么是 -1 这就是个标识成功、失败的标志而已。
int c = -1;
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 必须要获取到 putLock 才可以进行插入操作
putLock.lockInterruptibly();
try {
// 如果队列满,等待 notFull 的条件满足。
while (count.get() == capacity) {
notFull.await();
}
// 入队
enqueue(node);
// count 原子加 1,c 还是加 1 前的值
c = count.getAndIncrement();
// 如果这个元素入队后,还有至少一个槽可以使用,调用 notFull.signal() 唤醒等待线程。
if (c + 1 < capacity)
notFull.signal();
} finally {
// 入队后,释放掉 putLock
putLock.unlock();
}
// 如果 c == 0,那么代表队列在这个元素入队前是空的(不包括head空节点),
// 那么所有的读线程都在等待 notEmpty 这个条件,等待唤醒,这里做一次唤醒操作
if (c == 0)
signalNotEmpty();
}
// linkedblockingqueue.java
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// 首先,需要获取到 takeLock 才能进行出队操作
takeLock.lockInterruptibly();
try {
// 如果队列为空,等待 notEmpty 这个条件满足再继续执行
while (count.get() == 0) {
notEmpty.await();
}
// 出队
x = dequeue();
// count 进行原子减 1
c = count.getAndDecrement();
// 如果这次出队后,队列中至少还有一个元素,那么调用 notEmpty.signal() 唤醒其他的读线程
if (c > 1)
notEmpty.signal();
} finally {
// 出队后释放掉 takeLock
takeLock.unlock();
}
// 如果 c == capacity,那么说明在这个 take 方法发生的时候,队列是满的
// 既然出队了一个,那么意味着队列不满了,唤醒写线程去写
if (c == capacity)
signalNotFull();
return x;
}
两种不同的生产者消费者协调策略:
在ArrayBlockingQueue中,速率的调控是通过生产者唤醒消费者,消费者唤醒生产者互相作用来实现的调控。
由于signal是要先获取到锁才能调用的,在put里是获取不到take锁的,只能在自己的方法里去signal自己的condition队列
linkedblockingqueue出于性能考虑用了两个锁,尽量让两边各自独立,所以在LinkedBlockingQueue中,是生产者在队列未满的情况下唤醒生产者,
也就是finally之前的 if (c + 1 < capacity) notFull.signal();,消费者在队列不为空的时候唤醒消费者,对应的是if (c > 1) notEmpty.signal(); 但是存在两种特殊情况: 1 假设队列满了,生产者可能全部处于await状态,那么此时就需要消费者出队后唤醒生产者。也就是take操作return之前的signalNotFull()
假设队列为空,消费者可能全部处于await状态,那么此时就需要生产者生产之后唤醒消费者,也就是put操作return之前的signalNotEmpty()