详解ASP.Net Core 中如何借助CSRedis实现一个安全高效的分布式锁
引言:最近回头看了看开发的.Net Core 2.1项目的复盘总结,其中在多处用到Redis实现的分布式锁,虽然在OnResultExecuting方法中做了防止死锁的处理,但在某些场景下还是会发生死锁的问题,下面我只展示部分代码:
问题:
(1)这里setnx设置的值“1”,我想问,你最后del的这个值一定是你自己创建的吗?
(2)图中标注的步骤1和步骤2不是原子操作,会有死锁的概率吗?
大家可以思考一下先,下面让我们带着这两个问题往下看,下面介绍一下使用Redis实现分布式锁常用的几个命令。
一、使用Redis实现分布式锁常见的几个命令
► Setnx
命令:SETNX key value
说明:将 key 的值设为 value ,当且仅当 key 不存在。若给定的 key 已经存在,则 SETNX 不做任何动作。SETNX 是『SET if Not eXists』(如果不存在,则 SET)的简写。
时间复杂度:O(1)
返回值:设置成功,返回1 ; 设置失败,返回 0
► Getset
命令:GETSET key value
说明:将给定 key 的值设为 value ,并返回 key 的旧值(old value)。当 key 存在但不是字符串类型时,返回一个错误。
时间复杂度:O(1)
返回值:返回给定 key 的旧值; 当 key 没有旧值时,也即是, key 不存在时,返回 nil 。
► Expire
命令:EXPIRE key seconds
说明:为给定 key 设置生存时间,当 key 过期时(生存时间为 0 ),它会被自动删除。
时间复杂度:O(1)
返回值:设置成功返回 1 ;当 key 不存在或者不能为 key 设置生存时间时(比如在低于 2.1.3 版本的 Redis 中你尝试更新 key 的生存时间),返回 0 。
► Del
命令:DEL key [key ...]
说明:删除给定的一个或多个 key 。不存在的 key 会被忽略。
时间复杂度:O(N); N 为被删除的 key 的数量。
删除单个字符串类型的 key ,时间复杂度为O(1)。
删除单个列表、集合、有序集合或哈希表类型的 key ,时间复杂度为O(M), M 为以上数据结构内的元素数量。
返回值:被删除 key 的数量。
好了,命令熟悉之后,下面我们就开始一步一步实现分布式锁。
二、使用Redis实现分布式锁版本一:与时间戳的结合
对于上面的setnx设置的默认值1,我们采用时间戳来防止问题一,下面先让我们来看下想当然写法流程图。
流程图:
C#代码实现:
static void Main(string[] args) { var lockTimeout = 5000;//单位是毫秒 var currentTime = DateTime.Now.ToUnixTime(true); if (SetNx("lockkey", currentTime+ lockTimeout,lockTimeout)) { //TODO:一些业务逻辑代码 //..... //..... //最后释放锁 Remove("lockkey"); } else { Console.WriteLine("没有获得分布式锁"); } Console.ReadKey(); } public static bool SetNx(string key,long time ,double expireMS) { if (redisClient.SetNx(key, time)) { if (expireMS > 0) redisClient.Expire(key, TimeSpan.FromMilliseconds(expireMS)); return true; } return false; } public static bool Remove(string key) { return redisClient.Del(key) > 0; }
上面的代码中value的值我们使用时间戳,不是一个固定的值了,至少能保证你删除的key确实是你自己的,所以,建议大家在设value的值时,不要设置一个固定的值,最好是随机的。但是这样写虽然解决了问题一,但是这种写法还是存在一定的风险,虽然Redis是单线程的并且setnx、expire是原子操作,但是先setnx再expire就不是原子操作了!!!我们要考虑多线程环境和容器部署时多实例环境等等,那这样的写法就会出现问题。
比如:现在有A、B两台服务器在跑这个应用,当A台应用跑到:setnx成功但是还没有设置过期时间的时候,突然重启服务,这个时候在分布式环境中就会发生死锁的问题,因为你没有设置过期时间。
下面我们通过调试来展示死锁的场景:
A应用:在执行到setnx成功但是在执行expire之前宕机了,此时的Redis已经有数据了,但是没有过期时间
B应用:运行正常
但是B应用就会一直获取不到锁,导致死锁。
所以上面在获取锁的逻辑还是有问题的,为了解决这个问题,我们采用下面的方式来处理。
三、使用Redis实现分布式锁版本二:双重防死锁
流程图:
C#代码实现:
public static void RedisLockV2() { var lockTimeout = 5000;//单位是毫秒 var currentTime = DateTime.Now.ToUnixTime(true); if (SetNxV2("lockkey",DateTime.Now.ToUnixTime(true)+lockTimeout)) { //设置过期时间 redisClient.Expire("lockkey", TimeSpan.FromMilliseconds(5000)); //TODO:一些业务逻辑代码 Console.WriteLine("处理业务ing"); Thread.Sleep(100000); Console.WriteLine("处理业务ed"); //最后释放锁 Remove("lockkey"); } else { //未获取到锁,继续判断,判断时间戳看看是否可以重置并获取锁 var lockValue = redisClient.Get("lockkey"); var time = DateTime.Now.ToUnixTime(true); if (!string.IsNullOrEmpty(lockValue) && time> lockValue.ToInt64()) { //再次用当前时间戳getset //返回固定key的旧值,旧值判断是否可以获取锁 var getsetResult = redisClient.GetSet("lockkey", time); if (getsetResult == null || (getsetResult != null && getsetResult == lockValue)) { Console.WriteLine("获取到Redis锁了"); //真正获取到锁 redisClient.Expire("lockkey", TimeSpan.FromMilliseconds(5000)); //TODO:一些业务逻辑代码 //..... //..... Console.WriteLine("处理业务"); //最后释放锁 Remove("lockkey"); } else { Console.WriteLine("没有获取到锁"); } } else { Console.WriteLine("没有获取到锁"); } } }
现在,Redis中的情况如下:
我们运行上面的代码,结果如下:
副本.exe中添加一行代码。来模拟这种场景:有A、B两台服务器在跑这个应用,当A台应用跑到:setnx成功但是还没有设置过期时间的时候,突然重启服务,这个时候在分布式环境中就会发生死锁的问题,因为你没有设置过期时间
我们先执行Lottery.ThriftRpc - 副本.exe,等Redis里面有值了,并且这个key是没有过期时间,再关闭掉该程序:
然后,再执行Lottery.ThriftRpc.exe
看,我们是不是解决了该问题,至于过期时间设置为多少要结合你的具体业务处理时间来计算出一个合理的值,好了,聊到这里关于Redis的分布式锁就讲完了,希望对你有帮助,谢谢。
四、总结:
上面的示例中Redis的组件用的是CSRedisCore,这里只是自己的一点体会,如果你有更好的办法,可以在评论区讨论,关于Redis的理论讲解有太多的文章了,大家可以参考,关于Redis的文章我只总结工作中遇到的一些问题,关于文章中的源码,我就不提供了,太简单了。后面我会不定期分享一些Redis的问题,希望大家多多支持。