Redis源码分析之事务Transaction

这周学习了一下redis事务功能的实现原理,本来是想用一篇文章进行总结的,写完以后发现这块内容比较多,而且多个命令之间又互相依赖,放在一篇文章里一方面篇幅会比较大,另一方面文章组织结构会比较乱,不容易阅读。因此把事务这个模块整理成上下两篇文章进行总结。

这篇文章我们重点分析一下redis事务命令中的两个辅助命令:watch跟unwatch。

一、redis事务辅助命令简介

依然从server.c文件的命令表中找到相应的命令以及它们对应的处理函数。

//watch,unwatch两个命令我们把它们叫做redis事务辅助命令
{"watch",watchCommand,-2,"sF",0,NULL,1,-1,1,0,0},
{"unwatch",unwatchCommand,1,"sF",0,NULL,0,0,0,0,0},
  1. watch,用于客户端关注某个key,当这个key的值被修改时,整个事务就会执行失败(注:该命令需要在事务开启前使用)。
  2. unwatch,用于客户端取消已经watch的key。

用法举例如下:
clientA

127.0.0.1:6379> watch a
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set b b
QUEUED
//在执行前插入clientB的操作如下,事务就会执行失败
127.0.0.1:6379> exec
(nil)
127.0.0.1:6379>

clientB

127.0.0.1:6379> set a aa
OK
127.0.0.1:6379>

二、redis事务辅助命令源码分析

在看具体执行函数之前首先了解几个数据结构:

//每个客户端对象中有一个watched_keys链表来保存已经watch的key
typedef struct client {
    list *watched_keys;  
}
//上述链表中每个节点的数据结构
typedef struct watchedKey {
    //watch的key
    robj *key;
    //指向的DB,后面细说
    redisDb *db;
} watchedKey;

关于事务的几个命令所对应的函数都放在了multi.c文件中。
一起看下watch命令对应处理函数的源码:

void watchCommand(client *c) {
    int j;
    //如果客户端处于事务状态,则返回错误信息
    //由此可以看出,watch必须在事务开启前使用
    if (c->flags & CLIENT_MULTI) {
        addReplyError(c,"WATCH inside MULTI is not allowed");
        return;
    }
    //依次watch客户端的各个参数(这里说明watch命令可以一次watch多个key)
    //注:0表示命令本身,所以参数从1开始
    for (j = 1; j < c->argc; j++)
        watchForKey(c,c->argv[j]);
    //返回结果
    addReply(c,shared.ok);
}

//具体的watch操作,代码较长,慢慢分析
void watchForKey(client *c, robj *key) {
    list *clients = NULL;
    listIter li;
    listNode *ln;
    //上面已经提到了数据结构
    watchedKey *wk;

    //首先判断key是否已经被客户端watch
    //listRewind这个函数在发布订阅那篇文章里也有,就是把客户端的watched_keys赋值给li
    listRewind(c->watched_keys,&li);
    while((ln = listNext(&li))) {
        wk = listNodeValue(ln);
        //这里一个wk节点中有db,key两个字段
        if (wk->db == c->db && equalStringObjects(key,wk->key))
            return; 
    }
    //开始watch指定key
    //整个watch操作保存了两套数据结构,一套是在db->watched_keys中的字典结构,如下:
    clients = dictFetchValue(c->db->watched_keys,key);
    //如果是key第一次出现,则进行初始化
    if (!clients) {
        clients = listCreate();
        dictAdd(c->db->watched_keys,key,clients);
        incrRefCount(key);
    }
    //把当前客户端加到该key的watch链表中
    listAddNodeTail(clients,c);
    //另一套是在c->watched_keys中的链表结构:如下
    wk = zmalloc(sizeof(*wk));
    //初始化各个字段
    wk->key = key;
    wk->db = c->db;
    incrRefCount(key);
    //加入到链表最后
    listAddNodeTail(c->watched_keys,wk);
}

整个watch的数据结构比较复杂,我这里画了一张图方便理解:
Redis源码分析之事务Transaction
简单解释一下上面的图,首先redis把每个客户端连接包装成了一个client对象,上图中db,watch_keys就是其中的两个字段(client对象里面还有很多其他字段,包括上篇文章中提到的pub/sub)。

  1. db字段指向给该client对象分配的储存空间,db对象中也含有一个watched_keys字段,是字典类型(也就是哈希表),以想要watch的key做key,存储的链表则是所有watch该key的客户端。
  2. watch_keys字段则是一个链表类型,每个节点类型为watch_key,其中包含两个字段,key表示watch的key,db则指向了当前client对象的db字段,如上图。

看完watch命令的源码以后,再来看一下unwatch命令,如果搞明白了上面提到的两套数据结构,那么看unwatch的源码应该会比较容易,毕竟就是删除数据结构中对应的内容。

void unwatchCommand(client *c) {
    //取消watch所有key
    unwatchAllKeys(c);
    //修改客户端状态
    c->flags &= (~CLIENT_DIRTY_CAS);
    addReply(c,shared.ok);
}

//取消watch的key
void unwatchAllKeys(client *c) {
    listIter li;
    listNode *ln;
    //如果客户端没有watch任何key,则直接返回
    if (listLength(c->watched_keys) == 0) return;
    //注意这里操作的是链表字段
    listRewind(c->watched_keys,&li);
    while((ln = listNext(&li))) {
        list *clients;
        watchedKey *wk;
        //遍历取出该客户端watch的key
        wk = listNodeValue(ln);
        //取出所有watch了该key的客户端,这里则是字典(即哈希表)
        clients = dictFetchValue(wk->db->watched_keys, wk->key);
        //空指针判断
        serverAssertWithInfo(c,NULL,clients != NULL);
        //从watch列表中删除该客户端
        listDelNode(clients,listSearchKey(clients,c));
        //如果key只有一个当前客户端watch,则删除
        if (listLength(clients) == 0)
            dictDelete(wk->db->watched_keys, wk->key);
        //从当前client的watch列表中删除该key
        listDelNode(c->watched_keys,ln);
        //减少引用数
        decrRefCount(wk->key);
        //释放内存
        zfree(wk);
    }
}

最后我们考虑一下watch机制的触发时机,现在我们已经把想要watch的key加入到了watch的数据结构中,可以想到触发watch的时机应该是修改key的内容时,通知到所有watch了该key的客户端。

感兴趣的用户可以任意选一个修改命令跟踪一下源码,例如set命令,我们发现所有对key进行修改的命令最后都会调用touchWatchedKey()函数,而该函数源码就位于multi.c文件中,该函数就是触发watch机制的关键函数,源码如下:

//这里入参db就是客户端对象中的db,上文已经提到,不赘述
void touchWatchedKey(redisDb *db, robj *key) {
    list *clients;
    listIter li;
    listNode *ln;
    //保存watchkey的字典为空,则返回
    if (dictSize(db->watched_keys) == 0) return;
    //注意这里操作的是字典(即哈希表)数据结构
    clients = dictFetchValue(db->watched_keys, key);
    //如果没有客户端watch该key,则返回
    if (!clients) return;
    //把client赋值给li
    listRewind(clients,&li);
    //遍历watch了该key的客户端,修改他们的状态
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags |= CLIENT_DIRTY_CAS;
    }
}

跟我们猜测的一样,就是每当key的内容被修改时,则遍历所有watch了该key的客户端,设置相应的状态为CLIENT_DIRTY_CAS。

三、redis事务辅助命令总结

上面就是redis事务命令中watch,unwatch的实现原理,其中最复杂的应该就是watch对应的那两套数据结构了,跟之前的pub/sub类似,都是使用链表+哈希表的结构存储,另外也是通过修改客户端的状态位FLAG来通知客户端。

代码比较多,而且C++代码看上去会比较费劲,需要慢慢读,反复读。

相关推荐