使用BlockingQueue模拟生产者与消费者
/**
 * Producer thread that publishes sequentially numbered messages to a shared queue.
 *
 * <p>Roughly once per second the thread takes the next value from a counter shared by
 * all producers and offers it to the queue without blocking. If the queue is full the
 * offer fails and that value is dropped. The loop ends once {@link #stopThread()} has
 * been called or the thread is interrupted.
 */
class Producer extends Thread {
    /** Pause between two production attempts, in milliseconds. */
    private static final long PRODUCE_INTERVAL_MILLIS = 1000L;

    /** Counter shared by every producer instance, so message numbers are not repeated. */
    private static final AtomicInteger count = new AtomicInteger();

    /** Destination queue for the produced messages. */
    private final BlockingQueue<Integer> queue;

    /** Loop guard; volatile because it is written by the caller of stopThread() and read in run(). */
    private volatile boolean flag = true;

    /**
     * Creates a producer that publishes to the given queue.
     *
     * @param queue queue that receives the produced message numbers
     */
    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    /**
     * Produces one message per interval until a stop is requested or the thread is
     * interrupted, logging the outcome of every attempt to standard output.
     */
    @Override
    public void run() {
        System.out.println(getName()+"生产者线程启动...");
        try {
            while (flag) {
                System.out.println(getName()+"生产者开始生产消息...");
                // While the flag is set, enqueue the next number taken from the atomic counter.
                Integer i = count.incrementAndGet();
                // offer() never blocks: it returns false when the queue is full.
                if (queue.offer(i)) {
                    System.out.println(getName()+"生产者生产生产消息:"+i+"成功");
                } else {
                    System.out.println(getName()+"生产者生产生产消息:"+i+"失败");
                }
                Thread.sleep(PRODUCE_INTERVAL_MILLIS);
            }
        } catch (InterruptedException e) {
            // Interruption is the signal to stop; restore the status instead of swallowing it.
            Thread.currentThread().interrupt();
        } finally {
            System.out.println(getName()+"生产者线程停止...");
        }
    }

    /**
     * Asks the producer to stop. The loop flag is cleared and the thread is interrupted
     * so that a producer currently sleeping between messages stops right away instead
     * of finishing its sleep first.
     */
    public void stopThread() {
        this.flag = false;
        interrupt();
    }
}
/**
 * Consumer thread that drains messages from a shared queue.
 *
 * <p>Each iteration waits up to two seconds for a message. When a message arrives it is
 * logged; when the wait times out the consumer logs the failure and stops itself. The
 * thread also stops when it is interrupted.
 */
class Consumer extends Thread {
    /** Maximum time to wait for a message before the consumer gives up, in seconds. */
    private static final long POLL_TIMEOUT_SECONDS = 2L;

    /** Source queue the messages are taken from. */
    private final BlockingQueue<Integer> queue;

    /** Loop guard; cleared by run() itself once a poll times out. */
    private volatile boolean flag = true;

    /**
     * Creates a consumer that reads from the given queue.
     *
     * @param queue queue the message numbers are taken from
     */
    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    /**
     * Polls the queue until a poll times out or the thread is interrupted, logging the
     * outcome of every attempt to standard output.
     */
    @Override
    public void run() {
        System.out.println(getName()+"消费者线程启动...");
        try {
            while (flag) {
                System.out.println(getName()+"消费者开始消费消息...");
                // While the flag is set, dequeue the next message, waiting at most the timeout.
                Integer poll = queue.poll(POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                if (poll != null) {
                    System.out.println(getName()+"消费者获取消息:"+poll+"成功");
                } else {
                    // A null result means the timeout elapsed with no message: stop consuming.
                    System.out.println(getName()+"消费者获取消息:"+poll+"失败");
                    this.flag = false;
                }
            }
        } catch (InterruptedException e) {
            // Interruption ends the consumer; restore the status instead of swallowing it.
            Thread.currentThread().interrupt();
        } finally {
            System.out.println(getName()+"消费者线程停止...");
        }
    }
}
/**
 * Demo entry point: two producers and one consumer share a bounded queue of capacity 10.
 * The producers are asked to stop after three seconds.
 */
public class ProduceConsumerThread {
    /**
     * Starts both producers and then the consumer, waits three seconds, and requests
     * that the producers stop.
     *
     * @param args command-line arguments (unused)
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        final BlockingQueue<Integer> sharedQueue = new ArrayBlockingQueue<Integer>(10);

        // Construction order is kept (producer, producer, consumer) because it
        // determines the default thread names printed in the log output.
        final Producer[] producers = {new Producer(sharedQueue), new Producer(sharedQueue)};
        final Consumer consumer = new Consumer(sharedQueue);

        for (Producer producer : producers) {
            producer.start();
        }
        consumer.start();

        // Let the demo run for three seconds before stopping production.
        Thread.sleep(3000L);

        for (Producer producer : producers) {
            producer.stopThread();
        }
    }
}