逻辑比较简单,大概是用户点击商品,对应的为每个商品统计一下被点击次数,然后根据商品的次数降序排序得到一个热门查看排行榜。
逻辑挺简单,只是考虑排行榜的数据属于热点数据,并发过多时,直接到数据库可能会引起性能问题,所以这边用redis队列方式进行统计,所以数据还是会延迟更新到到数据库的,生成排行榜时依旧是走数据库(其实就是一种思路尝鲜)。
所以这边通过redis的List作为消息队列,多个进程向队列写,一个进程读取队列,并将其更新到数据库中,然后通过nodejs事件的方式进行消息的消化:
import * as events from "events";
import { RedisService } from "./redis.service";
export default class TestEvent extends events.EventEmitter {
static client: TestEvent;
private constructor() {
super();
this.setMaxListeners(0);
this.on("cache_record", this.cacheRecord);
}
async cacheRecord(result: { result: any}) {
// 将消息从右边压入队列
await RedisService.rpush("cache:key", values);
}
static getClient() {
if (!TestEvent.client) {
TestEvent.client = new TestEvent();
}
return TestEvent.client;
}
}
通过事件把消息缓存到redis:
import TestEvent from "./test.event";
@Path("/detail")
export default class TestController {
@GET("/:id")
async searchPlant(req: Request, res: Response, next: NextFunction): Promise<void> {
try {
// 处理完业务逻辑后,通过事件将用户点击行为缓存到redis
TestEvent.getClient().emit("cache_record", "");
res.send();
} catch (e) {
console.log(e);
next(new ErrorResponse("system error"));
}
}
}
现在消息的添加已经有了,下一步进行消息缓存,通过setImmediate()
去不断的消费数据:
// 通过redis包封装的一些redis指令service
import { RedisService } from "./redis.service";
export async function persistenceRecord() {
if (INSTANCE_ID === 0) {
return setImmediate(persistenceRecord);
}
try {
// 从队列里取出消息,进行相应处理
// 这里需要注意,blpop是阻塞指令,不要和非阻塞指令共用一个连接
const record: string = await RedisService.blpop(cache:key, 1800);
} catch (e) {
console.log(e);
}
// 自注册,进行下一次消费
setImmediate(persistenceRecord);
}
由于是用pm2开启的cluster模式,所以必须通过pm2定义的cluster实例ID:INSTANCE_ID
进行指定某个进程进行消息消费(详情配置可看这里)。
至于blpop
为什么不要和非阻塞指令共用一个连接,看这里。