主要思路是用一个set做前端去重缓冲, 若干个list做后端的多优先级消息队列, 用一个进程来进行分发, 即从set中分发消息到队列.
set缓冲的设计为当天有效, 所以有个零点问题,有可能在零点前set中刚放进去的消息没有分发即失效, 这一点可以用另一个进程弥补处理前一天的遗留消息和删除前一天的缓冲
<?php
class MsgQuery {
// TODO - Insert your code here
const KEY_CACHE_PREFIX = 'mass.query.cache'; // 消息缓冲key前缀
const KEY_QUERY_PREFIX = 'mass.query.lv'; // 消息key
const KEY_CACHE_DEAL_PREFIX = 'mass.query.deal'; // 已处理缓冲key前缀
const SCORE_NUM = 5; // 优先级划分数目
const MIN_SCORE = 1; // 最小优先级
static $MAX_SCORE;
static $instance = null;
private $redis;
public static function getInstance($redis) {
if (null == self::$instance) {
self::$instance = new MsgQuery ( $redis );
}
return self::$instance;
}
/**
* 添加消息到消息缓冲区
* @param int $score 优先级(1-5)
* @param string $msg 消息
*/
public function add($score, $msg) {
// 添加到消息缓冲
$socre = intval ( $score );
if ($socre < self::MIN_SCORE) {
$score = self::MIN_SCORE;
}
if ($score > self::$MAX_SCORE) {
$score = self::$MAX_SCORE;
}
$cacheKey = self::KEY_CACHE_PREFIX . date ( 'Ymd' );
$cacheData = array (
'score' => $score,
'msg' => $msg
);
$this->redis->sAdd ( $cacheKey, serialize ( $cacheData ) );
}
/**
* 将消息从缓冲区移动到相应的优先级队列中
*/
public function moveToQuery() {
// 获取当前缓冲区没有入队列的消息
$dealKey = self::KEY_CACHE_DEAL_PREFIX.date('Ymd');
$cacheKey = self::KEY_CACHE_PREFIX.date('Ymd');
$msgs = $this->redis->sDiff($cacheKey, $dealKey);
foreach ($msgs as $cachedData){
// 放入已处理集合
$this->redis->sAdd ( $dealKey, $cachedData );
// 压入相应的优先级队列
$cachedData = unserialize($cachedData);
$score = $cachedData['score'];
$msg = $cachedData['msg'];
$queryKey = self::KEY_QUERY_PREFIX.$score;
$this->redis->rPush($queryKey, $msg);
}
unset($cachedData);
}
/**
* 从队列阻塞式出栈一个最高优先级消息
* @return string msg
*/
public function bPop(){
$queryKeys = array();
for($score=self::$MAX_SCORE;$score>=self::MIN_SCORE;$score--){
$queryKeys[] = self::KEY_QUERY_PREFIX.$score;
}
$msg = $this->redis->blPop($queryKeys, 0);
return $msg[1];
}
private function __construct($redis) {
$this->redis = $redis;
$this->redis->connect ();
self::$MAX_SCORE = self::MIN_SCORE + self::SCORE_NUM - 1;
}
private function __destruct() {
$this->redis->close ();
}
}
?>