概述
- 我们的 redis 一般都是集群来给我们程序提供服务的,单体的redis现在也不多见
- 看到上面是主节点redis和下面是6个重节点redis,主节点和重节点的通讯都是畅通没问题的
- 这个时候,我们有 gorouting 写我们的数据,那它就会用到我们的setNX
- 写完数据内部是自动同步的,就是你的这个数据通过主节点同步到这些从节点了
- 下面又有我们的 gorouting 去读我们的从节点,但是,我们是在高并发和网络不确定的情况下可能会遇到一些问题
可能会遇到的一些问题
1 )集群方面
- 如果,我们上面是一个gorouting,在主节点上,它用setNX写数据,如果主节点挂了
- 集群就从我们的所有子节点中抽取一个节点,当成主节点顶上去,集群又可以正常工作了
- 这个时候,有一个gorouting在右下角,它又来读数据了,由于我们上面刚写了数据
- 还没有来得及同步到最后一个 redis 这个节点上, 但是面临着新的gorouting读取数据或操作
- 这个时候最后这一台redis它是拿不到那个锁的,是没有同步到的
- 最后来的 gorouting,就认为你没有锁, 或者说我要的资源,你没锁住
- 那其他 gorouting 就认为它是无主的, 就可以锁, 这个时候就会造成一些问题
2 )网络方面
- 正常的时候,写数据还有下面的从节点去获取数据,获取锁,都是没问题的
- 如果是网络抖动或不通会有一些问题,由于redis它是网络传输的
- 如果说我们右边的这网络络不通了,相当于右边的 redis 没有和主节点通讯
- 这个时候我们的一个gorouting就来获取锁进行数据的操作
- 如果这个时候,我要操作的资源没有上锁,那这个gorouting就认为它是还没有被加锁,就把这个锁锁上了
- 所以这个地方也是有可能出问题的风险
解决方案
1 )使用 redLock
- 锁不住资源,有可能因为节点挂了或网络抖动, 我们现在尝试使用 redLock 来解决这一个问题
- redLock它没有master节点,也没有这个slave从节点,都是独立的
- 每一个redis,都是有一个 SetNx 这么一个锁, 现在有两个协程来申请锁
- redis集群的一般是7个,而不是说双数的, 如果双数的那我左边的 gorouting 获得3个
- 右边的 gorouting 获得3个,他就要重新再做选举投票之类的东西
- 基于redLock, 当左边的 gorouting 抢到了4个,那右边的只有3个就应该释放掉
- 为下一次再运行做准备,右边这个锁就消失了
2 ) 源码
- redlock把原来的master,slave这种模式改成了平等的模式,最终解决了问题
2.1 ) NewMutex
- 在源码的
NewMutex
函数中// NewMutex returns a new distributed mutex with given name. func (r *Redsync) NewMutex(name string, options ...Option) *Mutex {m := &Mutex{name: name,expiry: 8 * time.Second,tries: 32, // 重试 32 次delayFunc: func(tries int) time.Duration {return time.Duration(rand.Intn(maxRetryDelayMilliSec-minRetryDelayMilliSec)+minRetryDelayMilliSec) * time.Millisecond},genValueFunc: genValue,driftFactor: 0.01, // 漂移因子timeoutFactor: 0.05, // 超时因子quorum: len(r.pools)/2 + 1, // 法定数,找一半+1,大多数pools: r.pools,}for _, o := range options {o.Apply(m)}if m.shuffle {randomPools(m.pools)}return m }
- 上面
driftFactor
是说我们的服务器的时钟漂移- 比如说我们的A服务器是中午12点,但是B服务器是中午11点59分30秒
- C服务器是中午的12点0分30秒,相当于它们每台服务器相差30秒
- 这就是服务器的时间漂移,它不准,那这会导致什么呢?
- 假如说我们的这个过期时间是8秒,那你差了30秒,肯定就是有的服务器会先释放锁
- 那先释放锁,其他人就可以拿到锁,所以他就设置了一个因子
- 关于
quorum
就如同上面的例子,7台服务器拿到了4台就是成功的
2.2 ) Lock
- 回到锁定的函数
Lock
中,进入其lockContext
// lockContext locks m. In case it returns an error on failure, you may retry to acquire the lock by calling this method again. func (m *Mutex) lockContext(ctx context.Context, tries int) error {if ctx == nil {ctx = context.Background()}// 获取 base64 的值value, err := m.genValueFunc()if err != nil {return err}var timer *time.Timer// 对默认32次循环的操作for i := 0; i < tries; i++ {if i != 0 {if timer == nil {timer = time.NewTimer(m.delayFunc(i))} else {timer.Reset(m.delayFunc(i))}select {// 如果 完成 状态,则返回 ErrFailedcase <-ctx.Done():timer.Stop()// Exit early if the context is done.return ErrFailed// 没有完成,则不动case <-timer.C:// Fall-through when the delay timer completes.}}start := time.Now()// 拿到计数器和错误信息n, err := func() (int, error) {ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))defer cancel()/// 注意这里,最终的函数就是执行的这里return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {return m.acquire(ctx, pool, value)})}()now := time.Now()// 下面是核心算法: 过期时间 - 拿锁的时间 - 漂移因子可能的时间until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor)))// 判断是否是大多数并且没有过期,则直接进行赋值if n >= m.quorum && now.Before(until) {m.value = valuem.until = untilreturn nil}// 否则 release 进行释放_, _ = func() (int, error) {ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))defer cancel()return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {return m.release(ctx, pool, value)})}()if i == tries-1 && err != nil {return err}}return ErrFailed }
- 进入
actOnPoolsAsync
这里参数是一个函数func (m *Mutex) actOnPoolsAsync(actFn func(redis.Pool) (bool, error)) (int, error) {type result struct {node intstatusOK boolerr error}// 创建 channelch := make(chan result, len(m.pools))// 循环 poolsfor node, pool := range m.pools {// 开协程提速go func(node int, pool redis.Pool) {r := result{node: node}r.statusOK, r.err = actFn(pool)ch <- r}(node, pool)}var (n = 0 // 计数器taken []interr error // 错误)// 循环 poolsfor range m.pools {// 从 channel 中拿到结果r := <-chif r.statusOK {n++ // 计数器加加} else if r.err == ErrLockAlreadyExpired {err = multierror.Append(err, ErrLockAlreadyExpired)} else if r.err != nil {err = multierror.Append(err, &RedisError{Node: r.node, Err: r.err})} else {taken = append(taken, r.node)err = multierror.Append(err, &ErrNodeTaken{Node: r.node})}if m.failFast {// fast retrunif n >= m.quorum {return n, err}// fail fastif len(taken) >= m.quorum {return n, &ErrTaken{Nodes: taken}}}}if len(taken) >= m.quorum {return n, &ErrTaken{Nodes: taken}}return n, err }
- 以上就是 redLock 源码锁的机制,通过源代码可以更好的理解框架
- 即使上面一些细节点看不懂,跳过即可,前期可以先看大的实现脉络帮助我们理解