使用多个线程除了并发进行提高一个大任务或者多个子任务的执行效率,多线程之间还存在协作完成任务,共同协作完成任务涉及到多线程之间的通讯。生产者消费者模型就是典型的多线程协作的应用
是什么:
什么是线程间的协作
- 多线程协作即多线程之间通过多线程通讯技术,实现多线程按照特定顺序共同协作完成一项任务。
怎么用:
多线程协作用在哪
- 多线程协作场景有多种使用场景,典型的有线程通过通知机制交替进行的生产者消费者模型:生产者生产物品,生产完通知消费者进行消费,消费者收到通知后进行消费,消费完再通知生产者进行生产;
- 多个线程共同等待一齐完成后集体进行下一步,以此循环共同推进的循环路障模型
- 某个线程任务等待多个线程都就绪后才开始的倒计时门闸模型
用什么实现生产者消费者模型
- 使用objcet的自带的wait,notify,notifyAll方法进行线程间的通知等待机制,需要注意,这几个方法必须在synchronized的同步代码块的范围内使用,否则会抛异常。
public class ProduceConsumeDemo {
public static void main(String[] args) {
List<String> container=new ArrayList<>();
ExecutorService executorService= Executors.newFixedThreadPool(2);
executorService.submit(new Consumer(container));
executorService.submit(new Producer(container));
executorService.shutdown();
}
}
@Slf4j
class Consumer implements Runnable{
int money=0;
private List<String> container;
public Consumer(List<String> container){
this.container=container;
}
void consumeFood() throws Exception{
while (money<15){
synchronized (container){
log.debug("container owner is consumer,money is {}",money);
if(container.size()==0){
container.wait();
}
Iterator it=container.iterator();
while(it.hasNext()){
money++;
log.debug("wow,the {} is delicious!nice food!",it.next());
it.remove();
}
log.debug(" container is empty,it is time to cook!");
container.notifyAll();
log.debug("consume notified continue");
}
}
log.debug("consumer money cost =15,over");
}
@Override
public void run() {
try {
consumeFood();
} catch (Exception e) {
e.printStackTrace();
}
}
}
@Slf4j
class Producer implements Runnable{
private String[] dishes={"rice","beef","chicken","tomato","potato"};
int money=0;
private Random random=new Random();
private List<String> container;
public Producer(List<String> container){
this.container=container;
}
void produceFood() throws Exception{
while (money<15){
synchronized (container){
log.debug("container owner is produce,money is {}",money);
if(container.size()>=5){
container.wait();
}
while(container.size()<5){
String dish=dishes[random.nextInt(5)];
log.debug("new food is cooked,it is {}",dish);
container.add(dish);
money++;
}
log.debug(" container is full,it is time to eat");
// 做完后唤醒其他
container.notifyAll();
log.debug("produce notified continue");
}
}
log.debug("producer money cost =15,over");
}
@Override
public void run() {
try {
produceFood();
} catch (Exception e) {
e.printStackTrace();
}
}
}
打印如下:
19:54:05.007 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.Consumer - container owner is consumer,money is 0
19:54:05.011 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.Producer - container owner is produce,money is 0
19:54:05.011 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.Producer - new food is cooked,it is rice
19:54:05.011 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.Producer - new food is cooked,it is beef
19:54:05.011 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.Producer - new food is cooked,it is potato
19:54:05.012 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.Producer - new food is cooked,it is potato
19:54:05.012 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.Producer - new food is cooked,it is tomato
19:54:05.012 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.Producer - container is full,it is time to eat
19:54:05.012 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.Producer - produce notified continue
19:54:05.012 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.Producer - container owner is produce,money is 5
19:54:05.012 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.Consumer - wow,the rice is delicious!nice food!
19:54:05.012 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.Consumer - wow,the beef is delicious!nice food!
19:54:05.012 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.Consumer - wow,the potato is delicious!nice food!
19:54:05.012 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.Consumer - wow,the potato is delicious!nice food!
19:54:05.012 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.Consumer - wow,the tomato is delicious!nice food!
19:54:05.012 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.Consumer - container is empty,it is time to cook!
19:54:05.012 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.Consumer - consume notified continue
19:54:05.012 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.Consumer - container owner is consumer,money is 5
19:54:05.012 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.Producer - new food is cooked,it is tomato
19:54:05.012 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.Producer - new food is cooked,it is chicken
19:54:05.012 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.Producer - new food is cooked,it is chicken
19:54:05.012 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.Producer - new food is cooked,it is tomato
19:54:05.012 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.Producer - new food is cooked,it is rice
19:54:05.012 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.Producer - container is full,it is time to eat
19:54:05.012 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.Producer - produce notified continue
19:54:05.012 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.Producer - container owner is produce,money is 10
19:54:05.012 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.Consumer - wow,the tomato is delicious!nice food!
19:54:05.012 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.Consumer - wow,the chicken is delicious!nice food!
19:54:05.012 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.Consumer - wow,the chicken is delicious!nice food!
19:54:05.013 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.Consumer - wow,the tomato is delicious!nice food!
19:54:05.013 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.Consumer - wow,the rice is delicious!nice food!
19:54:05.013 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.Consumer - container is empty,it is time to cook!
19:54:05.013 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.Consumer - consume notified continue
19:54:05.013 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.Consumer - container owner is consumer,money is 10
19:54:05.013 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.Producer - new food is cooked,it is tomato
19:54:05.013 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.Producer - new food is cooked,it is beef
19:54:05.013 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.Producer - new food is cooked,it is rice
19:54:05.013 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.Producer - new food is cooked,it is tomato
19:54:05.013 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.Producer - new food is cooked,it is tomato
19:54:05.013 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.Producer - container is full,it is time to eat
19:54:05.013 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.Producer - produce notified continue
19:54:05.013 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.Producer - producer money cost =15,over
19:54:05.013 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.Consumer - wow,the tomato is delicious!nice food!
19:54:05.013 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.Consumer - wow,the beef is delicious!nice food!
19:54:05.013 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.Consumer - wow,the rice is delicious!nice food!
19:54:05.013 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.Consumer - wow,the tomato is delicious!nice food!
19:54:05.013 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.Consumer - wow,the tomato is delicious!nice food!
19:54:05.013 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.Consumer - container is empty,it is time to cook!
19:54:05.013 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.Consumer - consume notified continue
19:54:05.013 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.Consumer - consumer money cost =15,over
- 使用并发队列BlockingQueue,BlockingQueue本身就是为生产者消费者模型设计的线程安全的并发容器,使用起来简单高效。
@Slf4j
public class ProduceConsumerQueueDemo {
public static void main(String[] args) throws Exception{
BlockingQueue<String> queue=new LinkedBlockingDeque<>(5);
Random random=new Random();
List<String> foods= Arrays.asList("beef","nice","meat","tomato","potato");
ExecutorService executorService= Executors.newCachedThreadPool();
CountDownLatch countDownLatch=new CountDownLatch(2);
executorService.execute(()->{
int i=0;
String food;
while(i<15){
try {
food=queue.poll(10L, TimeUnit.SECONDS);
log.debug("delicious food,i like the {},size is {}",food,queue.size());
i++;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
countDownLatch.countDown();
log.debug("eat over!");
});
executorService.execute(()->{
int i=0;
String food;
while(i<15){
try {
food=foods.get(random.nextInt(5));
queue.offer(food,10L, TimeUnit.SECONDS);
log.debug("delicious food is cooked {},queue size is {}",food,queue.size());
i++;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
countDownLatch.countDown();
log.debug("cook over!");
});
countDownLatch.await();
log.debug("main over!");
executorService.shutdown();
}
}
打印结果如下:
20:24:04.513 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.ProduceConsumerQueueDemo - delicious food is cooked tomato,queue size is 1
20:24:04.513 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.ProduceConsumerQueueDemo - delicious food,i like the tomato,size is 0
20:24:04.516 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.ProduceConsumerQueueDemo - delicious food,i like the beef,size is 0
20:24:04.516 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.ProduceConsumerQueueDemo - delicious food is cooked beef,queue size is 1
20:24:04.516 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.ProduceConsumerQueueDemo - delicious food is cooked nice,queue size is 1
20:24:04.516 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.ProduceConsumerQueueDemo - delicious food,i like the nice,size is 0
20:24:04.516 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.ProduceConsumerQueueDemo - delicious food is cooked beef,queue size is 1
20:24:04.516 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.ProduceConsumerQueueDemo - delicious food is cooked beef,queue size is 1
20:24:04.516 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.ProduceConsumerQueueDemo - delicious food,i like the beef,size is 0
20:24:04.516 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.ProduceConsumerQueueDemo - delicious food is cooked tomato,queue size is 2
20:24:04.516 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.ProduceConsumerQueueDemo - delicious food,i like the beef,size is 1
20:24:04.516 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.ProduceConsumerQueueDemo - delicious food is cooked beef,queue size is 2
20:24:04.516 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.ProduceConsumerQueueDemo - delicious food,i like the tomato,size is 1
20:24:04.516 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.ProduceConsumerQueueDemo - delicious food is cooked beef,queue size is 2
20:24:04.516 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.ProduceConsumerQueueDemo - delicious food,i like the beef,size is 1
20:24:04.516 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.ProduceConsumerQueueDemo - delicious food is cooked tomato,queue size is 2
20:24:04.516 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.ProduceConsumerQueueDemo - delicious food,i like the beef,size is 1
20:24:04.516 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.ProduceConsumerQueueDemo - delicious food is cooked nice,queue size is 2
20:24:04.516 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.ProduceConsumerQueueDemo - delicious food,i like the tomato,size is 1
20:24:04.516 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.ProduceConsumerQueueDemo - delicious food is cooked nice,queue size is 2
20:24:04.517 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.ProduceConsumerQueueDemo - delicious food is cooked nice,queue size is 2
20:24:04.517 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.ProduceConsumerQueueDemo - delicious food,i like the nice,size is 1
20:24:04.517 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.ProduceConsumerQueueDemo - delicious food is cooked potato,queue size is 3
20:24:04.517 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.ProduceConsumerQueueDemo - delicious food,i like the nice,size is 2
20:24:04.517 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.ProduceConsumerQueueDemo - delicious food is cooked meat,queue size is 3
20:24:04.517 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.ProduceConsumerQueueDemo - delicious food,i like the nice,size is 2
20:24:04.517 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.ProduceConsumerQueueDemo - delicious food is cooked tomato,queue size is 3
20:24:04.517 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.ProduceConsumerQueueDemo - delicious food,i like the potato,size is 2
20:24:04.517 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.ProduceConsumerQueueDemo - delicious food,i like the meat,size is 1
20:24:04.517 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.ProduceConsumerQueueDemo - cook over!
20:24:04.517 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.ProduceConsumerQueueDemo - delicious food,i like the tomato,size is 0
20:24:04.517 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.ProduceConsumerQueueDemo - eat over!
20:24:04.517 [main] DEBUG com.dz.demo.multiThread.ProduceConsumerQueueDemo - main over!
- 使用LockSupport相关park与unpark方法,park方法使当前线程需要一个令牌才能继续(该令牌可以在park之前就被有或者park后被其他线程发放),unpark方法是给对应线程发放一个令牌。
public class ProduceConsumeLockSupportDemo {
public static void main(String[] args) throws Exception{
List<String>container=new ArrayList<>(5);
Map<String,Thread> threadMap=new ConcurrentHashMap<>();
ExecutorService executorService= Executors.newFixedThreadPool(2);
executorService.execute(new LockSupportProducer(threadMap,container));
executorService.execute(new LockSupportConsumer(threadMap,container));
executorService.shutdown();
}
}
@Slf4j
class LockSupportProducer implements Runnable{
private Map<String,Thread> threadMap;
private List<String>container;
public LockSupportProducer(Map<String,Thread> threadMap,List<String>container){
this.threadMap=threadMap;
this.container=container;
}
private int money=0;
private String[] dishes={"rice","beef","chicken","tomato","potato"};
private Random random=new Random();
@Override
public void run() {
threadMap.put("producer",Thread.currentThread());
while(money<15){
if(container.size()<5){
String food=dishes[random.nextInt(5)];
log.debug("cooking ,this time food is {},time is {}",food,money);
container.add(food);
money++;
}else {
LockSupport.unpark(threadMap.get("consumer"));
LockSupport.park();
}
}
LockSupport.unpark(threadMap.get("consumer"));
log.debug("cooking over,time is {}",money);
}
}
@Slf4j
class LockSupportConsumer implements Runnable{
private Map<String,Thread> threadMap;
private List<String>container;
private int money=0;
public LockSupportConsumer(Map<String,Thread> threadMap,List<String>container){
this.threadMap=threadMap;
this.container=container;
}
@Override
public void run() {
threadMap.put("consumer",Thread.currentThread());
while(money<15){
if(container.size()>0){
Iterator<String> it=container.iterator();
while (it.hasNext()){
String food=it.next();
it.remove();
log.debug("eating ,delicious food is {},time is {}",food,money);
money++;
}
}else {
LockSupport.unpark(threadMap.get("producer"));
LockSupport.park();
}
}
log.debug("eating over,time is {}",money);
}
}
打印结果如下:
20:30:03.768 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.LockSupportProducer - cooking ,this time food is chicken,time is 0
20:30:03.772 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.LockSupportProducer - cooking ,this time food is rice,time is 1
20:30:03.772 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.LockSupportProducer - cooking ,this time food is beef,time is 2
20:30:03.772 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.LockSupportProducer - cooking ,this time food is beef,time is 3
20:30:03.772 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.LockSupportProducer - cooking ,this time food is potato,time is 4
20:30:03.772 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.LockSupportConsumer - eating ,delicious food is chicken,time is 0
20:30:03.772 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.LockSupportConsumer - eating ,delicious food is rice,time is 1
20:30:03.772 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.LockSupportConsumer - eating ,delicious food is beef,time is 2
20:30:03.772 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.LockSupportConsumer - eating ,delicious food is beef,time is 3
20:30:03.772 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.LockSupportConsumer - eating ,delicious food is potato,time is 4
20:30:03.772 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.LockSupportProducer - cooking ,this time food is potato,time is 5
20:30:03.772 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.LockSupportProducer - cooking ,this time food is chicken,time is 6
20:30:03.772 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.LockSupportProducer - cooking ,this time food is potato,time is 7
20:30:03.772 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.LockSupportProducer - cooking ,this time food is potato,time is 8
20:30:03.772 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.LockSupportProducer - cooking ,this time food is chicken,time is 9
20:30:03.772 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.LockSupportConsumer - eating ,delicious food is potato,time is 5
20:30:03.772 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.LockSupportConsumer - eating ,delicious food is chicken,time is 6
20:30:03.772 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.LockSupportConsumer - eating ,delicious food is potato,time is 7
20:30:03.772 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.LockSupportConsumer - eating ,delicious food is potato,time is 8
20:30:03.773 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.LockSupportConsumer - eating ,delicious food is chicken,time is 9
20:30:03.773 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.LockSupportProducer - cooking ,this time food is rice,time is 10
20:30:03.773 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.LockSupportProducer - cooking ,this time food is beef,time is 11
20:30:03.773 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.LockSupportProducer - cooking ,this time food is chicken,time is 12
20:30:03.773 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.LockSupportProducer - cooking ,this time food is beef,time is 13
20:30:03.773 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.LockSupportProducer - cooking ,this time food is potato,time is 14
20:30:03.773 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.LockSupportProducer - cooking over,time is 15
20:30:03.773 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.LockSupportConsumer - eating ,delicious food is rice,time is 10
20:30:03.773 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.LockSupportConsumer - eating ,delicious food is beef,time is 11
20:30:03.773 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.LockSupportConsumer - eating ,delicious food is chicken,time is 12
20:30:03.773 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.LockSupportConsumer - eating ,delicious food is beef,time is 13
20:30:03.773 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.LockSupportConsumer - eating ,delicious food is potato,time is 14
20:30:03.773 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.LockSupportConsumer - eating over,time is 15
- 在分布式环境下的生产者消费者模型问题需要借助消息队列mq或者redis的list类型实现。这个后续文章讲到mq与redis会分享相关实践
本篇讲到了常见的三种多线程协作的模型,主要讲述了解决生产者消费者模型的三种常用办法,多线程协作的其他两个模型和相关api的使用将在下一篇中进行讲解