分布式锁的原理和实现(Go)

文章目录

    • 为什么需要分布式锁?
    • go语言分布式锁的实现
      • Redis
        • 自己的实现
        • 红锁是什么
        • 别人的带红锁的实现
      • etcd
      • zk的实现

为什么需要分布式锁?

保证分布式系统并发请求或不同服务实例操作共享资源的安全性,通过一种协调机制来保证在同一时刻只有一个节点能够访问共享资源的工具,主要用于解决分布式环境中的数据一致性和并发控制问题。 应用场景:用户下单,库存扣减,余额扣减。我们的场景:防止用户信息多写,防止重复发送邮件。

  • 使用分布式锁可能会对性能产生一定的影响,但这是为了确保数据的一致性和正确性所必需的;如果操作是操作是幂等的(即使多次执行也会产生相同的结果),可能不需要分布式锁

在这里插入图片描述

go语言分布式锁的实现

Redis

https://github.com/zeromicro/go-zero go-zero里已经实现了redislock,但没有续约机制

自己的实现
// 需要实现的能力
// 1.排他性、原子性
// 2.主动释放/自动释放
// 3.可重入
// 4.可续约package r_lcimport ("context""errors""github.com/go-redis/redis/v8""time""github.com/google/uuid"
)// RLc 基于redis的分布式锁
type RLc struct {rdb *redis.Client// key 锁标识key string// lcTag 唯一标识,防止串锁lcTag string// expiresIn 过期时间expiresIn time.Duration// releaseCh 锁释放信号 (看门狗)releaseCh chan struct{}// RetryInterval LockWait重试锁的间隔。默认100msRetryInterval time.Duration// RenewInterval 续约锁间隔,默认为expiresIn/2RenewInterval time.Duration// MaxRenewDur 自动续约最长时间。默认1小时,当expiresIn大于1小时,为expiresInMaxRenewDur time.Duration
}type RlcOpt func(lc *RLc)const (retryIntervalDefault = 100 * time.MillisecondmaxRenewDurDefault   = time.Hour
)// LUA脚本
var (// tryLockLua// return 0. 加锁失败// return >0. 加锁成功,当前锁的数量tryLockLua = `
local key = KEYS[1]
local val = ARGV[1]
local expiresIn = ARGV[2]-- 锁不存在,加锁
if redis.call('EXISTS', key) == 0 thenredis.call('HINCRBY', key, val, 1)redis.call('PEXPIRE', key, expiresIn)return 1
end-- 锁存在,判断持有锁,增加加锁次数 (可重入)
if redis.call('HEXISTS', key, val) == 1 thenreturn redis.call('HINCRBY', key, val, 1)
end-- 锁被其他进程占用
return 0`// unlockLua// return > 0. 剩余待解锁次数// return = 0. 解锁成功// return = -1. 锁不存在 | 未持有锁unlockLua = `
local key = KEYS[1]
local val = ARGV[1]-- 锁不存在或未持有锁
if redis.call('HEXISTS', key, val) == 0 thenreturn -1
end-- 按次数解锁
local count = redis.call('HINCRBY', key, val, -1)
if count <= 0 then-- 全部解锁redis.call("DEL",key)return 0
end-- 剩余待解锁次数
return count
`// renewLua// return 0. 续约失败// return 1. 续约成功renewLua = `
local key = KEYS[1]
local val = ARGV[1]
local expiresIn = ARGV[2]-- 锁不存在或未持有锁
if redis.call('HEXISTS', key, val) == 0 thenreturn 0
end-- 设置过期时间
return redis.call('PEXPIRE', key, expiresIn)
`
)var (ErrLostKey = errors.New("lost key") // 锁不存在或被其他进程占用
)func NewRLc(rdb *redis.Client, key string, expiresIn time.Duration, opts ...RlcOpt) *RLc {lc := &RLc{rdb:       rdb,key:       key,lcTag:     uuid.New().String(),expiresIn: expiresIn,releaseCh: make(chan struct{}),}for _, opt := range opts {opt(lc)}if lc.RetryInterval == 0 {lc.RetryInterval = retryIntervalDefault}if lc.RenewInterval == 0 {lc.RenewInterval = lc.expiresIn / 2}if lc.MaxRenewDur == 0 {lc.MaxRenewDur = maxRenewDurDefaultif lc.MaxRenewDur < lc.expiresIn {lc.MaxRenewDur = lc.expiresIn}}return lc
}// TryLock 尝试锁
func (lc *RLc) TryLock(ctx context.Context) (lcNum int, res bool) {lua := redis.NewScript(tryLockLua)lcNum, _ = lua.Run(ctx, lc.rdb, []string{lc.key}, lc.lcTag, lc.expiresIn.Milliseconds()).Int()if lcNum == 0 {return 0, false}return lcNum, true
}// LockWait 尝试锁并等待
func (lc *RLc) LockWait(ctx context.Context, wait time.Duration) (lcNum int, res bool) {ctx, cancel := context.WithTimeout(ctx, wait)defer cancel()jumpEnd:for {select {case <-ctx.Done():break jumpEndcase <-lc.releaseCh:break jumpEnddefault:lcNum, res = lc.TryLock(ctx)if res {return}time.Sleep(lc.RetryInterval)}}return 0, false
}// Unlock 解锁
func (lc *RLc) Unlock(ctx context.Context) (leftLcNum int, err error) {lua := redis.NewScript(unlockLua)leftLcNum, err = lua.Run(ctx, lc.rdb, []string{lc.key}, lc.lcTag).Int()if err != nil {return 0, err}if leftLcNum < 0 {return 0, ErrLostKey}if leftLcNum == 0 {close(lc.releaseCh)}return
}// Renew 续约
// expiresIn=0时,会使用初始化时设定的expiresIn
func (lc *RLc) Renew(ctx context.Context) {// 限制续约最大持续时间,减少协程泄露影响ctx, cancel := context.WithTimeout(ctx, lc.MaxRenewDur)// 续约go func(lc *RLc) {for {select {case <-ctx.Done():returncase <-lc.releaseCh:cancel()returndefault:lua := redis.NewScript(renewLua)res, _ := lua.Run(ctx, lc.rdb, []string{lc.key}, lc.lcTag, lc.expiresIn.Milliseconds()).Int()if res == 0 {cancel()}time.Sleep(lc.RenewInterval)}}}(lc)return
}

单元测试

package r_lcimport ("context""fmt""github.com/go-redis/redis/v8""runtime""testing""time"
)func getRdb() (rdb *redis.Client, err error) {rdb = redis.NewClient(&redis.Options{Addr:     "127.0.0.1:6379",Password: "",DB:       1,})_, err = rdb.Ping(context.TODO()).Result()if err != nil {return}return
}func TestLockWait(t *testing.T) {rdb, err := getRdb()if err != nil {t.Fatal(err)}t.Run("lock1", func(t1 *testing.T) {go func() {lockWaitFunc(t1, rdb, 10*time.Second)}()})time.Sleep(10 * time.Millisecond)t.Run("lock2", func(t2 *testing.T) {go func() {lockWaitFunc(t2, rdb, 5*time.Second)}()})fmt.Println("0s NumGoroutine", runtime.NumGoroutine())time.Sleep(3 * time.Second)fmt.Println("3s NumGoroutine", runtime.NumGoroutine()) // 续约协程启动time.Sleep(2 * time.Second)fmt.Println("5s NumGoroutine", runtime.NumGoroutine()) // lock2 续约协程释放time.Sleep(5 * time.Second)fmt.Println("10s NumGoroutine", runtime.NumGoroutine()) // lock1 续约协程保持time.Sleep(5 * time.Second)fmt.Println("15s NumGoroutine", runtime.NumGoroutine()) // lock1 续约协程释放time.Sleep(2 * time.Second)}func TestRetry(t *testing.T) {rdb, err := getRdb()if err != nil {t.Fatal(err)}ctx := context.Background()// 初始化锁信息lc := NewRLc(rdb, "test-lock", 5*time.Second, func(lc *RLc) {lc.MaxRenewDur = time.Second * 10lc.RenewInterval = time.Second * 5lc.RetryInterval = 10 * time.Millisecond})lc2 := NewRLc(rdb, "test-lock", 5*time.Second, func(lc *RLc) {lc.MaxRenewDur = time.Second * 10lc.RenewInterval = time.Second * 5lc.RetryInterval = 10 * time.Millisecond})// 启动续约lc.Renew(ctx)fmt.Println(lc.TryLock(ctx))fmt.Println(lc.TryLock(ctx))fmt.Println(lc.TryLock(ctx))time.Sleep(5 * time.Second)fmt.Println("wwwww")fmt.Println(lc2.LockWait(ctx, 10*time.Second))fmt.Println(lc.Unlock(ctx))fmt.Println(lc.Unlock(ctx))fmt.Println(lc.Unlock(ctx))fmt.Println(lc.Unlock(ctx))fmt.Println(lc.Unlock(ctx))fmt.Println(lc.Unlock(ctx))return
}func lockWaitFunc(t *testing.T, rdb *redis.Client, wait time.Duration) {ctx := context.Background()// 初始化锁信息lc := NewRLc(rdb, "test-lock", 15*time.Second)// 阻塞式获取锁_, getLock := lc.LockWait(ctx, wait)if getLock == false {fmt.Println("获取锁超时")t.Log("获取锁超时")return}defer lc.Unlock(ctx)// 启动续约lc.Renew(ctx)// 处理业务代码for i := 0; i < 10; i++ {time.Sleep(time.Second)}return
}

使用分布式锁

	// 加分布式锁contxt := ctx.GetSpanCtx()lc := r_lc.NewRLc(server.GetRedisClient(), config.GetDistributedLockKey(fmt.Sprintf("user_info:%s", wsId)), 30*time.Second)_, lcRes := lc.LockWait(contxt, 30*time.Second)if lcRes == false {err = my_err.WrapF(err, Err.ErrCodeSysRequestTimeout, "user_info lc.LockWait wsId:%s", wsId)return}defer lc.Unlock(contxt)lc.Renew(contxt)

测试结果,可以使用压测工具

在这里插入图片描述

红锁是什么

红锁算法(Redlock)是一种分布式锁的实现算法,由 Redis 的作者 Antirez 发布。它主要用于解决分布式环境下的资源争用问题,同时保证锁的可靠性和安全性。红锁算法通过在多个 Redis 节点上创建锁,要求获得锁的客户端必须在大多数节点上成功创建锁,从而确保在分布式环境中只有一个客户端可以获得锁。

红锁算法的基本步骤如下:

  1. 客户端获取当前系统时间。
  2. 客户端尝试在 N 个 Redis 节点上创建锁,设置锁的过期时间为过期时间加上一个小的时延。
  3. 如果客户端在大多数节点上成功创建了锁(N/2+1),则认为客户端获得了锁。客户端应将锁的有效期设置为从步骤1开始计算的实际过期时间。
  4. 如果客户端未能在大多数节点上创建锁,那么客户端需要删除在其他节点上创建的锁,并等待一段随机时间后重新尝试。
别人的带红锁的实现

https://juejin.cn/post/7148391514966589477

etcd

todo 待研究

库:https://github.com/etcd-io/etcd

https://juejin.cn/post/7148391514966589477

https://www.liwenzhou.com/posts/Go/etcd/

zk的实现

todo 待研究

库:https://github.com/samuel/go-zookeeper

zookeeper简称zk,zk是通过生成临时有序节点来实现分布式锁的,首先会在/lock目录下一个临时有序节点,后续请求会在节点后面继续创建临时节点。新的子节点后面,会添加一个次序编号,这个生成的编号,会在上一次的编号进行 +1 操作。

zk节点监听机制:每个线程抢占锁之前,先尝试创建自己的ZNode。同样,释放锁的时候,就需要删除创建的Znode。创建成功后,如果不是排号最小的节点,就处于等待通知的状态。等谁的通知呢?不需要其他人,只需要等前一个Znode的通知就可以了。前一个Znode删除的时候,会触发Znode事件,当前节点能监听到删除事件,就是轮到了自己占有锁的时候。第一个通知第二个、第二个通知第三个依次向后。

zk临时节点自动删除:当我们客户端断开连接之后,我们出创建的临时节点会进行自动删除操作,所以我们在使用分布式锁的时候,一般都是会去创建临时节点,这样可以避免因为网络异常等原因,造成的死锁。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/17133.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

设计模式17——模板方法模式

写文章的初心主要是用来帮助自己快速的回忆这个模式该怎么用&#xff0c;主要是下面的UML图可以起到大作用&#xff0c;在你学习过一遍以后可能会遗忘&#xff0c;忘记了不要紧&#xff0c;只要看一眼UML图就能想起来了。同时也请大家多多指教。 模板方法模式&#xff08;Temp…

阿里云Linux 3.2104 LTS 64位安装SVN服务器

直接按步骤 yum install subversion 写y就行 主要是看看安装了那些文件 rpm -ql subversion 主要是为了创建版本库而准备&#xff0c;这个能一遍创建就一遍创建&#xff0c;不行就逐个创建。能创就忽略下面两个mkdir步骤。 mkdir /home/svn/groupRepos 根据新建目录作为版本…

LeetCode第131场双周赛C++题解

3158.求出出现两次数字的XOR值 给你一个数组 nums &#xff0c;数组中的数字 要么 出现一次&#xff0c;要么 出现两次。 请你返回数组中所有出现两次数字的按位 XOR 值&#xff0c;如果没有数字出现过两次&#xff0c;返回 0 。 示例 1&#xff1a; 输入&#xff1a;nums …

程序中的网络地址等敏感信息,从网络安全的角度,应该怎么配置

从网络安全的角度来看&#xff0c;配置IP信息需要谨慎处理&#xff0c;以防止敏感信息泄露和系统受到攻击。以下是一些建议和最佳实践&#xff1a; 1. 使用环境变量或配置管理工具 环境变量 将IP地址等敏感信息存储在环境变量中&#xff0c;而不是硬编码在代码里。这有助于确…

业务实战————Uibot6.0 .1多页面商品信息抓取RPA机器人

前言 【案例描述】 鲜果记水果店计划在淘宝电商平台上开设一家新店&#xff0c;小微是该企业运营部分的运营专员&#xff0c;主要负责公司商品上架和管理的工作。 公司计划在开店的新品促销活动中增加水果品类红富士苹果。小微需在商品上架前了解目前平台中销量前列的红富士苹…

C#面:死锁的必要条件是什么?怎么克服?

C#中的死锁是指两个或多个线程互相等待对方释放资源&#xff0c;导致程序无法继续执行的情况。 死锁的必要条件&#xff1a; 互斥条件&#xff1a;至少有一个资源被设置为只能被一个线程占用。请求与保持条件&#xff1a;一个线程在持有资源的同时又请求其他线程占有的资源。…

SpringBootTest测试框架一

为了方便开发,对数据进行mock处理,形成文件,只修改文件内容达到mock指定数据的目的 1、定义测试模式 @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.METHOD}) public @interface TestModel {TestModelEnum value() default TestModelEnum.LOCAL_PRIOR;String…

数字水印 | 离散余弦变换 DCT 基本原理及 Python 代码实现

目录 1 基本原理2 代码实现3 图像压缩 1 基本原理 参考博客&#xff1a;https://www.cnblogs.com/zxporz/p/16072580.html D C T \mathsf{DCT} DCT 全称为 D i s c r e t e C o s i n e T r a n s f o r m \mathsf{Discrete\ Cosine\ Transform} Discrete Cosine Transfo…

新购入的读码器该如何测试呢?

物联网技术的飞速发展&#xff0c;条码二维码作为一种高效、便捷的数据传输方式&#xff0c;已经广泛应用于仓储、物流配送、零售与结算、MES系统等生活和工业领域。新购的条码二维码读码器&#xff0c;在使用前要了解它的使用方法和性能&#xff0c;以确保其性能稳定、读取准确…

小预算大效果:揭秘品牌如何用创新方法实现低成本传播

说到品牌&#xff0c;我们都知道&#xff0c;没钱是真的难搞。 品牌建设就像跑马拉松&#xff0c;得慢慢来&#xff0c;持续投入&#xff0c;一点一滴积累声誉&#xff0c;这样才能培养出忠实的粉丝团。 但别急&#xff0c;就算资金紧张&#xff0c;我们也有办法让品牌慢慢站…

基于飞书机器人跨账号消息提醒

事情的起因是飞书中不同的账号不能同时登录&#xff0c;虽然可以在飞书的账号切换页面看到其他账号下是否有消息提醒&#xff08;小红点&#xff09;&#xff0c;但是无法实现提醒功能&#xff0c;很不优雅&#xff0c;因此本文尝试提出一种新的方式实现不同账号之间的提醒功能…

自定义CSS属性(@property)解决自定义CSS变量无法实现过渡效果的问题

且看下面的代码&#xff1a; <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta name"viewport" content"widthdevice-width, initial-scale1.0" /><title>demot</title&g…

云原生架构产生背景

1.云原生的概念 “云原生”来自于Cloud Native的直译&#xff0c;拆开来看&#xff0c;Cloud就是指其应用软件是在云端而非传统的数据中心。Naive代表应用软件从一开始就是基于云环境、专门为云端特性而设计&#xff0c;可充分利用和发挥云平台的弹性分布式优势&#xff0c;最…

内存泄漏案例分享2-Fragment的内存泄漏

案例2——hprof文件显示出Fragment内存泄漏 接下来我们来看fragment内存泄漏&#xff0c;老规矩查看fields和references&#xff0c;确保它符合内存泄漏的情形&#xff1b;我们点击jump to source查看泄漏的位置 Fragment#MZBannerView#内部类Runnbale /*** Banner 切换时间间…

【Unity AR开发插件】如何快速地开发可热更的AR应用

预告 本专栏将介绍如何使用这个支持热更的AR开发插件&#xff0c;快速地开发AR应用。 Unity AR开发插件使用教程 更新 二、使用插件一键安装HybridCLR和ARCore 三、配置带HybridCLR的ARCore开发环境 四、制作热更数据-AR图片识别场景

StringBufferInputStream类,你学会了吗?

在Java编程中,处理字符串数据流是一项常见的任务。 为了更灵活地处理字符串数据流,Java提供了StringBufferInputStream类,它允许将字符串转换为输入流,从而可以像处理其他输入流一样对字符串进行操作。 本文将深入探讨StringBufferInputStream类的背景、用法、优缺点以及…

RocketMQ如何保证消息不丢失

同步发送消息&#xff0c;那为什么还会有异步&#xff0c;因为异步发送效率高。 存储消息 通过配置持久化策略。 存储消息的过程&#xff0c;先存到内存page cache&#xff0c;再持久化到磁盘&#xff0c;默认配置是直接到内存后就返回成功了,但是如果这时候机器断电了,就会丢…

父进程等待子进程退出

一、 为什么要等待子进程退出&#xff1f; 等待子进程退出是为了确保父进程能够在子进程执行完毕后继续执行或者处理子进程的结果。在许多情况下&#xff0c;父进程需要等待子进程完成后才能继续执行&#xff0c;以确保正确的执行顺序和结果。 以下是一些等待子进程退出的主要…

2024年,游戏行业还值得进入吗?

来自知乎问题“2024年&#xff0c;游戏行业还值得进入吗&#xff1f;”的回答。 ——原问题描述&#xff1a;从超小厂执行策划做起&#xff0c;未来有前途吗&#xff1f; 展望2024年&#xff0c;国内外的游戏市场环境或将变得更加复杂&#xff0c;曾经那个水大鱼大的时代过去了…

工智能在脉搏分析中的matlab应用

人工智能&#xff08;AI&#xff09;在脉搏分析中的应用可以通过多种方法实现&#xff0c;包括使用机器学习算法对脉搏信号进行分类、预测或异常检测。MATLAB是一个强大的平台&#xff0c;它提供了各种工具和功能来处理和分析这样的信号。以下是一个简化的步骤&#xff0c;说明…