【Redis学习笔记】2018-06-28 redis命令源码学习1
顺风车运营研发团队 谭淼
1、dump
dump命令可以序列化给定 key ,并返回被序列化的值,使用 RESTORE命令可以将这个值反序列化为 Redis 键。
/* DUMP keyname * DUMP is actually not used by Redis Cluster but it is the obvious * complement of RESTORE and can be useful for different applications. */ void dumpCommand(client *c) { robj *o, *dumpobj; rio payload; /* 检查key是否存在 */ if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) { addReply(c,shared.nullbulk); return; } /* 创建序列化负载 */ createDumpPayload(&payload,o); /* 传输给客户端 */ dumpobj = createObject(OBJ_STRING,payload.io.buffer.ptr); addReplyBulk(c,dumpobj); decrRefCount(dumpobj); return; }
dump命令的核心内容是创建序列化负载,该功能的实现是调用createDumpPayload()函数。
/* Generates a DUMP-format representation of the object 'o', adding it to the * io stream pointed by 'rio'. This function can't fail. */ void createDumpPayload(rio *payload, robj *o) { unsigned char buf[2]; uint64_t crc; /* Serialize the object in a RDB-like format. It consist of an object type * byte followed by the serialized object. This is understood by RESTORE. */ rioInitWithBuffer(payload,sdsempty()); /* 在负载中添加对象的类型 */ serverAssert(rdbSaveObjectType(payload,o)); /* 根据不同的对象类型序列号对象 */ serverAssert(rdbSaveObject(payload,o)); /* Write the footer, this is how it looks like: * ----------------+---------------------+---------------+ * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 | * ----------------+---------------------+---------------+ * RDB version and CRC are both in little endian. */ /* RDB的版本,这部分被分成了两个字节存储,可以表示0-65535*/ buf[0] = RDB_VERSION & 0xff; buf[1] = (RDB_VERSION >> 8) & 0xff; payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,buf,2); /* 计算CRC64校验码,共8字节 */ crc = crc64(0,(unsigned char*)payload->io.buffer.ptr, sdslen(payload->io.buffer.ptr)); memrev64ifbe(&crc); payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,&crc,8); }
根据上面的代码可以看出序列化后的内容由下面几部分组成:
+-------------+---------------------+---------------+
| RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
+-------------+---------------------+---------------+
浩含给出的更加详细的介绍wiki链接:dump与restore
2、exists
exists命令可以检查给定 key 是否存在。
/* EXISTS key1 key2 ... key_N. * Return value is the number of keys existing. */ void existsCommand(client *c) { long long count = 0; int j; for (j = 1; j < c->argc; j++) { /* 判断所给的key是否过期 */ expireIfNeeded(c->db,c->argv[j]); /* 若不过期,则将计数器count自增1 */ if (dbExists(c->db,c->argv[j])) count++; } /* 最后返回计数器的值 */ addReplyLongLong(c,count); }
示例如下:
127.0.0.1:7777> exists k1
(integer) 1
127.0.0.1:7777> exists k2
(integer) 1
127.0.0.1:7777> exists k1 k2
(integer) 2
3、expire、expireat、pexpire、pexpireat
这四个命令的作用是指定一个key的过期时间,它们的原理也都相同,最后都会转换为pexpireat命令来执行。
/* EXPIRE key seconds */ void expireCommand(client *c) { expireGenericCommand(c,mstime(),UNIT_SECONDS); } /* EXPIREAT key time */ void expireatCommand(client *c) { expireGenericCommand(c,0,UNIT_SECONDS); } /* PEXPIRE key milliseconds */ void pexpireCommand(client *c) { expireGenericCommand(c,mstime(),UNIT_MILLISECONDS); } /* PEXPIREAT key ms_time */ void pexpireatCommand(client *c) { expireGenericCommand(c,0,UNIT_MILLISECONDS); }
可以看出,四个命令的实现原理都是调用pexpireatCommand()函数。
/* This is the generic command implementation for EXPIRE, PEXPIRE, EXPIREAT * and PEXPIREAT. Because the commad second argument may be relative or absolute * the "basetime" argument is used to signal what the base time is (either 0 * for *AT variants of the command, or the current time for relative expires). * * unit is either UNIT_SECONDS or UNIT_MILLISECONDS, and is only used for * the argv[2] parameter. The basetime is always specified in milliseconds. */ void expireGenericCommand(client *c, long long basetime, int unit) { /* 获取参数 */ robj *key = c->argv[1], *param = c->argv[2]; long long when; /* unix time in milliseconds when the key will expire. */ /* 将输入的参数保存在变量when之中 */ if (getLongLongFromObjectOrReply(c, param, &when, NULL) != C_OK) return; /* 将when转换为毫秒时间戳 */ if (unit == UNIT_SECONDS) when *= 1000; when += basetime; /* 查询要设置的key是否存在 */ if (lookupKeyWrite(c->db,key) == NULL) { addReply(c,shared.czero); return; } /* EXPIRE with negative TTL, or EXPIREAT with a timestamp into the past * should never be executed as a DEL when load the AOF or in the context * of a slave instance. * * Instead we take the other branch of the IF statement setting an expire * (possibly in the past) and wait for an explicit DEL from the master. */ /* 如果设置的时间已经过期,且没有正在加载AOF或者RDB文件,且执行命令的服务器是主服务器, 则需要对过期key进行删除 */ if (when <= mstime() && !server.loading && !server.masterhost) { robj *aux; int deleted = server.lazyfree_lazy_expire ? dbAsyncDelete(c->db,key) : dbSyncDelete(c->db,key); serverAssertWithInfo(c,key,deleted); server.dirty++; /* Replicate/AOF this as an explicit DEL or UNLINK. */ aux = server.lazyfree_lazy_expire ? shared.unlink : shared.del; rewriteClientCommandVector(c,2,aux,key); signalModifiedKey(c->db,key); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id); addReply(c, shared.cone); return; } else { /* 如果没有过期,则设置过期时间 */ setExpire(c,c->db,key,when); addReply(c,shared.cone); signalModifiedKey(c->db,key); notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id); server.dirty++; return; } }
设置过期时间,主要是调用setExpire()函数
/* Set an expire to the specified key. If the expire is set in the context * of an user calling a command 'c' is the client, otherwise 'c' is set * to NULL. The 'when' parameter is the absolute unix time in milliseconds * after which the key will no longer be considered valid. */ void setExpire(client *c, redisDb *db, robj *key, long long when) { dictEntry *kde, *de; /* 首先在redis的dict中找到key,共用该key的sds */ kde = dictFind(db->dict,key->ptr); serverAssertWithInfo(NULL,key,kde != NULL); /* 在过期dict中添加或找到该key */ de = dictAddOrFind(db->expires,dictGetKey(kde)); /* 为这个key设置过期时间 */ dictSetSignedIntegerVal(de,when); int writable_slave = server.masterhost && server.repl_slave_ro == 0; if (c && writable_slave && !(c->flags & CLIENT_MASTER)) rememberSlaveKeyWithExpire(db,key); }
该原理如下:
对于redis的每个数据库,都有一个统一的数据结构:
/* Redis database representation. There are multiple databases identified * by integers from 0 (the default database) up to the max configured * database. The database number is the 'id' field in the structure. */ typedef struct redisDb { dict *dict; /* The keyspace for this DB */ dict *expires; /* Timeout of keys with a timeout set */ dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ dict *ready_keys; /* Blocked keys that received a PUSH */ dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ int id; /* Database ID */ long long avg_ttl; /* Average TTL, just for stats */ } redisDb;
其中dict是DB的键空间,expires记录了键空间的超时时间,需要注意的是dict和expires使用的是同一个key的sds。在设置过期时间的时候,首先会在dict中找到需要设置的key,找到后,在expires中添加或找到该key的sds,最后为key添加long long类型的过期时间。
4、keys
keys命令的作用是查找所有符合给定模式 pattern 的 key 。虽然keys速度很快,但是还是禁止在线上使用,因为会极大地消耗redis的资源。
void keysCommand(client *c) { dictIterator *di; dictEntry *de; sds pattern = c->argv[1]->ptr; int plen = sdslen(pattern), allkeys; unsigned long numkeys = 0; void *replylen = addDeferredMultiBulkLength(c); /* 安全迭代器 */ di = dictGetSafeIterator(c->db->dict); allkeys = (pattern[0] == '*' && pattern[1] == '\0'); /* 遍历dict的entry */ while((de = dictNext(di)) != NULL) { sds key = dictGetKey(de); robj *keyobj; /* 判断key是否与正则匹配,若匹配且key没有过期则在回复给客户端的内容中记录 */ if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) { keyobj = createStringObject(key,sdslen(key)); if (expireIfNeeded(c->db,keyobj) == 0) { addReplyBulk(c,keyobj); numkeys++; } decrRefCount(keyobj); } } dictReleaseIterator(di); setDeferredMultiBulkLength(c,replylen,numkeys); }
遍历与迭代:
迭代(iterate) - 按顺序访问线性结构中的每一项
遍历(traversal) - 按规则访问非线性结构中的每一项
参考链接:https://www.zhihu.com/questio...
洪宝的关于keys命令的wiki:2018.06.27日记(redis keys命令)
5、migrate
migrate的作用是将 key 原子性地从当前实例传送到目标实例的指定数据库上,一旦传送成功, key 保证会出现在目标实例上。
这个命令是一个原子操作,它在执行的时候会阻塞进行迁移的两个实例,直到迁移成功,迁移失败或等待超时。
该命令使用的场景不多,故没有详细分析,该命令的实现函数是migratecommand()函数,其原理是在当前实例对给定 key 执行DUMP命令 ,将它序列化,然后通过socket传送到目标实例,目标实例再使用RESTORE命令对数据进行反序列化,并将反序列化所得的数据添加到数据库中。
6、move
move命令也是对key进行转移,不过是将key从当前数据库转移到指定数据库中。
void moveCommand(client *c) { robj *o; redisDb *src, *dst; int srcid; long long dbid, expire; if (server.cluster_enabled) { addReplyError(c,"MOVE is not allowed in cluster mode"); return; } /* 获取源数据库和目标数据库 */ src = c->db; srcid = c->db->id; /* 判断DB是否合法 */ if (getLongLongFromObject(c->argv[2],&dbid) == C_ERR || dbid < INT_MIN || dbid > INT_MAX || selectDb(c,dbid) == C_ERR) { addReply(c,shared.outofrangeerr); return; } dst = c->db; selectDb(c,srcid); /* Back to the source DB */ /* 源和目标数据库不能相同 */ if (src == dst) { addReply(c,shared.sameobjecterr); return; } /* 检查目标数据库是否存在需要移动的key */ o = lookupKeyWrite(c->db,c->argv[1]); if (!o) { addReply(c,shared.czero); return; } expire = getExpire(c->db,c->argv[1]); /* Return zero if the key already exists in the target DB */ if (lookupKeyWrite(dst,c->argv[1]) != NULL) { addReply(c,shared.czero); return; } /* 向目标数据库添加数据 */ dbAdd(dst,c->argv[1],o); if (expire != -1) setExpire(c,dst,c->argv[1],expire); incrRefCount(o); /* 删除源数据库的数据 */ dbDelete(src,c->argv[1]); server.dirty++; addReply(c,shared.cone); }
7、object
object命令允许从内部察看给定 key 的 Redis 对象,可以查看对象的refcount、encoding、idletime和freq。主要的思路是找到Redis对象,返回对象结构体中的相关参数(refcount和encoding)或者通过相关参数计算出需要查询的内容(idletime和freq)。
/* Object command allows to inspect the internals of an Redis Object. * Usage: OBJECT <refcount|encoding|idletime|freq> <key> */ void objectCommand(client *c) { robj *o; if (!strcasecmp(c->argv[1]->ptr,"help") && c->argc == 2) { void *blenp = addDeferredMultiBulkLength(c); int blen = 0; blen++; addReplyStatus(c, "OBJECT <subcommand> key. Subcommands:"); blen++; addReplyStatus(c, "refcount -- Return the number of references of the value associated with the specified key."); blen++; addReplyStatus(c, "encoding -- Return the kind of internal representation used in order to store the value associated with a key."); blen++; addReplyStatus(c, "idletime -- Return the idle time of the key, that is the approximated number of seconds elapsed since the last access to the key."); blen++; addReplyStatus(c, "freq -- Return the access frequency index of the key. The returned integer is proportional to the logarithm of the recent access frequency of the key."); setDeferredMultiBulkLength(c,blenp,blen); } else if (!strcasecmp(c->argv[1]->ptr,"refcount") && c->argc == 3) { if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk)) == NULL) return; addReplyLongLong(c,o->refcount); } else if (!strcasecmp(c->argv[1]->ptr,"encoding") && c->argc == 3) { if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk)) == NULL) return; addReplyBulkCString(c,strEncoding(o->encoding)); } else if (!strcasecmp(c->argv[1]->ptr,"idletime") && c->argc == 3) { if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk)) == NULL) return; if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { addReplyError(c,"An LFU maxmemory policy is selected, idle time not tracked. Please note that when switching between policies at runtime LRU and LFU data will take some time to adjust."); return; } addReplyLongLong(c,estimateObjectIdleTime(o)/1000); } else if (!strcasecmp(c->argv[1]->ptr,"freq") && c->argc == 3) { if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk)) == NULL) return; if (!(server.maxmemory_policy & MAXMEMORY_FLAG_LFU)) { addReplyError(c,"An LFU maxmemory policy is not selected, access frequency not tracked. Please note that when switching between policies at runtime LRU and LFU data will take some time to adjust."); return; } /* LFUDecrAndReturn should be called * in case of the key has not been accessed for a long time, * because we update the access time only * when the key is read or overwritten. */ addReplyLongLong(c,LFUDecrAndReturn(o)); } else { addReplyErrorFormat(c, "Unknown subcommand or wrong number of arguments for '%s'. Try OBJECT help", (char *)c->argv[1]->ptr); } }