【Redis学习笔记】2018-06-28 redis命令源码学习1

顺风车运营研发团队 谭淼
1、dump
dump命令可以序列化给定 key ,并返回被序列化的值,使用 RESTORE命令可以将这个值反序列化为 Redis 键。

/* DUMP keyname
 * DUMP is actually not used by Redis Cluster but it is the obvious
 * complement of RESTORE and can be useful for different applications. */
void dumpCommand(client *c) {
    robj *o, *dumpobj;
    rio payload;
 
    /* 检查key是否存在 */
    if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
        addReply(c,shared.nullbulk);
        return;
    }
 
    /* 创建序列化负载 */
    createDumpPayload(&payload,o);
 
    /* 传输给客户端 */
    dumpobj = createObject(OBJ_STRING,payload.io.buffer.ptr);
    addReplyBulk(c,dumpobj);
    decrRefCount(dumpobj);
    return;
}

dump命令的核心内容是创建序列化负载,该功能的实现是调用createDumpPayload()函数。

/* Generates a DUMP-format representation of the object 'o', adding it to the
 * io stream pointed by 'rio'. This function can't fail. */
void createDumpPayload(rio *payload, robj *o) {
    unsigned char buf[2];
    uint64_t crc;
 
    /* Serialize the object in a RDB-like format. It consist of an object type
     * byte followed by the serialized object. This is understood by RESTORE. */
    rioInitWithBuffer(payload,sdsempty());
    /* 在负载中添加对象的类型 */
    serverAssert(rdbSaveObjectType(payload,o));
    /* 根据不同的对象类型序列号对象 */
    serverAssert(rdbSaveObject(payload,o));
 
    /* Write the footer, this is how it looks like:
     * ----------------+---------------------+---------------+
     * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
     * ----------------+---------------------+---------------+
     * RDB version and CRC are both in little endian.
     */
 
    /* RDB的版本,这部分被分成了两个字节存储,可以表示0-65535*/
    buf[0] = RDB_VERSION & 0xff;
    buf[1] = (RDB_VERSION >> 8) & 0xff;
    payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,buf,2);
 
    /* 计算CRC64校验码,共8字节 */
    crc = crc64(0,(unsigned char*)payload->io.buffer.ptr,
                sdslen(payload->io.buffer.ptr));
    memrev64ifbe(&crc);
    payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,&crc,8);
}

根据上面的代码可以看出序列化后的内容由下面几部分组成:
+-------------+---------------------+---------------+
| RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
+-------------+---------------------+---------------+
浩含给出的更加详细的介绍wiki链接:dump与restore

2、exists
exists命令可以检查给定 key 是否存在。

/* EXISTS key1 key2 ... key_N.
 * Return value is the number of keys existing. */
void existsCommand(client *c) {
    long long count = 0;
    int j;
 
    for (j = 1; j < c->argc; j++) {
        /* 判断所给的key是否过期 */
        expireIfNeeded(c->db,c->argv[j]);
        /* 若不过期,则将计数器count自增1 */
        if (dbExists(c->db,c->argv[j])) count++;
    }
    /* 最后返回计数器的值 */
    addReplyLongLong(c,count);
}

示例如下:

127.0.0.1:7777> exists k1
(integer) 1
127.0.0.1:7777> exists k2
(integer) 1
127.0.0.1:7777> exists k1 k2
(integer) 2

3、expire、expireat、pexpire、pexpireat
这四个命令的作用是指定一个key的过期时间,它们的原理也都相同,最后都会转换为pexpireat命令来执行。

/* EXPIRE key seconds */
void expireCommand(client *c) {
    expireGenericCommand(c,mstime(),UNIT_SECONDS);
}
 
/* EXPIREAT key time */
void expireatCommand(client *c) {
    expireGenericCommand(c,0,UNIT_SECONDS);
}
 
/* PEXPIRE key milliseconds */
void pexpireCommand(client *c) {
    expireGenericCommand(c,mstime(),UNIT_MILLISECONDS);
}
 
/* PEXPIREAT key ms_time */
void pexpireatCommand(client *c) {
    expireGenericCommand(c,0,UNIT_MILLISECONDS);
}

可以看出,四个命令的实现原理都是调用pexpireatCommand()函数。

/* This is the generic command implementation for EXPIRE, PEXPIRE, EXPIREAT
 * and PEXPIREAT. Because the commad second argument may be relative or absolute
 * the "basetime" argument is used to signal what the base time is (either 0
 * for *AT variants of the command, or the current time for relative expires).
 *
 * unit is either UNIT_SECONDS or UNIT_MILLISECONDS, and is only used for
 * the argv[2] parameter. The basetime is always specified in milliseconds. */
void expireGenericCommand(client *c, long long basetime, int unit) {
    /* 获取参数 */
    robj *key = c->argv[1], *param = c->argv[2];
    long long when; /* unix time in milliseconds when the key will expire. */
 
    /* 将输入的参数保存在变量when之中 */
    if (getLongLongFromObjectOrReply(c, param, &when, NULL) != C_OK)
        return;
 
    /* 将when转换为毫秒时间戳 */
    if (unit == UNIT_SECONDS) when *= 1000;
    when += basetime;
 
    /* 查询要设置的key是否存在 */
    if (lookupKeyWrite(c->db,key) == NULL) {
        addReply(c,shared.czero);
        return;
    }
 
    /* EXPIRE with negative TTL, or EXPIREAT with a timestamp into the past
     * should never be executed as a DEL when load the AOF or in the context
     * of a slave instance.
     *
     * Instead we take the other branch of the IF statement setting an expire
     * (possibly in the past) and wait for an explicit DEL from the master. */
 
    /* 如果设置的时间已经过期,且没有正在加载AOF或者RDB文件,且执行命令的服务器是主服务器,
       则需要对过期key进行删除 */
    if (when <= mstime() && !server.loading && !server.masterhost) {
        robj *aux;
 
        int deleted = server.lazyfree_lazy_expire ? dbAsyncDelete(c->db,key) :
                                                    dbSyncDelete(c->db,key);
        serverAssertWithInfo(c,key,deleted);
        server.dirty++;
 
        /* Replicate/AOF this as an explicit DEL or UNLINK. */
        aux = server.lazyfree_lazy_expire ? shared.unlink : shared.del;
        rewriteClientCommandVector(c,2,aux,key);
        signalModifiedKey(c->db,key);
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
        addReply(c, shared.cone);
        return;
    } else {
        /* 如果没有过期,则设置过期时间 */
        setExpire(c,c->db,key,when);
        addReply(c,shared.cone);
        signalModifiedKey(c->db,key);
        notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id);
        server.dirty++;
        return;
    }
}

设置过期时间,主要是调用setExpire()函数

/* Set an expire to the specified key. If the expire is set in the context
 * of an user calling a command 'c' is the client, otherwise 'c' is set
 * to NULL. The 'when' parameter is the absolute unix time in milliseconds
 * after which the key will no longer be considered valid. */
void setExpire(client *c, redisDb *db, robj *key, long long when) {
    dictEntry *kde, *de;
 
    /* 首先在redis的dict中找到key,共用该key的sds */
    kde = dictFind(db->dict,key->ptr);
    serverAssertWithInfo(NULL,key,kde != NULL);
    /* 在过期dict中添加或找到该key */
    de = dictAddOrFind(db->expires,dictGetKey(kde));
    /* 为这个key设置过期时间 */
    dictSetSignedIntegerVal(de,when);
 
    int writable_slave = server.masterhost && server.repl_slave_ro == 0;
    if (c && writable_slave && !(c->flags & CLIENT_MASTER))
        rememberSlaveKeyWithExpire(db,key);
}

该原理如下:

对于redis的每个数据库,都有一个统一的数据结构:

/* Redis database representation. There are multiple databases identified
 * by integers from 0 (the default database) up to the max configured
 * database. The database number is the 'id' field in the structure. */
typedef struct redisDb {
    dict *dict;                 /* The keyspace for this DB */
    dict *expires;              /* Timeout of keys with a timeout set */
    dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP)*/
    dict *ready_keys;           /* Blocked keys that received a PUSH */
    dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
    int id;                     /* Database ID */
    long long avg_ttl;          /* Average TTL, just for stats */
} redisDb;

其中dict是DB的键空间,expires记录了键空间的超时时间,需要注意的是dict和expires使用的是同一个key的sds。在设置过期时间的时候,首先会在dict中找到需要设置的key,找到后,在expires中添加或找到该key的sds,最后为key添加long long类型的过期时间。

4、keys
keys命令的作用是查找所有符合给定模式 pattern 的 key 。虽然keys速度很快,但是还是禁止在线上使用,因为会极大地消耗redis的资源。

void keysCommand(client *c) {
    dictIterator *di;
    dictEntry *de;
    sds pattern = c->argv[1]->ptr;
    int plen = sdslen(pattern), allkeys;
    unsigned long numkeys = 0;
    void *replylen = addDeferredMultiBulkLength(c);
    /* 安全迭代器 */
    di = dictGetSafeIterator(c->db->dict);
    allkeys = (pattern[0] == '*' && pattern[1] == '\0');
    /* 遍历dict的entry */
    while((de = dictNext(di)) != NULL) {
        sds key = dictGetKey(de);
        robj *keyobj;
        /* 判断key是否与正则匹配,若匹配且key没有过期则在回复给客户端的内容中记录 */
        if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) {
            keyobj = createStringObject(key,sdslen(key));
            if (expireIfNeeded(c->db,keyobj) == 0) {
                addReplyBulk(c,keyobj);
                numkeys++;
            }
            decrRefCount(keyobj);
        }
    }
    dictReleaseIterator(di);
    setDeferredMultiBulkLength(c,replylen,numkeys);
}

遍历与迭代:

迭代(iterate) - 按顺序访问线性结构中的每一项
遍历(traversal) - 按规则访问非线性结构中的每一项
参考链接:https://www.zhihu.com/questio...

洪宝的关于keys命令的wiki:2018.06.27日记(redis keys命令)

5、migrate
migrate的作用是将 key 原子性地从当前实例传送到目标实例的指定数据库上,一旦传送成功, key 保证会出现在目标实例上。

这个命令是一个原子操作,它在执行的时候会阻塞进行迁移的两个实例,直到迁移成功,迁移失败或等待超时。

该命令使用的场景不多,故没有详细分析,该命令的实现函数是migratecommand()函数,其原理是在当前实例对给定 key 执行DUMP命令 ,将它序列化,然后通过socket传送到目标实例,目标实例再使用RESTORE命令对数据进行反序列化,并将反序列化所得的数据添加到数据库中。

6、move
move命令也是对key进行转移,不过是将key从当前数据库转移到指定数据库中。

void moveCommand(client *c) {
    robj *o;
    redisDb *src, *dst;
    int srcid;
    long long dbid, expire;
 
    if (server.cluster_enabled) {
        addReplyError(c,"MOVE is not allowed in cluster mode");
        return;
    }
 
    /* 获取源数据库和目标数据库 */
    src = c->db;
    srcid = c->db->id;
 
    /* 判断DB是否合法 */
    if (getLongLongFromObject(c->argv[2],&dbid) == C_ERR ||
        dbid < INT_MIN || dbid > INT_MAX ||
        selectDb(c,dbid) == C_ERR)
    {
        addReply(c,shared.outofrangeerr);
        return;
    }
    dst = c->db;
    selectDb(c,srcid); /* Back to the source DB */
 
    /* 源和目标数据库不能相同 */
    if (src == dst) {
        addReply(c,shared.sameobjecterr);
        return;
    }
 
    /* 检查目标数据库是否存在需要移动的key */
    o = lookupKeyWrite(c->db,c->argv[1]);
    if (!o) {
        addReply(c,shared.czero);
        return;
    }
    expire = getExpire(c->db,c->argv[1]);
 
    /* Return zero if the key already exists in the target DB */
    if (lookupKeyWrite(dst,c->argv[1]) != NULL) {
        addReply(c,shared.czero);
        return;
    }
    /* 向目标数据库添加数据 */
    dbAdd(dst,c->argv[1],o);
    if (expire != -1) setExpire(c,dst,c->argv[1],expire);
    incrRefCount(o);
 
    /* 删除源数据库的数据 */
    dbDelete(src,c->argv[1]);
    server.dirty++;
    addReply(c,shared.cone);
}

7、object
object命令允许从内部察看给定 key 的 Redis 对象,可以查看对象的refcount、encoding、idletime和freq。主要的思路是找到Redis对象,返回对象结构体中的相关参数(refcount和encoding)或者通过相关参数计算出需要查询的内容(idletime和freq)。

/* Object command allows to inspect the internals of an Redis Object.
 * Usage: OBJECT <refcount|encoding|idletime|freq> <key> */
void objectCommand(client *c) {
    robj *o;
 
    if (!strcasecmp(c->argv[1]->ptr,"help") && c->argc == 2) {
        void *blenp = addDeferredMultiBulkLength(c);
        int blen = 0;
        blen++; addReplyStatus(c,
        "OBJECT <subcommand> key. Subcommands:");
        blen++; addReplyStatus(c,
        "refcount -- Return the number of references of the value associated with the specified key.");
        blen++; addReplyStatus(c,
        "encoding -- Return the kind of internal representation used in order to store the value associated with a key.");
        blen++; addReplyStatus(c,
        "idletime -- Return the idle time of the key, that is the approximated number of seconds elapsed since the last access to the key.");
        blen++; addReplyStatus(c,
        "freq -- Return the access frequency index of the key. The returned integer is proportional to the logarithm of the recent access frequency of the key.");
        setDeferredMultiBulkLength(c,blenp,blen);
    } else if (!strcasecmp(c->argv[1]->ptr,"refcount") && c->argc == 3) {
        if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk))
                == NULL) return;
        addReplyLongLong(c,o->refcount);
    } else if (!strcasecmp(c->argv[1]->ptr,"encoding") && c->argc == 3) {
        if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk))
                == NULL) return;
        addReplyBulkCString(c,strEncoding(o->encoding));
    } else if (!strcasecmp(c->argv[1]->ptr,"idletime") && c->argc == 3) {
        if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk))
                == NULL) return;
        if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
            addReplyError(c,"An LFU maxmemory policy is selected, idle time not tracked. Please note that when switching between policies at runtime LRU and LFU data will take some time to adjust.");
            return;
        }
        addReplyLongLong(c,estimateObjectIdleTime(o)/1000);
    } else if (!strcasecmp(c->argv[1]->ptr,"freq") && c->argc == 3) {
        if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk))
                == NULL) return;
        if (!(server.maxmemory_policy & MAXMEMORY_FLAG_LFU)) {
            addReplyError(c,"An LFU maxmemory policy is not selected, access frequency not tracked. Please note that when switching between policies at runtime LRU and LFU data will take some time to adjust.");
            return;
        }
        /* LFUDecrAndReturn should be called
         * in case of the key has not been accessed for a long time,
         * because we update the access time only
         * when the key is read or overwritten. */
        addReplyLongLong(c,LFUDecrAndReturn(o));
    } else {
        addReplyErrorFormat(c, "Unknown subcommand or wrong number of arguments for '%s'. Try OBJECT help",
            (char *)c->argv[1]->ptr);
    }
}

相关推荐