Redis源码研究之事务

本文主要说明Redis事务功能的实现。

建议阅读:
1、Redis事务的理论介绍见:Redis之事务实现

I、上帝视角

1、Redis通过一组命令来实现事务:
· MULTI,开启一个事务;
· EXEC,执行事务;
· DISCARD,取消事务;
· WATCH,监视某一个键值对,如果被修改则事务会被取消,这是一个乐观锁

II、事务命令队列

1、Redis服务器收到来自客户端的MULTI命令后,为客户端维护一个命令队列结构体,直到收到EXEC后,开始依次执行命令队列中的命令。

// 命令队列结构体
/*src/redis.h/multiState*/
typedef struct multiState {
    // 命令队列,其实是链表
    multiCmd *commands; /* Array of MULTI commands */

    // 命令的个数
    int count; /* Total number of MULTI commands */

    // 以下两个参数暂时没有用到,和主从复制有关
    int minreplicas; /* MINREPLICAS for synchronous replication */
    time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */
} multiState;  

每个命令节点为multiCmd,其结构为:

/*保存事务命令*/
/*src/redis.h/multiCmd*/
/* Client MULTI/EXEC state */
typedef struct multiCmd {
    // 命令参数
    robj **argv;
    // 参数个数
    int argc;
    // 命令结构体,包含了与命令相关的参数,譬如命令执行函数
    // 如需更详细了解,参看redis.c 中的redisCommandTable 全局参数
    struct redisCommand *cmd;
} multiCmd;  

2、processCommand函数中可以看到关于入队的操作:

int processCommand(redisClient *c) {
    ......
    // 加入命令队列的情况
    /* Exec the command */
    if (c->flags & REDIS_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
    // 命令入队
    queueMultiCommand(c);
    addReply(c,shared.queued);
   
 // 真正执行命令。
    // 注意,如果是设置了多命令模式,那么不是直接执行命令,而是让命令入队
    } else {
        call(c,REDIS_CALL_FULL);
    if (listLength(server.ready_keys))
        handleClientsBlockedOnLists();
    }
    return REDIS_OK;
}  
 /* 将一个新命令添加到事务队列中*/  
/*src/multi.c/queueMultiCommand*/
void queueMultiCommand(redisClient *c) {
    multiCmd *mc;
    int j;

    // 为新数组元素分配空间
    c->mstate.commands = zrealloc(c->mstate.commands,
            sizeof(multiCmd)*(c->mstate.count+1));

    // 指向新元素
    mc = c->mstate.commands+c->mstate.count;

    // 设置事务的命令、命令参数数量,以及命令的参数
    mc->cmd = c->cmd;
    mc->argc = c->argc;
    mc->argv = zmalloc(sizeof(robj*)*c->argc);
    memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
    for (j = 0; j < c->argc; j++)
        incrRefCount(mc->argv[j]);

    // 事务命令数量计数器增一
    c->mstate.count++;
}

III、事务的执行与取消

1、当用户发出exec命令时,在MULTI之后添加的命令都会被执行,但是需要注意几点:
· WATCH监视的键值对是否被修改,如果被修改,则会被标记为REDIS_DIRTY_CAS,调用discardTransaction取消事务;
· 是否入队错误,客户端将会标记为REDIS_DIRTY_EXEC,也导致事务被取消;

/*执行事务命令*/  
/*src/multi.c/execCommand*/ 
void execCommand(redisClient *c) {
   int j;
   robj **orig_argv;
   int orig_argc;
   struct redisCommand *orig_cmd;
   int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */

   // 客户端没有执行事务
   if (!(c->flags & REDIS_MULTI)) {
       addReplyError(c,"EXEC without MULTI");
       return;
   }

   /* Check if we need to abort the EXEC because:
    *
    * 检查是否需要阻止事务执行,因为:
    *
    * 1) Some WATCHed key was touched.
    *    有被监视的键已经被修改了
    *
    * 2) There was a previous error while queueing commands.
    *    命令在入队时发生错误
    *    (注意这个行为是 2.6.4 以后才修改的,之前是静默处理入队出错命令)
    *
    * A failed EXEC in the first case returns a multi bulk nil object
    * (technically it is not an error but a special behavior), while
    * in the second an EXECABORT error is returned. 
    *
    * 第一种情况返回多个批量回复的空对象
    * 而第二种情况则返回一个 EXECABORT 错误
    */
   if (c->flags & (REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC)) {

       addReply(c, c->flags & REDIS_DIRTY_EXEC ? shared.execaborterr :
                                                 shared.nullmultibulk);

       // 取消事务
       discardTransaction(c);

       goto handle_monitor;
   }

   /* Exec all the queued commands */
   // 已经可以保证安全性了,取消客户端对所有键的监视
   unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */

   // 因为事务中的命令在执行时可能会修改命令和命令的参数
   // 所以为了正确地传播命令,需要现备份这些命令和参数
   orig_argv = c->argv;
   orig_argc = c->argc;
   orig_cmd = c->cmd;

   addReplyMultiBulkLen(c,c->mstate.count);

   // 执行事务中的命令
   for (j = 0; j < c->mstate.count; j++) {

       // 因为 Redis 的命令必须在客户端的上下文中执行
       // 所以要将事务队列中的命令、命令参数等设置给客户端
       c->argc = c->mstate.commands[j].argc;
       c->argv = c->mstate.commands[j].argv;
       c->cmd = c->mstate.commands[j].cmd;

       /* Propagate a MULTI request once we encounter the first write op.
        *
        * 当遇上第一个写命令时,传播 MULTI 命令。
        *
        * This way we'll deliver the MULTI/..../EXEC block as a whole and
        * both the AOF and the replication link will have the same consistency
        * and atomicity guarantees. 
        *
        * 这可以确保服务器和 AOF 文件以及附属节点的数据一致性。
        */
       if (!must_propagate && !(c->cmd->flags & REDIS_CMD_READONLY)) {

           // 传播 MULTI 命令
           execCommandPropagateMulti(c);

           // 计数器,只发送一次
           must_propagate = 1;
       }

       // 执行命令
       call(c,REDIS_CALL_FULL);

       /* Commands may alter argc/argv, restore mstate. */
       // 因为执行后命令、命令参数可能会被改变
       // 比如 SPOP 会被改写为 SREM
       // 所以这里需要更新事务队列中的命令和参数
       // 确保附属节点和 AOF 的数据一致性
       c->mstate.commands[j].argc = c->argc;
       c->mstate.commands[j].argv = c->argv;
       c->mstate.commands[j].cmd = c->cmd;
   }

   // 还原命令、命令参数
   c->argv = orig_argv;
   c->argc = orig_argc;
   c->cmd = orig_cmd;

   // 清理事务状态
   discardTransaction(c);

   /* Make sure the EXEC command will be propagated as well if MULTI
    * was already propagated. */
   // 将服务器设为脏,确保 EXEC 命令也会被传播
   if (must_propagate) server.dirty++;

handle_monitor:
   /* Send EXEC to clients waiting data from MONITOR. We do it here
    * since the natural order of commands execution is actually:
    * MUTLI, EXEC, ... commands inside transaction ...
    * Instead EXEC is flagged as REDIS_CMD_SKIP_MONITOR in the command
    * table, and we do it here with correct ordering. */
   if (listLength(server.monitors) && !server.loading)
       replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}  

2、取消事务

函数discardTransaction为取消事务:

/*取消事务*/
/*src/multi.c/disacrdTransaction*/
void discardTransaction(redisClient *c) {
    // 清空命令队列
    freeClientMultiState(c);
    // 初始化命令队列
    initClientMultiState(c);
    // 取消标记flag
    c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC);;
    unwatchAllKeys(c);
}  

IV、WATCH

WATCH是一个乐观锁,让Redis拥有了check-and-set(CAS)特性。

4.1 redisClient与redisDb中的数据结构

redisClientredisDb结构体中维护了WATCH相关的数据结构:

1、每个redisClient都维护一个链表,记录自己所被监视的key:

/*src/redis.h/redisClient*/
typedef struct redisClient {
    ......
    //正在被监视的键
    list *watched_keys;
    ......
} redisClient;    

2、每个redisDb都维护了一个watched_keys的字典,key为被监视的数据库键,value为一个链表,记录所有监视相应数据库键的客户端:

/*src/redis.h/redisDb*/
typedef struct redisDb {
    ......
    //正在被监视的键
    dict *watched_keys;
    ......
} redisDb;    

4.2 WATCH实现

/*watch命令*/
/*src/multi.c/watchCommand*/
void watchCommand(redisClient *c) {
    int j;

    // 不能在事务开始后执行
    if (c->flags & REDIS_MULTI) {
        addReplyError(c,"WATCH inside MULTI is not allowed");
        return;
    }

    // 监视输入的任意个键
    for (j = 1; j < c->argc; j++)
        watchForKey(c,c->argv[j]);

    addReply(c,shared.ok);
}

其中主要调用了watchForKey函数,为真正的监视键值函数:

/*监视特定键值,即为维护两个用于watch的数据结构*/  
/*src/multi/watchForKey*/  

void watchForKey(redisClient *c, robj *key) {

    list *clients = NULL;
    listIter li;
    listNode *ln;
    watchedKey *wk;

    /* Check if we are already watching for this key */
    // 检查 key 是否已经保存在 watched_keys 链表中,
    // 如果是的话,直接返回
    listRewind(c->watched_keys,&li);
    while((ln = listNext(&li))) {
        wk = listNodeValue(ln);
        if (wk->db == c->db && equalStringObjects(key,wk->key))
            return; /* Key already watched */
    }

    // 键不存在于 watched_keys ,添加它

    // 以下是一个 key 不存在于字典的例子:
    // before :
    // {
    //  'key-1' : [c1, c2, c3],
    //  'key-2' : [c1, c2],
    // }
    // after c-10086 WATCH key-1 and key-3:
    // {
    //  'key-1' : [c1, c2, c3, c-10086],
    //  'key-2' : [c1, c2],
    //  'key-3' : [c-10086]
    // }

    /* This key is not already watched in this DB. Let's add it */
    // 检查 key 是否存在于数据库的 watched_keys 字典中
    clients = dictFetchValue(c->db->watched_keys,key);
    // 如果不存在的话,添加它
    if (!clients) { 
        // 值为链表
        clients = listCreate();
        // 关联键值对到字典
        dictAdd(c->db->watched_keys,key,clients);
        incrRefCount(key);
    }
    // 将客户端添加到链表的末尾
    listAddNodeTail(clients,c);

    /* Add the new key to the list of keys watched by this client */
    // 将新 watchedKey 结构添加到客户端 watched_keys 链表的表尾
    // 以下是一个添加 watchedKey 结构的例子
    // before:
    // [
    //  {
    //   'key': 'key-1',
    //   'db' : 0
    //  }
    // ]
    // after client watch key-123321 in db 0:
    // [
    //  {
    //   'key': 'key-1',
    //   'db' : 0
    //  }
    //  ,
    //  {
    //   'key': 'key-123321',
    //   'db': 0
    //  }
    // ]
    wk = zmalloc(sizeof(*wk));
    wk->key = key;
    wk->db = c->db;
    incrRefCount(key);
    listAddNodeTail(c->watched_keys,wk);
}
4.3 标记某个key是否被修改
/*src/multi.c/touchWatchedKey*/
/* "Touch" a key, so that if this key is being WATCHed by some client the
 * next EXEC will fail. 
 *
 * “触碰”一个键,如果这个键正在被某个/某些客户端监视着,
 * 那么这个/这些客户端在执行 EXEC 时事务将失败。
 */
void touchWatchedKey(redisDb *db, robj *key) {
    list *clients;
    listIter li;
    listNode *ln;

    // 字典为空,没有任何键被监视
    if (dictSize(db->watched_keys) == 0) return;

    // 获取所有监视这个键的客户端
    clients = dictFetchValue(db->watched_keys, key);
    if (!clients) return;

    /* Mark all the clients watching this key as REDIS_DIRTY_CAS */
    /* Check if we are already watching for this key */
    // 遍历所有客户端,打开他们的 REDIS_DIRTY_CAS 标识
    listRewind(clients,&li);
    while((ln = listNext(&li))) {
        redisClient *c = listNodeValue(ln);

        c->flags |= REDIS_DIRTY_CAS;
    }
}

【参考】
[1] 《Redis设计与实现》
[2] 《Redis源码日志》

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351

推荐阅读更多精彩内容