生产者消费者模式
我们先来看看什么是生产者消费者模式,生产者消费者模式是程序设计中非常常见的一种设计模式,被广泛运用在解耦、消息队列等场景。在现实世界中,我们把生产商品的一方称为生产者,把消费商品的一方称为消费者,有时生产者的生产速度特别快,但消费者的消费速度跟不上,俗称“产能过剩”,又或是多个生产者对应多个消费者时,大家可能会手忙脚乱。如何才能让大家更好地配合呢?这时在生产者和消费者之间就需要一个中介来进行调度,于是便诞生了生产者消费者模式。
使用生产者消费者模式通常需要在两者之间增加一个阻塞队列作为媒介,有了媒介之后就相当于有了一个缓冲,平衡了两者的能力,整体的设计如图所示,最上面是阻塞队列,右侧的 1 是生产者线程,生产者在生产数据后将数据存放在阻塞队列中,左侧的 2 是消费者线程,消费者获取阻塞队列中的数据。而中间的 3 和 4 分别代表生产者消费者之间互相通信的过程,因为无论阻塞队列是满还是空都可能会产生阻塞,阻塞之后就需要在合适的时机去唤醒被阻塞的线程。
那么什么时候阻塞线程需要被唤醒呢?有两种情况。第一种情况是当消费者看到阻塞队列为空时,开始进入等待,这时生产者一旦往队列中放入数据,就会通知所有的消费者,唤醒阻塞的消费者线程。另一种情况是如果生产者发现队列已经满了,也会被阻塞,而一旦消费者获取数据之后就相当于队列空了一个位置,这时消费者就会通知所有正在阻塞的生产者进行生产,这便是对生产者消费者模式的简单介绍。
BlockingQueue 实现生产者消费者模式
package com.example.multithread.ProducerAndConsumer.GroupA;
import java.util.concurrent.BlockingQueue;
/**
* @author liujy
* @description 生产者A
* @since 2020-12-29 10:18
*/
public class AProducer implements Runnable {
private BlockingQueue<Object> blockingQueue;
public AProducer(BlockingQueue<Object> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
while (true) {
try {
Object o = new Object();
blockingQueue.put(o);
System.out.println("produce:" + o);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package com.example.multithread.ProducerAndConsumer.GroupA;
import java.util.concurrent.BlockingQueue;
/**
* @author liujy
* @description 消费者A
* @since 2020-12-29 10:18
*/
public class AConsumer implements Runnable {
private BlockingQueue<Object> blockingQueue;
public AConsumer(BlockingQueue<Object> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
while (true) {
try {
Object take = blockingQueue.take();
System.out.println("consume:" + take);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package com.example.multithread.ProducerAndConsumer.GroupA;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* @author liujy
* @description 用BlockingQueue实现生产者消费者模式
* @since 2020-12-29 09:52
*/
public class ATest {
public static void main(String[] args) {
BlockingQueue blockingQueue = new ArrayBlockingQueue<>(10);
new Thread(new AProducer(blockingQueue)).start();
new Thread(new AProducer(blockingQueue)).start();
new Thread(new AConsumer(blockingQueue)).start();
new Thread(new AConsumer(blockingQueue)).start();
}
}
Condition 实现生产者消费者模式
BlockingQueue 实现生产者消费者模式看似简单,背后却暗藏玄机,我们在掌握这种方法的基础上仍需要掌握更复杂的实现方法。我们接下来看如何在掌握了 BlockingQueue 的基础上利用 Condition 实现生产者消费者模式,它们背后的实现原理非常相似,相当于我们自己实现一个简易版的 BlockingQueue:
package com.example.multithread.ProducerAndConsumer.GroupB;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author liujy
* @description 自定义BlockingQueue
* @since 2020-12-29 14:41
*/
public class MyBlockingQueueForCondition {
private Queue queue;
private int maxCapacity;
private ReentrantLock lock = new ReentrantLock();
private Condition isEmpty = lock.newCondition();
private Condition isFull = lock.newCondition();
public MyBlockingQueueForCondition(int maxCapacity) {
this.maxCapacity = maxCapacity;
this.queue = new LinkedList();
}
// 添加元素
public void put(int o) throws InterruptedException {
lock.lock();
try {
while (queue.size() == maxCapacity) {
isFull.await();
}
queue.add(o);
isFull.signalAll();
} finally {
lock.unlock();
}
}
// 移除元素
public Object pop() throws InterruptedException {
lock.lock();
try {
while (queue.size() == 0) {
isEmpty.await();
}
Object o = queue.poll();
isEmpty.signalAll();
return o;
} finally {
lock.unlock();
}
}
}
package com.example.multithread.ProducerAndConsumer.GroupB;
/**
* @author liujy
* @description 生产者B
* @since 2020-12-29 14:37
*/
public class BProducer implements Runnable {
private MyBlockingQueueForCondition myBlockingQueue;
public BProducer(MyBlockingQueueForCondition myBlockingQueue) {
this.myBlockingQueue = myBlockingQueue;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
myBlockingQueue.put(i);
System.out.println(Thread.currentThread().getName() + " " + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package com.example.multithread.ProducerAndConsumer.GroupB;
/**
* @author liujy
* @description 消费者B
* @since 2020-12-29 14:37
*/
public class BConsumer implements Runnable {
private MyBlockingQueueForCondition myBlockingQueue;
public BConsumer(MyBlockingQueueForCondition myBlockingQueue) {
this.myBlockingQueue = myBlockingQueue;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Object pop = myBlockingQueue.pop();
System.out.println(Thread.currentThread().getName() + " " + pop);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package com.example.multithread.ProducerAndConsumer.GroupB;
/**
* @author liujy
* @description 用Condition实现生产者消费者模式
* @since 2020-12-29 14:38
*/
public class BTest {
public static void main(String[] args) {
MyBlockingQueueForCondition myBlockingQueue = new MyBlockingQueueForCondition(10);
new Thread(new BProducer(myBlockingQueue), "producer one").start();
new Thread(new BConsumer(myBlockingQueue), "consumer one").start();
// new Thread(new BProducer(myBlockingQueue), "producer two").start();
// new Thread(new BConsumer(myBlockingQueue), "consumer two").start();
}
}
wait/notify 实现生产者消费者模式
wait/notify 实现生产者消费者模式的方法,实际上实现原理和Condition 是非常类似的,它们是兄弟关系:
package com.example.multithread.ProducerAndConsumer.GroupC;
import java.util.LinkedList;
import java.util.Queue;
/**
* @author liujy
* @description 自定义BlockingQueue
* @since 2020-12-29 16:32
*/
public class MyBlockingQueue {
private Queue queue;
private int capacity;
public MyBlockingQueue(int capacity) {
this.queue = new LinkedList();
this.capacity = capacity;
}
public synchronized void put(int i) throws InterruptedException {
while (queue.size() == capacity) {
wait();
}
queue.add(i);
notifyAll();
}
public synchronized int pop() throws InterruptedException {
while (queue.size() == 0) {
wait();
}
Integer i = (Integer) queue.poll();
notifyAll();
return i;
}
}
package com.example.multithread.ProducerAndConsumer.GroupC;
/**
* @author liujy
* @description 生产者C
* @since 2020-12-29 16:29
*/
public class CProducer implements Runnable {
private MyBlockingQueue myBlockingQueue;
public CProducer(MyBlockingQueue myBlockingQueue) {
this.myBlockingQueue = myBlockingQueue;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
try {
myBlockingQueue.put(i);
System.out.println("producer " + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package com.example.multithread.ProducerAndConsumer.GroupC;
/**
* @author liujy
* @description 消费者C
* @since 2020-12-29 16:29
*/
public class CConsumer implements Runnable {
private MyBlockingQueue myBlockingQueue;
public CConsumer(MyBlockingQueue myBlockingQueue) {
this.myBlockingQueue = myBlockingQueue;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
try {
int pop = myBlockingQueue.pop();
System.out.println("consumer " + pop);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package com.example.multithread.ProducerAndConsumer.GroupC;
/**
* @author liujy
* @description 用wait、notify实现生产者消费者模式
* @since 2020-12-29 16:29
*/
public class CTest {
public static void main(String[] args) {
MyBlockingQueue myBlockingQueue = new MyBlockingQueue(10);
new Thread(new CProducer(myBlockingQueue), "producer one").start();
new Thread(new CConsumer(myBlockingQueue), "consumer one").start();
// new Thread(new BProducer(myBlockingQueue), "producer two").start();
// new Thread(new BConsumer(myBlockingQueue), "consumer two").start();
}
}