本文主要说明Redis事务功能的实现。
建议阅读:
1、Redis事务的理论介绍见:Redis之事务实现
I、上帝视角
1、Redis通过一组命令来实现事务:
· MULTI
,开启一个事务;
· EXEC
,执行事务;
· DISCARD
,取消事务;
· WATCH
,监视某一个键值对,如果被修改则事务会被取消,这是一个乐观锁。
II、事务命令队列
1、Redis服务器收到来自客户端的MULTI
命令后,为客户端维护一个命令队列结构体,直到收到EXEC
后,开始依次执行命令队列中的命令。
// 命令队列结构体
/*src/redis.h/multiState*/
typedef struct multiState {
// 命令队列,其实是链表
multiCmd *commands; /* Array of MULTI commands */
// 命令的个数
int count; /* Total number of MULTI commands */
// 以下两个参数暂时没有用到,和主从复制有关
int minreplicas; /* MINREPLICAS for synchronous replication */
time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */
} multiState;
每个命令节点为multiCmd
,其结构为:
/*保存事务命令*/
/*src/redis.h/multiCmd*/
/* Client MULTI/EXEC state */
typedef struct multiCmd {
// 命令参数
robj **argv;
// 参数个数
int argc;
// 命令结构体,包含了与命令相关的参数,譬如命令执行函数
// 如需更详细了解,参看redis.c 中的redisCommandTable 全局参数
struct redisCommand *cmd;
} multiCmd;
2、processCommand
函数中可以看到关于入队的操作:
int processCommand(redisClient *c) {
......
// 加入命令队列的情况
/* Exec the command */
if (c->flags & REDIS_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
// 命令入队
queueMultiCommand(c);
addReply(c,shared.queued);
// 真正执行命令。
// 注意,如果是设置了多命令模式,那么不是直接执行命令,而是让命令入队
} else {
call(c,REDIS_CALL_FULL);
if (listLength(server.ready_keys))
handleClientsBlockedOnLists();
}
return REDIS_OK;
}
/* 将一个新命令添加到事务队列中*/
/*src/multi.c/queueMultiCommand*/
void queueMultiCommand(redisClient *c) {
multiCmd *mc;
int j;
// 为新数组元素分配空间
c->mstate.commands = zrealloc(c->mstate.commands,
sizeof(multiCmd)*(c->mstate.count+1));
// 指向新元素
mc = c->mstate.commands+c->mstate.count;
// 设置事务的命令、命令参数数量,以及命令的参数
mc->cmd = c->cmd;
mc->argc = c->argc;
mc->argv = zmalloc(sizeof(robj*)*c->argc);
memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
for (j = 0; j < c->argc; j++)
incrRefCount(mc->argv[j]);
// 事务命令数量计数器增一
c->mstate.count++;
}
III、事务的执行与取消
1、当用户发出exec
命令时,在MULTI
之后添加的命令都会被执行,但是需要注意几点:
· WATCH
监视的键值对是否被修改,如果被修改,则会被标记为REDIS_DIRTY_CAS
,调用discardTransaction
取消事务;
· 是否入队错误,客户端将会标记为REDIS_DIRTY_EXEC
,也导致事务被取消;
/*执行事务命令*/
/*src/multi.c/execCommand*/
void execCommand(redisClient *c) {
int j;
robj **orig_argv;
int orig_argc;
struct redisCommand *orig_cmd;
int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */
// 客户端没有执行事务
if (!(c->flags & REDIS_MULTI)) {
addReplyError(c,"EXEC without MULTI");
return;
}
/* Check if we need to abort the EXEC because:
*
* 检查是否需要阻止事务执行,因为:
*
* 1) Some WATCHed key was touched.
* 有被监视的键已经被修改了
*
* 2) There was a previous error while queueing commands.
* 命令在入队时发生错误
* (注意这个行为是 2.6.4 以后才修改的,之前是静默处理入队出错命令)
*
* A failed EXEC in the first case returns a multi bulk nil object
* (technically it is not an error but a special behavior), while
* in the second an EXECABORT error is returned.
*
* 第一种情况返回多个批量回复的空对象
* 而第二种情况则返回一个 EXECABORT 错误
*/
if (c->flags & (REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC)) {
addReply(c, c->flags & REDIS_DIRTY_EXEC ? shared.execaborterr :
shared.nullmultibulk);
// 取消事务
discardTransaction(c);
goto handle_monitor;
}
/* Exec all the queued commands */
// 已经可以保证安全性了,取消客户端对所有键的监视
unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
// 因为事务中的命令在执行时可能会修改命令和命令的参数
// 所以为了正确地传播命令,需要现备份这些命令和参数
orig_argv = c->argv;
orig_argc = c->argc;
orig_cmd = c->cmd;
addReplyMultiBulkLen(c,c->mstate.count);
// 执行事务中的命令
for (j = 0; j < c->mstate.count; j++) {
// 因为 Redis 的命令必须在客户端的上下文中执行
// 所以要将事务队列中的命令、命令参数等设置给客户端
c->argc = c->mstate.commands[j].argc;
c->argv = c->mstate.commands[j].argv;
c->cmd = c->mstate.commands[j].cmd;
/* Propagate a MULTI request once we encounter the first write op.
*
* 当遇上第一个写命令时,传播 MULTI 命令。
*
* This way we'll deliver the MULTI/..../EXEC block as a whole and
* both the AOF and the replication link will have the same consistency
* and atomicity guarantees.
*
* 这可以确保服务器和 AOF 文件以及附属节点的数据一致性。
*/
if (!must_propagate && !(c->cmd->flags & REDIS_CMD_READONLY)) {
// 传播 MULTI 命令
execCommandPropagateMulti(c);
// 计数器,只发送一次
must_propagate = 1;
}
// 执行命令
call(c,REDIS_CALL_FULL);
/* Commands may alter argc/argv, restore mstate. */
// 因为执行后命令、命令参数可能会被改变
// 比如 SPOP 会被改写为 SREM
// 所以这里需要更新事务队列中的命令和参数
// 确保附属节点和 AOF 的数据一致性
c->mstate.commands[j].argc = c->argc;
c->mstate.commands[j].argv = c->argv;
c->mstate.commands[j].cmd = c->cmd;
}
// 还原命令、命令参数
c->argv = orig_argv;
c->argc = orig_argc;
c->cmd = orig_cmd;
// 清理事务状态
discardTransaction(c);
/* Make sure the EXEC command will be propagated as well if MULTI
* was already propagated. */
// 将服务器设为脏,确保 EXEC 命令也会被传播
if (must_propagate) server.dirty++;
handle_monitor:
/* Send EXEC to clients waiting data from MONITOR. We do it here
* since the natural order of commands execution is actually:
* MUTLI, EXEC, ... commands inside transaction ...
* Instead EXEC is flagged as REDIS_CMD_SKIP_MONITOR in the command
* table, and we do it here with correct ordering. */
if (listLength(server.monitors) && !server.loading)
replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}
2、取消事务
函数discardTransaction
为取消事务:
/*取消事务*/
/*src/multi.c/disacrdTransaction*/
void discardTransaction(redisClient *c) {
// 清空命令队列
freeClientMultiState(c);
// 初始化命令队列
initClientMultiState(c);
// 取消标记flag
c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC);;
unwatchAllKeys(c);
}
IV、WATCH
WATCH
是一个乐观锁,让Redis拥有了check-and-set(CAS)
特性。
4.1 redisClient与redisDb中的数据结构
redisClient
与redisDb
结构体中维护了WATCH
相关的数据结构:
1、每个redisClient
都维护一个链表,记录自己所被监视的key:
/*src/redis.h/redisClient*/
typedef struct redisClient {
......
//正在被监视的键
list *watched_keys;
......
} redisClient;
2、每个redisDb
都维护了一个watched_keys
的字典,key为被监视的数据库键,value为一个链表,记录所有监视相应数据库键的客户端:
/*src/redis.h/redisDb*/
typedef struct redisDb {
......
//正在被监视的键
dict *watched_keys;
......
} redisDb;
4.2 WATCH
实现
/*watch命令*/
/*src/multi.c/watchCommand*/
void watchCommand(redisClient *c) {
int j;
// 不能在事务开始后执行
if (c->flags & REDIS_MULTI) {
addReplyError(c,"WATCH inside MULTI is not allowed");
return;
}
// 监视输入的任意个键
for (j = 1; j < c->argc; j++)
watchForKey(c,c->argv[j]);
addReply(c,shared.ok);
}
其中主要调用了watchForKey
函数,为真正的监视键值函数:
/*监视特定键值,即为维护两个用于watch的数据结构*/
/*src/multi/watchForKey*/
void watchForKey(redisClient *c, robj *key) {
list *clients = NULL;
listIter li;
listNode *ln;
watchedKey *wk;
/* Check if we are already watching for this key */
// 检查 key 是否已经保存在 watched_keys 链表中,
// 如果是的话,直接返回
listRewind(c->watched_keys,&li);
while((ln = listNext(&li))) {
wk = listNodeValue(ln);
if (wk->db == c->db && equalStringObjects(key,wk->key))
return; /* Key already watched */
}
// 键不存在于 watched_keys ,添加它
// 以下是一个 key 不存在于字典的例子:
// before :
// {
// 'key-1' : [c1, c2, c3],
// 'key-2' : [c1, c2],
// }
// after c-10086 WATCH key-1 and key-3:
// {
// 'key-1' : [c1, c2, c3, c-10086],
// 'key-2' : [c1, c2],
// 'key-3' : [c-10086]
// }
/* This key is not already watched in this DB. Let's add it */
// 检查 key 是否存在于数据库的 watched_keys 字典中
clients = dictFetchValue(c->db->watched_keys,key);
// 如果不存在的话,添加它
if (!clients) {
// 值为链表
clients = listCreate();
// 关联键值对到字典
dictAdd(c->db->watched_keys,key,clients);
incrRefCount(key);
}
// 将客户端添加到链表的末尾
listAddNodeTail(clients,c);
/* Add the new key to the list of keys watched by this client */
// 将新 watchedKey 结构添加到客户端 watched_keys 链表的表尾
// 以下是一个添加 watchedKey 结构的例子
// before:
// [
// {
// 'key': 'key-1',
// 'db' : 0
// }
// ]
// after client watch key-123321 in db 0:
// [
// {
// 'key': 'key-1',
// 'db' : 0
// }
// ,
// {
// 'key': 'key-123321',
// 'db': 0
// }
// ]
wk = zmalloc(sizeof(*wk));
wk->key = key;
wk->db = c->db;
incrRefCount(key);
listAddNodeTail(c->watched_keys,wk);
}
4.3 标记某个key是否被修改
/*src/multi.c/touchWatchedKey*/
/* "Touch" a key, so that if this key is being WATCHed by some client the
* next EXEC will fail.
*
* “触碰”一个键,如果这个键正在被某个/某些客户端监视着,
* 那么这个/这些客户端在执行 EXEC 时事务将失败。
*/
void touchWatchedKey(redisDb *db, robj *key) {
list *clients;
listIter li;
listNode *ln;
// 字典为空,没有任何键被监视
if (dictSize(db->watched_keys) == 0) return;
// 获取所有监视这个键的客户端
clients = dictFetchValue(db->watched_keys, key);
if (!clients) return;
/* Mark all the clients watching this key as REDIS_DIRTY_CAS */
/* Check if we are already watching for this key */
// 遍历所有客户端,打开他们的 REDIS_DIRTY_CAS 标识
listRewind(clients,&li);
while((ln = listNext(&li))) {
redisClient *c = listNodeValue(ln);
c->flags |= REDIS_DIRTY_CAS;
}
}
【参考】
[1] 《Redis设计与实现》
[2] 《Redis源码日志》