系统分布式锁的用法
公司框架新增功能分布式锁:
锁的性能之王:缓存 > Zookeeper >= 数据库
锁的实现
实现原理:核心采用StackExchange.Redis的LockTake方法实现。
支持同步获取锁,或者等待直到超时获取锁。
/// <summary>/// 分布式锁,提供全局分布式锁支持,以resource redis为基础/// 这个锁只能通过RpcContext来获取,通过自己手动释放/// </summary>public sealed class DistributedLock{private static readonly TimeSpan DefaultAbandonmentCheckFrequency = TimeSpan.FromSeconds(2);public readonly string lockName;private readonly string lockValue;private readonly int checkTimeSpan = 50; //msprivate readonly int autoDelete;private DistributedLock(){}/// <summary>/// /// </summary>/// <param name="lockName"></param>/// <param name="autoDelete">自动删除,ms,默认 60s</param>/// <param name="checkTimeSpan">如果不能获取锁,重复检查间隔:默认 50ms</param>internal DistributedLock(string lockName, int autoDelete = 60000,int checkTimeSpan = 50){// note that just Global\ is not a valid nameif (string.IsNullOrEmpty(lockName))throw new ArgumentNullException("lockName不能为空");if (null == ResourceCache.Instance)throw new Exception(@"ResourceCache 没有配置或无法连接");this.checkTimeSpan = Math.Max(checkTimeSpan,1);this.autoDelete = Math.Max(autoDelete,1);this.lockName = lockName;this.lockValue = lockName;}/// <summary>/// 获取锁/// </summary>/// <param name="timeout">超时为null,则尝试一次即返回</param>/// <returns>获取锁成功?</returns>internal bool Acquire(TimeSpan? timeout = null){bool bLock = false;var dtStart = DateTime.Now.Ticks;while (!bLock){bLock = TryAcquireOnce();if (timeout == null){break;}if (!bLock){Thread.Sleep(this.checkTimeSpan);}var ts = new TimeSpan(DateTime.Now.Ticks - dtStart);if (ts >= timeout){break;}}return bLock;}//此处采用框架上下文管理分布式事务锁的释放,代码略。//public void Dispose()//{// LockManager.ReleaseLock(this);//}internal void Release(){try{var bRtn = ResourceCache.Instance.LockRelease(this.lockName, this.lockValue);Trace.WriteLine($"释放锁 {this.lockName}:{bRtn}");}catch (Exception e){LogTextWriter.Write($"释放锁失败,系统自动超时释放:{this.lockName}");}}/// <summary>/// 释放锁/// </summary>public void ReleaseLock(){LockManager.ReleaseLock(this);}private bool TryAcquireOnce(){try{Trace.WriteLine($"{Thread.CurrentThread.ManagedThreadId}:TryAcquireOnce");var @lock = ResourceCache.Instance.LockTake(this.lockName, this.lockValue, new TimeSpan(0, 0, 0, 0, this.autoDelete));return @lock;}catch (Exception e){return false;}}}
锁的使用
在当前上下文中获取一个分布式锁,第一个获取锁的将执行依赖当前key(一般为业务主键)的完整业务流程(包括多个微服务之间的调用和数据库的访问;
后来者将无法获取锁,根据返回的结果来判断是否进入流程,如果返回的锁为null将不能执行下面的流程,要么重试等待锁释放,要么返回错误.
锁的调用一般流程:
var qtLock=TryGetLock(lockKey);if(qtLock==null) { //提示不能同时执行操作return;}else {//进行业务流程}//最后别忘了qtLock.ReleaseLock();
API 内的范例:
code = StatusCode.OK;//传入超时时间,可以一直等待到超时过期var lockSaveReceipt = this.Context.TryGetLock($"{nameof(SaveReceipt)}.{valueArgs.ReceiptArgs.ReceiptId}");if (lockSaveReceipt == null){code = PublicErrorCode.SaveReceiptByUsed.ToCode();return null;}try{//todo 业务操作1//todo 业务操作2//...}finally{lockSaveReceipt.ReleaseLock();}