文章目录
- 为什么需要分布式锁?
- go语言分布式锁的实现
- Redis
- 自己的实现
- 红锁是什么
- 别人的带红锁的实现
- etcd
- zk的实现
为什么需要分布式锁?
保证分布式系统并发请求或不同服务实例操作共享资源的安全性,通过一种协调机制来保证在同一时刻只有一个节点能够访问共享资源的工具,主要用于解决分布式环境中的数据一致性和并发控制问题。 应用场景:用户下单,库存扣减,余额扣减。我们的场景:防止用户信息多写,防止重复发送邮件。
- 使用分布式锁可能会对性能产生一定的影响,但这是为了确保数据的一致性和正确性所必需的;如果操作是操作是幂等的(即使多次执行也会产生相同的结果),可能不需要分布式锁
go语言分布式锁的实现
Redis
https://github.com/zeromicro/go-zero go-zero里已经实现了redislock,但没有续约机制
自己的实现
// 需要实现的能力
// 1.排他性、原子性
// 2.主动释放/自动释放
// 3.可重入
// 4.可续约package r_lcimport ("context""errors""github.com/go-redis/redis/v8""time""github.com/google/uuid"
)// RLc 基于redis的分布式锁
type RLc struct {rdb *redis.Client// key 锁标识key string// lcTag 唯一标识,防止串锁lcTag string// expiresIn 过期时间expiresIn time.Duration// releaseCh 锁释放信号 (看门狗)releaseCh chan struct{}// RetryInterval LockWait重试锁的间隔。默认100msRetryInterval time.Duration// RenewInterval 续约锁间隔,默认为expiresIn/2RenewInterval time.Duration// MaxRenewDur 自动续约最长时间。默认1小时,当expiresIn大于1小时,为expiresInMaxRenewDur time.Duration
}type RlcOpt func(lc *RLc)const (retryIntervalDefault = 100 * time.MillisecondmaxRenewDurDefault = time.Hour
)// LUA脚本
var (// tryLockLua// return 0. 加锁失败// return >0. 加锁成功,当前锁的数量tryLockLua = `
local key = KEYS[1]
local val = ARGV[1]
local expiresIn = ARGV[2]-- 锁不存在,加锁
if redis.call('EXISTS', key) == 0 thenredis.call('HINCRBY', key, val, 1)redis.call('PEXPIRE', key, expiresIn)return 1
end-- 锁存在,判断持有锁,增加加锁次数 (可重入)
if redis.call('HEXISTS', key, val) == 1 thenreturn redis.call('HINCRBY', key, val, 1)
end-- 锁被其他进程占用
return 0`// unlockLua// return > 0. 剩余待解锁次数// return = 0. 解锁成功// return = -1. 锁不存在 | 未持有锁unlockLua = `
local key = KEYS[1]
local val = ARGV[1]-- 锁不存在或未持有锁
if redis.call('HEXISTS', key, val) == 0 thenreturn -1
end-- 按次数解锁
local count = redis.call('HINCRBY', key, val, -1)
if count <= 0 then-- 全部解锁redis.call("DEL",key)return 0
end-- 剩余待解锁次数
return count
`// renewLua// return 0. 续约失败// return 1. 续约成功renewLua = `
local key = KEYS[1]
local val = ARGV[1]
local expiresIn = ARGV[2]-- 锁不存在或未持有锁
if redis.call('HEXISTS', key, val) == 0 thenreturn 0
end-- 设置过期时间
return redis.call('PEXPIRE', key, expiresIn)
`
)var (ErrLostKey = errors.New("lost key") // 锁不存在或被其他进程占用
)func NewRLc(rdb *redis.Client, key string, expiresIn time.Duration, opts ...RlcOpt) *RLc {lc := &RLc{rdb: rdb,key: key,lcTag: uuid.New().String(),expiresIn: expiresIn,releaseCh: make(chan struct{}),}for _, opt := range opts {opt(lc)}if lc.RetryInterval == 0 {lc.RetryInterval = retryIntervalDefault}if lc.RenewInterval == 0 {lc.RenewInterval = lc.expiresIn / 2}if lc.MaxRenewDur == 0 {lc.MaxRenewDur = maxRenewDurDefaultif lc.MaxRenewDur < lc.expiresIn {lc.MaxRenewDur = lc.expiresIn}}return lc
}// TryLock 尝试锁
func (lc *RLc) TryLock(ctx context.Context) (lcNum int, res bool) {lua := redis.NewScript(tryLockLua)lcNum, _ = lua.Run(ctx, lc.rdb, []string{lc.key}, lc.lcTag, lc.expiresIn.Milliseconds()).Int()if lcNum == 0 {return 0, false}return lcNum, true
}// LockWait 尝试锁并等待
func (lc *RLc) LockWait(ctx context.Context, wait time.Duration) (lcNum int, res bool) {ctx, cancel := context.WithTimeout(ctx, wait)defer cancel()jumpEnd:for {select {case <-ctx.Done():break jumpEndcase <-lc.releaseCh:break jumpEnddefault:lcNum, res = lc.TryLock(ctx)if res {return}time.Sleep(lc.RetryInterval)}}return 0, false
}// Unlock 解锁
func (lc *RLc) Unlock(ctx context.Context) (leftLcNum int, err error) {lua := redis.NewScript(unlockLua)leftLcNum, err = lua.Run(ctx, lc.rdb, []string{lc.key}, lc.lcTag).Int()if err != nil {return 0, err}if leftLcNum < 0 {return 0, ErrLostKey}if leftLcNum == 0 {close(lc.releaseCh)}return
}// Renew 续约
// expiresIn=0时,会使用初始化时设定的expiresIn
func (lc *RLc) Renew(ctx context.Context) {// 限制续约最大持续时间,减少协程泄露影响ctx, cancel := context.WithTimeout(ctx, lc.MaxRenewDur)// 续约go func(lc *RLc) {for {select {case <-ctx.Done():returncase <-lc.releaseCh:cancel()returndefault:lua := redis.NewScript(renewLua)res, _ := lua.Run(ctx, lc.rdb, []string{lc.key}, lc.lcTag, lc.expiresIn.Milliseconds()).Int()if res == 0 {cancel()}time.Sleep(lc.RenewInterval)}}}(lc)return
}
单元测试
package r_lcimport ("context""fmt""github.com/go-redis/redis/v8""runtime""testing""time"
)func getRdb() (rdb *redis.Client, err error) {rdb = redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379",Password: "",DB: 1,})_, err = rdb.Ping(context.TODO()).Result()if err != nil {return}return
}func TestLockWait(t *testing.T) {rdb, err := getRdb()if err != nil {t.Fatal(err)}t.Run("lock1", func(t1 *testing.T) {go func() {lockWaitFunc(t1, rdb, 10*time.Second)}()})time.Sleep(10 * time.Millisecond)t.Run("lock2", func(t2 *testing.T) {go func() {lockWaitFunc(t2, rdb, 5*time.Second)}()})fmt.Println("0s NumGoroutine", runtime.NumGoroutine())time.Sleep(3 * time.Second)fmt.Println("3s NumGoroutine", runtime.NumGoroutine()) // 续约协程启动time.Sleep(2 * time.Second)fmt.Println("5s NumGoroutine", runtime.NumGoroutine()) // lock2 续约协程释放time.Sleep(5 * time.Second)fmt.Println("10s NumGoroutine", runtime.NumGoroutine()) // lock1 续约协程保持time.Sleep(5 * time.Second)fmt.Println("15s NumGoroutine", runtime.NumGoroutine()) // lock1 续约协程释放time.Sleep(2 * time.Second)}func TestRetry(t *testing.T) {rdb, err := getRdb()if err != nil {t.Fatal(err)}ctx := context.Background()// 初始化锁信息lc := NewRLc(rdb, "test-lock", 5*time.Second, func(lc *RLc) {lc.MaxRenewDur = time.Second * 10lc.RenewInterval = time.Second * 5lc.RetryInterval = 10 * time.Millisecond})lc2 := NewRLc(rdb, "test-lock", 5*time.Second, func(lc *RLc) {lc.MaxRenewDur = time.Second * 10lc.RenewInterval = time.Second * 5lc.RetryInterval = 10 * time.Millisecond})// 启动续约lc.Renew(ctx)fmt.Println(lc.TryLock(ctx))fmt.Println(lc.TryLock(ctx))fmt.Println(lc.TryLock(ctx))time.Sleep(5 * time.Second)fmt.Println("wwwww")fmt.Println(lc2.LockWait(ctx, 10*time.Second))fmt.Println(lc.Unlock(ctx))fmt.Println(lc.Unlock(ctx))fmt.Println(lc.Unlock(ctx))fmt.Println(lc.Unlock(ctx))fmt.Println(lc.Unlock(ctx))fmt.Println(lc.Unlock(ctx))return
}func lockWaitFunc(t *testing.T, rdb *redis.Client, wait time.Duration) {ctx := context.Background()// 初始化锁信息lc := NewRLc(rdb, "test-lock", 15*time.Second)// 阻塞式获取锁_, getLock := lc.LockWait(ctx, wait)if getLock == false {fmt.Println("获取锁超时")t.Log("获取锁超时")return}defer lc.Unlock(ctx)// 启动续约lc.Renew(ctx)// 处理业务代码for i := 0; i < 10; i++ {time.Sleep(time.Second)}return
}
使用分布式锁
// 加分布式锁contxt := ctx.GetSpanCtx()lc := r_lc.NewRLc(server.GetRedisClient(), config.GetDistributedLockKey(fmt.Sprintf("user_info:%s", wsId)), 30*time.Second)_, lcRes := lc.LockWait(contxt, 30*time.Second)if lcRes == false {err = my_err.WrapF(err, Err.ErrCodeSysRequestTimeout, "user_info lc.LockWait wsId:%s", wsId)return}defer lc.Unlock(contxt)lc.Renew(contxt)
测试结果,可以使用压测工具
红锁是什么
红锁算法(Redlock)是一种分布式锁的实现算法,由 Redis 的作者 Antirez 发布。它主要用于解决分布式环境下的资源争用问题,同时保证锁的可靠性和安全性。红锁算法通过在多个 Redis 节点上创建锁,要求获得锁的客户端必须在大多数节点上成功创建锁,从而确保在分布式环境中只有一个客户端可以获得锁。
红锁算法的基本步骤如下:
- 客户端获取当前系统时间。
- 客户端尝试在 N 个 Redis 节点上创建锁,设置锁的过期时间为过期时间加上一个小的时延。
- 如果客户端在大多数节点上成功创建了锁(N/2+1),则认为客户端获得了锁。客户端应将锁的有效期设置为从步骤1开始计算的实际过期时间。
- 如果客户端未能在大多数节点上创建锁,那么客户端需要删除在其他节点上创建的锁,并等待一段随机时间后重新尝试。
别人的带红锁的实现
https://juejin.cn/post/7148391514966589477
etcd
todo 待研究
库:https://github.com/etcd-io/etcd
https://juejin.cn/post/7148391514966589477
https://www.liwenzhou.com/posts/Go/etcd/
zk的实现
todo 待研究
库:https://github.com/samuel/go-zookeeper
zookeeper简称zk,zk是通过生成临时有序节点来实现分布式锁的,首先会在/lock目录下一个临时有序节点,后续请求会在节点后面继续创建临时节点。新的子节点后面,会添加一个次序编号,这个生成的编号,会在上一次的编号进行 +1 操作。
zk节点监听机制:每个线程抢占锁之前,先尝试创建自己的ZNode。同样,释放锁的时候,就需要删除创建的Znode。创建成功后,如果不是排号最小的节点,就处于等待通知的状态。等谁的通知呢?不需要其他人,只需要等前一个Znode的通知就可以了。前一个Znode删除的时候,会触发Znode事件,当前节点能监听到删除事件,就是轮到了自己占有锁的时候。第一个通知第二个、第二个通知第三个依次向后。
zk临时节点自动删除:当我们客户端断开连接之后,我们出创建的临时节点会进行自动删除操作,所以我们在使用分布式锁的时候,一般都是会去创建临时节点,这样可以避免因为网络异常等原因,造成的死锁。