一、什么是阻塞队列
1.1 简介
阻塞队列(BlockingQueue)是用于进程间通信或同一进程内的线程间通信的组件。它的工作原理是当队列是空的时,线程试图从队列中获取元素的操作将会被阻塞,或者当队列是满时,线程往队列里添加元素的操作会被阻塞。阻塞队列最常用于生产消费模式中,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。
1.2 常用操作方法
最近在看方腾飞的《Java并发编程艺术》书籍,书中作者对阻塞队列常用的四种处理方法归纳的非常清晰。四种处理方法如下:
处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
检查方法 | element() | peek() | 不可用 | 不可用 |
- 抛出异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException(“Queue full”)异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。
- 返回特殊值:插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,如果没有则返回null
- 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。
- 超时退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。
1.3 Java里的阻塞队列
JDK7提供了7个阻塞队列。分别是
- ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
- PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
- DelayQueue:一个使用优先级队列实现的无界阻塞队列。
- SynchronousQueue:一个不存储元素的阻塞队列。
- LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
- LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
1.4 为什么用队列?
如上图它是一种线性列表,以FIFO(先进先出)的顺序访问。由于队列是线性列表,在查找元素时需要一个一个遍历,效率非常低,所以不适合做查询操作,那么它的优点在哪?优点在于他们每一个节点是串联的,在移动或删除元素时,只需要修改节点指向的元素,而在数组中当你删除其中一个元素时,需要把该元素后边所有元素位置重新排列,这就使得队列在插入和删除上效率要比数组中高,详细如下图所示。那么在游戏服务器开发中它的主要应用场景有哪一些?比如:频道聊天,AOI、邮件群发等等。
频道聊天
玩家A在频道发送消息-----> 服务器------->该频道玩家看到了这个玩家A发的消息
AOI
玩家A在场景B对NPC使用技能-----> 服务器------->该场景B附近的玩家能看到玩家A在使用技能
使用消息队列的十大理由:http://www.oschina.net/translate/top-10-uses-for-message-queue?cmp
1.5 代码示例(频道聊天)
package com.game.lll.queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class LinkedBlockingQueueTest{
public static final int WORLD = 0;//世界频道
public static final int AREA = 1;//区域频道
public static final int SYSTEM = 2;//系统消息
public static final String[] NAME = {"世界","区域","系统"};
protected BlockingQueue<ChatMessage> messages = new LinkedBlockingQueue<ChatMessage>();
private Producer producer1 = new Producer("小毛驴");
private Producer producer2 = new Producer("小兔子");
private Producer producer3 = new Producer("小猫咪");
private Consumer consumer = new Consumer();
public static void main(String[] args) {
LinkedBlockingQueueTest queueTest = new LinkedBlockingQueueTest();
queueTest.consumer.start();;
for(int i=0;i<10;i++){
new Thread(){
public void run() {
queueTest.producer1.addChatMessage("大家好!", WORLD);
queueTest.producer2.addChatMessage("大家好!", AREA);
queueTest.producer3.addChatMessage("大家好!", SYSTEM);
};
}.start();
}
while(Thread.activeCount()>1) //保证前面的线程都执行完
Thread.yield();
}
public class Producer
{
protected String name;
public Producer(String name) {
this.name = name;
}
public void addChatMessage(String message,int channel) {
messages.add(new ChatMessage(name, message, channel));
}
}
public class Consumer extends Thread{
@Override
public void run() {
while (true) {
try {
ChatMessage message = messages.take();
switch (message.channel) {
case WORLD:
//
break;
case AREA:
//
break;
case SYSTEM:
//
break;
default:
break;
}
System.out.println("【"+NAME[message.channel]+"】"+message.name+":"+message.message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
public class ChatMessage
{
protected String name;
protected String message;
protected int channel;
public ChatMessage(String name,String message,int channel)
{
this.name = name;
this.message = message;
this.channel = channel;
}
}
}
控制台:
【世界】小毛驴:大家好!
【世界】小毛驴:大家好!
【区域】小兔子:大家好!
【系统】小猫咪:大家好!
【世界】小毛驴:大家好!
【区域】小兔子:大家好!
【系统】小猫咪:大家好!
【世界】小毛驴:大家好!
【区域】小兔子:大家好!
【系统】小猫咪:大家好!
【世界】小毛驴:大家好!
【区域】小兔子:大家好!
【系统】小猫咪:大家好!
【世界】小毛驴:大家好!
【区域】小兔子:大家好!
【系统】小猫咪:大家好!
【世界】小毛驴:大家好!
【区域】小兔子:大家好!
【系统】小猫咪:大家好!
【世界】小毛驴:大家好!
【区域】小兔子:大家好!
【系统】小猫咪:大家好!
【世界】小毛驴:大家好!
【区域】小兔子:大家好!
【系统】小猫咪:大家好!
【区域】小兔子:大家好!
【系统】小猫咪:大家好!
【世界】小毛驴:大家好!
【区域】小兔子:大家好!
【系统】小猫咪:大家好!
作者:小毛驴,一个Java游戏服务器开发者 原文地址:https://liulongling.github.io/