(给DotNet加星标,提升.Net技能)
转自:热敷哥cnblogs.com/refuge/p/13774008.html
为什么要用分布式锁?
先上一张截图,这是在浏览别人的博客时看到的.
在了解为什么要用分布式锁之前,我们应该知道到底什么是分布式锁.
锁按照不同的维度,有多种分类.比如
1、悲观锁,乐观锁;
2、公平锁,非公平锁;
3、独享锁,共享锁;
4、线程锁,进程锁;
等等.
我们平时用的锁,比如 lock,它是线程锁,主要用来给方法,代码块加锁.由于进程的内存单元是被其所有线程共享的,所以线程锁控制的实际是多个线程对同一块内存区域的访问.
有线程锁,就必然有进程锁.顾名思义,进程锁的目的是控制多个进程对共享资源的访问.因为进程之间彼此独立,各个进程是无法控制其他进程对资源的访问,所以只能通过操作系统来控制.比如 Mutex.
但是进程锁有一个前提,那就是需要多个进程在同一个系统中,如果多个进程不在同一个系统,那就只能使用分布式锁来控制了.
分布式锁是控制分布式系统中不同系统之间访问共享资源的一种锁实现.它和线程锁,进程锁的作用都是一样,只是范围不一样.
所以要实现分布式锁,就必须依靠第三方存储介质来存储锁的信息.因为各个进程之间彼此谁都不服谁,只能找一个带头大哥咯;
以下示例需引用NUGET: CSRedisCore
示例一
CSRedisClient redisClient = new CSRedis.CSRedisClient("127.0.0.1:6379,defaultDatabase=0");
var lockKey = "lockKey";
var stock = 5;//商品库存
var taskCount = 10;//线程数量
redisClient.Del(lockKey);//测试前,先把锁删了.
for (int i = 0; i < taskCount; i++)
{
Task.Run(() =>
{
//获取锁
do
{
//setnx : key不存在才会成功,存在则失败.
var success = redisClient.SetNx(lockKey, 1);
if (success == true)
{
break;
}
Thread.Sleep(TimeSpan.FromSeconds(1));//休息1秒再尝试获取锁
} while (true);
Console.WriteLine($"线程:{Task.CurrentId} 拿到了锁,开始消费");
if (stock <= 0)
{
Console.WriteLine($"库存不足,线程:{Task.CurrentId} 抢购失败!");
redisClient.Del(lockKey);
return;
}
stock--;
//模拟处理业务
Thread.Sleep(TimeSpan.FromSeconds(new Random().Next(1, 3)));
Console.WriteLine($"线程:{Task.CurrentId} 消费完毕!剩余 {stock} 个");
//业务处理完后,释放锁.
redisClient.Del(lockKey);
});
}
运行结果:
看起来貌似没毛病,实际上上述代码有个致命的问题:
当某个线程拿到锁之后,如果系统崩溃了,那么锁永远都不会被释放.因此,我们应该给锁加一个过期时间,当时间到了,还没有被主动释放,我们就让redis释放掉它,以保证其他消费者可以拿到锁,进行消费.
这里给锁加过期时间也有讲究,不能拿到锁后再加,比如:
......
//setnx : key不存在才会成功,存在则失败.
var success = redisClient.SetNx(lockKey, 1);
if (success == true)
{
redisClient.Set(lockKey, 1, expireSeconds: 5);
break;
}
这样操作的话,获取锁和设置锁的过期时间就不是原子操作,同样会出现上面提到的问题.Redis 提供了一个合而为一的操作可以解决这个问题.
//set : key存在则失败,不存在才会成功,并且过期时间5秒
var success = redisClient.Set(lockKey, 1, expireSeconds: 5, exists: RedisExistence.Nx);
这个问题虽然解决了,但随之产生了一个新的问题:
假设有3个线程A,B,C
当线程A拿到锁后执行业务的时候超时了,超过了锁的过期时间还没执行完,这时候锁被Redis释放了,
于是线程B拿到了锁并开始执行业务逻辑.
当线程B的业务逻辑还没执行完的时候,线程A的业务逻辑执行完了,于是乎就跑去释放掉了锁.
这时候线程C就可以拿到锁开始执行它的业务逻辑.
这不就乱套了么...
因此,线程在释放锁的时候应该判断这个锁还属不属于自己.
所以,在设置锁的时候,redis的value值不能像上面代码那样,随便给个1,而应该给一个随机值,代表当前线程.
var id = Guid.NewGuid().ToString("N");
//获取锁
do
{
//set : key存在则失败,不存在才会成功,并且过期时间5秒
var success = redisClient.Set(lockKey, id, expireSeconds: 5, exists: RedisExistence.Nx);
if (success == true)
{
break;
}
Thread.Sleep(TimeSpan.FromSeconds(1));//休息1秒再尝试获取锁
} while (true);
Console.WriteLine($"线程:{Task.CurrentId} 拿到了锁,开始消费");
.........
//业务处理完后,释放锁.
var value = redisClient.Get<string>(lockKey);
if (value == id)
{
redisClient.Del(lockKey);
}
完美了吗?
不完美.还是老生常谈的问题,取value和删除key 分了两步走,不是原子操作.
并且,这里还不能用pipe,因为需要根据取到的value来决定下一个操作.上面设置过期时间倒是可以用pipe.
所以,这里只能用lua.
完整的代码如下:
CSRedisClient redisClient = new CSRedis.CSRedisClient("127.0.0.1:6379,defaultDatabase=0");
var lockKey = "lockKey";
var stock = 5;//商品库存
var taskCount = 10;//线程数量
var script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";//释放锁的redis脚本
redisClient.Del(lockKey);//测试前,先把锁删了.
for (int i = 0; i < taskCount; i++)
{
Task.Run(() =>
{
var id = Guid.NewGuid().ToString("N");
//获取锁
do
{
//set : key存在则失败,不存在才会成功,并且过期时间5秒
var success = redisClient.Set(lockKey, id, expireSeconds: 5, exists: RedisExistence.Nx);
if (success == true)
{
break;
}
Thread.Sleep(TimeSpan.FromSeconds(1));//休息1秒再尝试获取锁
} while (true);
Console.WriteLine($"线程:{Task.CurrentId} 拿到了锁,开始消费");
if (stock <= 0)
{
Console.WriteLine($"库存不足,线程:{Task.CurrentId} 抢购失败!");
redisClient.Eval(script,lockKey,id);
return;
}
stock--;
//模拟处理业务,这里不考虑失败的情况
Thread.Sleep(TimeSpan.FromSeconds(new Random().Next(1, 3)));
Console.WriteLine($"线程:{Task.CurrentId} 消费完毕!剩余 {stock} 个");
//业务处理完后,释放锁.
redisClient.Eval(script, lockKey, id);
});
}
这篇文章只介绍了单节点Redis的分布式锁,因为单节点,所以不是高可用.
多节点Redis则需要用官方介绍的RedLock,这玩意有点绕,我需要捋一捋.
- EOF -
推荐阅读 点击标题可跳转如何实现一个模块化方案二
.NET中使用DiagnosticSource
三种方式让你轻松监控Entity Framework中的sql流转
看完本文有收获?请转发分享给更多人
关注「DotNet」加星标,提升.Net技能
好文章,我在看❤️