【Redis学习笔记】2018-07-10 Redis指令学习3
顺风车运营研发团队 闫昌
一. smove
使用方式: smove source destination member
使用描述: 将member元素从source集合移动到destination集合
smove是原子性操作
如果 source 集合不存在或不包含指定的 member 元素,则 SMOVE 命令不执行任何操作,仅返回 0 。否则, member 元素从 source 集合中被移除,并添加到 destination 集合中去
当 destination 集合已经包含 member 元素时, SMOVE 命令只是简单地将 source 集合中的 member 元素删除。
当 source 或 destination 不是集合类型时,返回一个错误。
源码分析:
void smoveCommand(client *c) { robj *srcset, *dstset, *ele; srcset = lookupKeyWrite(c->db,c->argv[1]); dstset = lookupKeyWrite(c->db,c->argv[2]); ele = c->argv[3]; if (srcset == NULL) {//如果source不存在, 则直接返回 addReply(c,shared.czero); return; } if (checkType(c,srcset,OBJ_SET) || (dstset && checkType(c,dstset,OBJ_SET))) return;//source和dest必须全部为set类型, 否则退出 if (srcset == dstset) {//如果source和dest相同, 则直接返回 addReply(c,setTypeIsMember(srcset,ele->ptr) ? shared.cone : shared.czero); return; } if (!setTypeRemove(srcset,ele->ptr)) {//如果source移除失败, 则直接返回 addReply(c,shared.czero); return; } notifyKeyspaceEvent(NOTIFY_SET,"srem",c->argv[1],c->db->id); if (setTypeSize(srcset) == 0) {//如果移除一个元素之后, source已经为空, 则将source删除 dbDelete(c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id); } if (!dstset) {//如果dest为空, 需要先将dest创建出来 dstset = setTypeCreate(ele->ptr); dbAdd(c->db,c->argv[2],dstset); } signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[2]); server.dirty++; if (setTypeAdd(dstset,ele->ptr)) {//将元素添加到dest当中 server.dirty++; notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[2],c->db->id); } addReply(c,shared.cone); }
二. spop
使用方式spop key [count]
从设置值存储中移除并返回一个或多个随机key
源码分析: count=0的情况
void spopCommand(client *c) { robj *set, *ele, *aux; sds sdsele; int64_t llele; int encoding; if (c->argc == 3) { spopWithCountCommand(c); return; } else if (c->argc > 3) { addReply(c,shared.syntaxerr); return; } if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL || checkType(c,set,OBJ_SET)) return; /** 获取随机元素算法: 1. 如果是hash类型, 获取hash key 1.1 正在rehash: 则获取 rehashidx + random()%(ht[0]+ht[1]-rehashidx) //此时key可能在ht[0]也可能在ht[1]中, 但一但在rehashidx之后 1.2 没有在rehash, 直接: random() % ht[0].length 2. 如果是整数集合类型: rand()%length */ encoding = setTypeRandomElement(set,&sdsele,&llele);//获取随机的一个元素 if (encoding == OBJ_ENCODING_INTSET) { ele = createStringObjectFromLongLong(llele); set->ptr = intsetRemove(set->ptr,llele,NULL);//删除这个元素 } else { ele = createStringObject(sdsele,sdslen(sdsele)); setTypeRemove(set,ele->ptr);//删除这个元素 } notifyKeyspaceEvent(NOTIFY_SET,"spop",c->argv[1],c->db->id); aux = createStringObject("SREM",4); rewriteClientCommandVector(c,3,aux,c->argv[1],ele); decrRefCount(aux); addReplyBulk(c,ele); decrRefCount(ele); if (setTypeSize(set) == 0) {//如果set已经为空, 则需要删除set集合 dbDelete(c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id); } signalModifiedKey(c->db,c->argv[1]); server.dirty++; }
count不为0的情况:
当count数量大于set的个数时, 直接返回所有的值, 并删除set
size-count=remining, 当remining*5>count时, 循环count次, 每次随机弹出一个key
当remining<=5时, 循环remining次, 随机弹出一个值并压入到一个新的set中, 最后将这个新的set覆盖原来的set
void spopWithCountCommand(client *c) { long l; unsigned long count, size; robj *set; if (getLongFromObjectOrReply(c,c->argv[2],&l,NULL) != C_OK) return; if (l >= 0) { count = (unsigned long) l; } else { addReply(c,shared.outofrangeerr); return; } if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL || checkType(c,set,OBJ_SET)) return; if (count == 0) { addReply(c,shared.emptymultibulk); return; } size = setTypeSize(set); notifyKeyspaceEvent(NOTIFY_SET,"spop",c->argv[1],c->db->id); server.dirty += count; /* CASE 1: * 当count数量大于set的数量, 则直接将set的所有元素都返回, 并删除set */ if (count >= size) { sunionDiffGenericCommand(c,c->argv+1,1,NULL,SET_OP_UNION);//获取集合中的所有值 dbDelete(c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id); rewriteClientCommandVector(c,2,shared.del,c->argv[1]); signalModifiedKey(c->db,c->argv[1]); server.dirty++; return; } robj *propargv[3]; propargv[0] = createStringObject("SREM",4); propargv[1] = c->argv[1]; addReplyMultiBulkLen(c,count); /* Common iteration vars. */ sds sdsele; robj *objele; int encoding; int64_t llele; unsigned long remaining = size-count; /* Elements left after SPOP. */ /*当删除count个元素后, 剩下的元素个数*5大于count时, 需要循环count次, 每次随机pop出一个元素*/ if (remaining*SPOP_MOVE_STRATEGY_MUL > count) { while(count--) { /* Emit and remove. */ encoding = setTypeRandomElement(set,&sdsele,&llele);//获取随机的一个元素 if (encoding == OBJ_ENCODING_INTSET) { addReplyBulkLongLong(c,llele); objele = createStringObjectFromLongLong(llele); set->ptr = intsetRemove(set->ptr,llele,NULL);//删除随机出来的元素 } else { addReplyBulkCBuffer(c,sdsele,sdslen(sdsele)); objele = createStringObject(sdsele,sdslen(sdsele)); setTypeRemove(set,sdsele); } /* Replicate/AOF this command as an SREM operation */ propargv[2] = objele; alsoPropagate(server.sremCommand,c->db->id,propargv,3, PROPAGATE_AOF|PROPAGATE_REPL); decrRefCount(objele); } } else { /* 否则循环remining次, 将pop出来的元素赋值给新的set, 最后将新的set覆盖原来的set值 */ robj *newset = NULL; /* Create a new set with just the remaining elements. */ while(remaining--) { encoding = setTypeRandomElement(set,&sdsele,&llele); if (encoding == OBJ_ENCODING_INTSET) { sdsele = sdsfromlonglong(llele); } else { sdsele = sdsdup(sdsele); } if (!newset) newset = setTypeCreate(sdsele); setTypeAdd(newset,sdsele); setTypeRemove(set,sdsele); sdsfree(sdsele); } incrRefCount(set); /* Protect the old set value. */ dbOverwrite(c->db,c->argv[1],newset);//将旧的set用新的set来覆盖 /* Tranfer the old set to the client and release it. */ setTypeIterator *si; si = setTypeInitIterator(set); while((encoding = setTypeNext(si,&sdsele,&llele)) != -1) { if (encoding == OBJ_ENCODING_INTSET) { addReplyBulkLongLong(c,llele); objele = createStringObjectFromLongLong(llele); } else { addReplyBulkCBuffer(c,sdsele,sdslen(sdsele)); objele = createStringObject(sdsele,sdslen(sdsele)); } /* Replicate/AOF this command as an SREM operation */ propargv[2] = objele; alsoPropagate(server.sremCommand,c->db->id,propargv,3, PROPAGATE_AOF|PROPAGATE_REPL); decrRefCount(objele); } setTypeReleaseIterator(si); decrRefCount(set); } decrRefCount(propargv[0]); preventCommandPropagation(c); signalModifiedKey(c->db,c->argv[1]); server.dirty++; }
三. sismember
使用方式: sismember key member
判断member元素是否是集合key的成员
源码分析:
int setTypeIsMember(robj *subject, sds value) { long long llval; if (subject->encoding == OBJ_ENCODING_HT) { return dictFind((dict*)subject->ptr,value) != NULL;//从字典中查找 } else if (subject->encoding == OBJ_ENCODING_INTSET) { if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) { return intsetFind((intset*)subject->ptr,llval);//从整数集合中查找 } } else { serverPanic("Unknown set encoding"); } return 0; }
因为整数集合中的元素是按从小到大有序排列的, 所以用二分法查找即可找出
static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) { int min = 0, max = intrev32ifbe(is->length)-1, mid = -1; int64_t cur = -1; if (intrev32ifbe(is->length) == 0) {//如果集合长度为0, 则直接返回 if (pos) *pos = 0; return 0; } else { if (value > _intsetGet(is,intrev32ifbe(is->length)-1)) {//如果value大于整数集合的最大值, 则直接返回 if (pos) *pos = intrev32ifbe(is->length); return 0; } else if (value < _intsetGet(is,0)) {//如果value小于整数集合的最小值, 则直接返回 if (pos) *pos = 0; return 0; } } while(max >= min) {//二分法查找 mid = ((unsigned int)min + (unsigned int)max) >> 1; cur = _intsetGet(is,mid);//整数集合中的数组contents是以char来存储的值, 但整数集合有int16, int32, int64三种值, 所以要找值时, 需要根据偏移量和长度来获取正确的值 if (value > cur) { min = mid+1; } else if (value < cur) { max = mid-1; } else { break; } } if (value == cur) { if (pos) *pos = mid; return 1; } else { if (pos) *pos = min; return 0; } } static int64_t _intsetGet(intset *is, int pos) { return _intsetGetEncoded(is,pos,intrev32ifbe(is->encoding)); } static int64_t _intsetGetEncoded(intset *is, int pos, uint8_t enc) { int64_t v64; int32_t v32; int16_t v16; if (enc == INTSET_ENC_INT64) { memcpy(&v64,((int64_t*)is->contents)+pos,sizeof(v64));//需要根据偏移量(pos), 和长度来获取到正确的值 memrev64ifbe(&v64); return v64; } else if (enc == INTSET_ENC_INT32) { memcpy(&v32,((int32_t*)is->contents)+pos,sizeof(v32)); memrev32ifbe(&v32); return v32; } else { memcpy(&v16,((int16_t*)is->contents)+pos,sizeof(v16)); memrev16ifbe(&v16); return v16; } }
四. smembers
执行方式: smembers key
返回集合key中的所有成员
不存在的key被视为空集合
smembers底层调用sinterGenericCommand函数, 即将所有集合(现在只有一个)取交集
void sinterGenericCommand(client *c, robj **setkeys, unsigned long setnum, robj *dstkey) { robj **sets = zmalloc(sizeof(robj*)*setnum); setTypeIterator *si; robj *dstset = NULL; sds elesds; int64_t intobj; void *replylen = NULL; unsigned long j, cardinality = 0; int encoding; for (j = 0; j < setnum; j++) {//setnum的值为1, 这个for循环只会执行一次 robj *setobj = dstkey ? lookupKeyWrite(c->db,setkeys[j]) : lookupKeyRead(c->db,setkeys[j]); if (!setobj) { zfree(sets); if (dstkey) { if (dbDelete(c->db,dstkey)) { signalModifiedKey(c->db,dstkey); server.dirty++; } addReply(c,shared.czero); } else { addReply(c,shared.emptymultibulk); } return; } if (checkType(c,setobj,OBJ_SET)) { zfree(sets); return; } sets[j] = setobj;//将集合赋值给新的集合sets[0] } qsort(sets,setnum,sizeof(robj*),qsortCompareSetsByCardinality); if (!dstkey) { replylen = addDeferredMultiBulkLength(c); } else { /* If we have a target key where to store the resulting set * create this key with an empty set inside */ dstset = createIntsetObject(); } si = setTypeInitIterator(sets[0]); while((encoding = setTypeNext(si,&elesds,&intobj)) != -1) { ... ... /* Only take action when all sets contain the member */ if (j == setnum) { if (!dstkey) {//while循环set, 将每次遍历取出的值返回给客户端 if (encoding == OBJ_ENCODING_HT) addReplyBulkCBuffer(c,elesds,sdslen(elesds)); else addReplyBulkLongLong(c,intobj); cardinality++; } else { ... } } } setTypeReleaseIterator(si); if (dstkey) { ...... } else { setDeferredMultiBulkLength(c,replylen,cardinality); } zfree(sets); }
五. srandmember
同spop命令, 同样可以接受多个count, 它与spop的区别为srandmember不会删除元素值, spop会删除元素值
六. srem
使用方式 srem key member [member ....]
移除集合中的一个或多个member元素, 不存在的member元素会被忽略
源码分析:
void sremCommand(client *c) { robj *set; int j, deleted = 0, keyremoved = 0; if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,set,OBJ_SET)) return; for (j = 2; j < c->argc; j++) {//可以删除多个元素 if (setTypeRemove(set,c->argv[j]->ptr)) { deleted++; if (setTypeSize(set) == 0) { dbDelete(c->db,c->argv[1]); keyremoved = 1; break; } } } if (deleted) { signalModifiedKey(c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_SET,"srem",c->argv[1],c->db->id); if (keyremoved) notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1], c->db->id); server.dirty += deleted; } addReplyLongLong(c,deleted); } int setTypeRemove(robj *setobj, sds value) { long long llval; if (setobj->encoding == OBJ_ENCODING_HT) { if (dictDelete(setobj->ptr,value) == DICT_OK) {//从字典中删除 if (htNeedsResize(setobj->ptr)) dictResize(setobj->ptr); return 1; } } else if (setobj->encoding == OBJ_ENCODING_INTSET) { if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) { int success; setobj->ptr = intsetRemove(setobj->ptr,llval,&success);//从整数集合中删除 if (success) return 1; } } else { serverPanic("Unknown set encoding"); } return 0; }