本文主要说明Redis的两种订阅模式的实现。
建议阅读:
1、Redis订阅与发布理论说明见:Redis之发布与订阅
I、上帝视角
1.1 两种模式
Redis具有两种订阅模式,分别为频道(channel)订阅,与模式(pattern)订阅,这部分内容可以参见建议阅读部分。
1.2 数据结构
1、struct redisServer
中维护了所有频道和订阅频道的客户端:
/*src/redis.h/redisServer*/
struct redisServer {
......
/* Pubsub */
/*用字典维护频道,key为channel,value为订阅这个频道的客户端链表*/
dict *pubsub_channels; /* Map channels to list of subscribed clients */
/*用一个链表结构维护模式订阅,每个链表节点(pubsubPattern)包含两部分内容(客户端+模式)*/
list *pubsub_patterns; /* A list of pubsub_patterns */
......
};
/*订阅模式的数据结构*/
/*src/redis.h/pubsubPattern*/
typedef struct pubsubPattern {
redisClient *client; //订阅模式客户端
robj *pattern; //被订阅的模式
} pubsubPattern;
如下图所示:
2、struct redisClient
维护了客户端自己所订阅的频道
/*src/redis.h/redisClient*/
typedef struct redisClient {
......
// 维护客户端订阅的频道,key为channel,value为null,以dict维护能在O(1)时间内确定是否订阅了某个频道
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
// 记录所有订阅模式的客户端信息
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
......
} redisClient;
3、订阅频道为一个dict
,每个channel发布信息是,直接遍历channel对应的client list,即可将所有信息发布到对应的clients;
订阅模式为一个list
,当有消息发布时,查找list,发布到对应的clients;
II、订阅
1、很容易想到,订阅过程即是对上述两个数据结构的维护,首先来看订阅频道的做法:
/*设置客户端c订阅频道channel*/
/* src/pubsub.c/pubsubSubscribeChannel*/
int pubsubSubscribeChannel(redisClient *c, robj *channel) {
dictEntry *de;
list *clients = NULL;
int retval = 0;
/* Add the channel to the client -> channels hash table */
// 将 channels 填接到 c->pubsub_channels 的集合中(值为 NULL 的字典视为集合)
if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
retval = 1;
incrRefCount(channel); //增加引用计数
// 关联示意图
// {
// 频道名 订阅频道的客户端
// 'channel-a' : [c1, c2, c3],
// 'channel-b' : [c5, c2, c1],
// 'channel-c' : [c10, c2, c1]
// }
/* Add the client to the channel -> list of clients hash table */
// 从 pubsub_channels 字典中取出保存着所有订阅了 channel 的客户端的链表
// 如果 channel 不存在于字典,那么添加进去
de = dictFind(server.pubsub_channels,channel);
if (de == NULL) {
clients = listCreate();
dictAdd(server.pubsub_channels,channel,clients);
incrRefCount(channel);
} else {
clients = dictGetVal(de);
}
// before:
// 'channel' : [c1, c2]
// after:
// 'channel' : [c1, c2, c3]
// 将客户端添加到链表的末尾
listAddNodeTail(clients,c);
}
/* Notify the client */
// 回复客户端。
// 示例:
// redis 127.0.0.1:6379> SUBSCRIBE xxx
// Reading messages... (press Ctrl-C to quit)
// 1) "subscribe"
// 2) "xxx"
// 3) (integer) 1
addReply(c,shared.mbulkhdr[3]);
// "subscribe\n" 字符串
addReply(c,shared.subscribebulk);
// 被订阅的客户端
addReplyBulk(c,channel);
// 客户端订阅的频道和模式总数
addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
return retval;
}
2、订阅模式实现:
/*设置客户端c订阅模式pattern*/
/*src/pubsub.c/pubsubSubscribeChannel*/
int pubsubSubscribePattern(redisClient *c, robj *pattern) {
int retval = 0;
// 在链表中查找模式,看客户端是否已经订阅了这个模式
// 这里为什么不像 channel 那样,用字典来进行检测呢?
// 虽然 pattern 的数量一般来说并不多
if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
// 如果没有的话,执行以下代码
retval = 1;
pubsubPattern *pat;
// 将 pattern 添加到 c->pubsub_patterns 链表中
listAddNodeTail(c->pubsub_patterns,pattern);
incrRefCount(pattern);
// 创建并设置新的 pubsubPattern 结构
pat = zmalloc(sizeof(*pat));
pat->pattern = getDecodedObject(pattern);
pat->client = c;
// 添加到末尾
listAddNodeTail(server.pubsub_patterns,pat);
}
/* Notify the client */
// 回复客户端。
// 示例:
// redis 127.0.0.1:6379> PSUBSCRIBE xxx*
// Reading messages... (press Ctrl-C to quit)
// 1) "psubscribe"
// 2) "xxx*"
// 3) (integer) 1
addReply(c,shared.mbulkhdr[3]);
// 回复 "psubscribe" 字符串
addReply(c,shared.psubscribebulk);
// 回复被订阅的模式
addReplyBulk(c,pattern);
// 回复客户端订阅的频道和模式的总数
addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
return retval;
}
3、频道与模式的退订与订阅相反。
III、消息发布
1、发布消息的过程即为遍历两个数据结构,然后将消息发布到匹配的客户端:
/*发布消息,包括发布到订阅channel,与发布到订阅pattern*/
/*src/pubsub/pubsubPublishMessage*/
int pubsubPublishMessage(robj *channel, robj *message) {
int receivers = 0;
dictEntry *de;
listNode *ln;
listIter li;
/* Send to clients listening for that channel */
// 取出包含所有订阅频道 channel 的客户端的链表
// 并将消息发送给它们
de = dictFind(server.pubsub_channels,channel);
if (de) {
list *list = dictGetVal(de);
listNode *ln;
listIter li;
// 遍历客户端链表,将 message 发送给它们
listRewind(list,&li);
while ((ln = listNext(&li)) != NULL) {
redisClient *c = ln->value;
// 回复客户端。
// 示例:
// 1) "message"
// 2) "xxx"
// 3) "hello"
addReply(c,shared.mbulkhdr[3]);
// "message" 字符串
addReply(c,shared.messagebulk);
// 消息的来源频道
addReplyBulk(c,channel);
// 消息内容
addReplyBulk(c,message);
// 接收客户端计数
receivers++;
}
}
/* Send to clients listening to matching channels */
// 将消息也发送给那些和频道匹配的模式
if (listLength(server.pubsub_patterns)) {
// 遍历模式链表
listRewind(server.pubsub_patterns,&li);
channel = getDecodedObject(channel);
while ((ln = listNext(&li)) != NULL) {
// 取出 pubsubPattern
pubsubPattern *pat = ln->value;
// 如果 channel 和 pattern 匹配
// 就给所有订阅该 pattern 的客户端发送消息
if (stringmatchlen((char*)pat->pattern->ptr,
sdslen(pat->pattern->ptr),
(char*)channel->ptr,
sdslen(channel->ptr),0)) {
// 回复客户端
// 示例:
// 1) "pmessage"
// 2) "*"
// 3) "xxx"
// 4) "hello"
addReply(pat->client,shared.mbulkhdr[4]);
addReply(pat->client,shared.pmessagebulk);
addReplyBulk(pat->client,pat->pattern);
addReplyBulk(pat->client,channel);
addReplyBulk(pat->client,message);
// 对接收消息的客户端进行计数
receivers++;
}
}
decrRefCount(channel);
}
// 返回计数
return receivers;
}
IV、processCommand
在之前讲过的processCommand
函数中,也有对订阅的处理,即**如果客户端处于订阅与发布中,则,只能执行订阅与发布的相关命令:
/*src/redis.c/processCommand*/
int processCommand(redisClient *c) {
......
// 在订阅发布模式下,只允许处理SUBSCRIBE 或者UNSUBSCRIBE 命令
// 从下面的检测条件可以看出:只要存在redisClient.pubsub_channels 或者
// redisClient.pubsub_patterns,就代表处于订阅发布模式下
/* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
if ((dictSize(c->pubsub_channels) > 0 || listLength(c->pubsub_patterns) > 0)
&&
c->cmd->proc != subscribeCommand &&
c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand) {
addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed
"in this context");
return REDIS_OK;
}
......
}
【参考】
[1] 《Redis设计与实现》
[2] 《Redis源码日志》