异步队列简单介绍
队列实现异步可以用单向队列,任务放到队列中,先进先出,或者使用优先队列,按照优先级来选择谁先执行,来防止某一个用户执行大量的请求,如一个用户发送了100个请求,如果用单向队列,其它用户必须要等这个用户的100个请求结束后才能执行,这就不合理,所以可以给第2个请求设置比较低的优先级,这样其他用户的请求也可以被执行
使用异步队列可以让主线程继续运行,减少请求响应时间和解耦,主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。
主要编写两个类
- EventProducer把事件推到Redis队列中去
- EventConsumer把事件从队列中取出来,找到关联的handler,一件一件地去处理
其他用到了一些辅助类
- 枚举型的EventType,用来表示不同的Event类型
- EventModel来存放Event中的相关信息
- 接口类EventHandler,其中有两个函数,一个是执行Handler,另一个是返回需要使用这个Handler的EventType列表
具体实现
1. 枚举型EventType
public enum EventType {
LIKE(0),
COMMENT(1),
LOGIN(2),
ASK(3),
Mail(4);
private int value;
EventType(int value)
{
this.value=value;
}
public int getValue()
{
return this.value;
}
}
2.EventModel
public class EventModel {
private int userid;//事件执行者,如点赞的人
private EventType eventype;//事件类型,如点赞
private int entitytype;//操作对象类型,如给评论点赞,则为评论类型
private int entityid;//操作对象ID,如评论的ID
private int entityownerid;//对象的拥有者,如发表该评论的用户
public int getUserid() {
return userid;
}
public EventModel setUserid(int userid) {
this.userid = userid;
return this;
}
public EventType getEventype() {
return eventype;
}
public EventModel setEventype(EventType eventype) {
this.eventype = eventype;
return this;
}
public int getEntitytype() {
return entitytype;
}
public EventModel setEntitytype(int entitytype) {
this.entitytype = entitytype;
return this;
}
public int getEntityid() {
return entityid;
}
public EventModel setEntityid(int entityid) {
this.entityid = entityid;
return this;
}
public int getEntityownerid() {
return entityownerid;
}
public EventModel setEntityownerid(int entityownerid) {
this.entityownerid = entityownerid;
return this;
}
public Map<String, String> getMap() {
return map;
}
public EventModel setMap(Map<String, String> map) {
this.map = map;
return this;
}
private Map<String,String> map=new HashMap<String, String>();
public EventModel setkeyvalue(String key,String value)
{
map.put(key,value);
return this;
}
public String getkeyvalue(String key)
{
return map.get(key);
}
}
3. EventHandler
public interface EventHandler {
void doHandle(EventModel event);//执行Event的具体操作函数
List<EventType> getSupportEventType();//返回该handler对那些类型的Event是关心的,即这些EventType进入队列需要运行时,需要调用该Handler
}
4. EventProducer
@Service
public class EventProducer {
@Autowired
JedisService jedis;
//将EVENT加入到redis队列中
public boolean fireEvent(EventModel event)
{
try
{
String eventstring= JSONObject.toJSONString(event);
//将event转换为JSON字符串,取出时Parse回到EventModel类型
String key= RedisKeyUtil.eventKey;
jedis.lpush(key,eventstring);
//将该event加入list中
return true;
}
catch(Exception e)
{
return false;
}
}
}
5.EventConsumer
@Service
public class EventConsumer implements InitializingBean,ApplicationContextAware{
@Autowired
JedisService jedis;
private static final Logger logger= LoggerFactory.getLogger(EventConsumer.class) ;
private Map<EventType,List<EventHandler>> eventConsumerMap=new HashMap<EventType,List<EventHandler>>();
//一种类型的EventType进来,就寻找这件Event所需要的Handler的列表
private ApplicationContext applicationContext;
//运行的上下文
@Override
public void afterPropertiesSet() throws Exception {
Map<String,EventHandler> beans=applicationContext.getBeansOfType(EventHandler.class);
//找到所有的EventHandler,注意在具体实现handler的时候一定要加上@Component,否则找不到这个handler
if(beans!=null)
{
for(Map.Entry<String,EventHandler> entry:beans.entrySet())
{
List<EventType> tmp=entry.getValue().getSupportEventType();
for(EventType a:tmp)
{
if(eventConsumerMap.containsKey(a)==false)
{
eventConsumerMap.put(a,new ArrayList<EventHandler>());
}
eventConsumerMap.get(a).add(entry.getValue());
}
}
}
//初始化Map<EventType,List<EventHandler>>将Event类型和需要用到的EventHandler关联起来
//新建线程,该线程不断地循环,从redis的list中找event,找到了就将Json字符串parse成event,然后执行它所需要的handler
Thread thread=new Thread(new Runnable() {
@Override
public void run() {
while(true)
{
String key= RedisKeyUtil.eventKey;
List<String> eventlist=jedis.brpop(0,key);
for(String tmp:eventlist)
{
if(tmp.equals(key))
continue;
EventModel model= JSON.parseObject(tmp,EventModel.class);
if(!eventConsumerMap.containsKey(model.getEventype()))
{
logger.error("不能识别的事件");
continue;
}
for(EventHandler handler:eventConsumerMap.get(model.getEventype()))
{
handler.doHandle(model);
}
}
}
}
});
thread.start();
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext=applicationContext;
}
}
解释一下其中的brpop指令:
假如在指定时间内没有任何元素被弹出,则返回一个 nil 和等待时长(即第一个参数,表示等待time时间后返回nil)。 反之,返回一个含有两个元素的列表,第一个元素是被弹出元素所属的 key (所以下面遍历的时候要把Key过滤掉),第二个元素是被弹出元素的值。
6.具体的Handler实现 LikeHandler
@Component
public class LikeHandler implements EventHandler {
@Autowired
UserService userService;
@Autowired
MessageService messageService;
//给被点赞用户发送站内信
@Override
public void doHandle(EventModel event) {
Message msg=new Message();
msg.setToid(event.getEntityownerid());
msg.setCreateddate(new Date());
msg.setFromid(888);
User user=userService.getuserbyid(event.getUserid());
msg.setContent("用户"+user.getName()+"赞了您的评论 "+event.getkeyvalue("questionid"));
msg.setHasread(0);
msg.setConversationid(event.getEntityownerid(),888);
messageService.insertMessage(msg);
}
@Override
public List<EventType> getSupportEventType() {
return Arrays.asList(EventType.LIKE);
}
}
7. Controller中将任务放入异步队列中
eventProducer.fireEvent(new EventModel().setEntityid(entity_id).setEntitytype(EntityType.ENTITY_COMMENT).setUserid(user.getId()).setEventype(EventType.LIKE).setkeyvalue("questionid","http://127.0.0.1:8080/question/"+String.valueOf(comment.getEntityid())).setEntityownerid(comment.getUserid()));
总结:
通过异步队列,如果用户有一个耗时且不需要同步响应的事件时,可以将事件通过事件生产者,将事件推入异步队列中,消费者线程不断从异步队列中取出事件,来执行,适用于Message Service等应用场景