概述
在并发编程中,我们经常会遇到多个线程或协程访问共享资源的情况。为了保护这些资源不被同时修改,我们会用到"锁"的概念。
Go中提供了读写锁:sync.RWMutex。
sync.RWMutex是Go语言提供的一个基础同步原语,它是Reader/Writer Mutual Exclusion Lock的缩写,通常被称为"读写锁"。
读写锁允许多个读锁同时拥有者,但在任何时间点只允许一个写锁拥有者,或者没有锁拥有者。
这让读多写少的场景获得了更高的并发性能。
应用场景
- 典型应用场景就是读多写少
- 一写多读
提供的方法
sync.RWMutex提供了以下方法:
type RWMutex
// 获取写锁,有读锁或者写锁被其他goroutine使用则阻塞等待
func (rw *RWMutex) Lock()
// 尝试获取写锁,获取到则返回true,没有获取到则为false
func (rw *RWMutex) TryLock() bool
// 释放写锁
func (rw *RWMutex) Unlock()
// 获取读锁,
func (rw *RWMutex) RLock()
// 尝试获取读锁,获取到则返回true,没有获取到则为false
func (rw *RWMutex) TryRLock() bool
// 释放读锁
func (rw *RWMutex) RUnlock()// 返回Locker
func (rw *RWMutex) RLocker() Locker
COPY
注意
使用RWMutex的时候,一旦调用了Lock方法,就不能再把该锁复制到其他地方使用,否则可能会出现各种问题。这是由于锁的状态(被哪个协程持有,是否已经被锁定等)是存储在RWMutex的结构体中,如果复制了RWMutex,那么复制后的RWMutex就会有一个全新的状态,锁的行为就会变得不可预测。
RWMutex和Mutex一样,一旦有了Lock调用就不能到处copy了,否则出现各种问题。
源码实现
RWMutex结构体
让我们一起深入Go的源码,看看RWMutex是如何实现的。
RWMutex 的结构体主要包括五个主要的字段,这些字段描述了锁的当前状态和持有者信息:
type RWMutex struct {// Mutex,互斥锁。写者互斥锁,所有的写者加锁都调用w.Lock或者w.TryLockw Mutex // 写者信号量。当最后一个读者释放了锁,会触发一个信号通知writerSemwriterSem uint32 // 读者信号量。当写者释放了锁,会触发一个信号通知readerSemreaderSem uint32 // readerCount 记录当前持有读锁的协程数量。如果为负数,表示有写者在等待所有读者释放锁。如果为0,表示没有任何协程持有锁readerCount atomic.Int32 // readerWait 记录写者需要等待的读者数量。当一个写者获取了锁之后,readerWait会设置为当前readerCount的值。当读者释放锁时,readerWait会递减1readerWait atomic.Int32
}
COPY
读者加锁RLock()
加读锁时非常简单,就是将结构体中的readerCount加1,如果+1后为负数表示有写者等待则等待写者执行完成。
实现代码
func (rw *RWMutex) RLock() {// 读者数量+1if rw.readerCount.Add(1) < 0 {// 加1以后如果readerCount是负数表示有写者持有了互斥锁// 读者等待信号量释放// 此时读锁已经加上了,等待写者释放信号量就可以了runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)}
}
COPY
读者RTryLock()
这个函数是RWMutex中的TryRLock方法,它试图以非阻塞的方式获取读锁。让我们一步一步地看它是如何工作的。
先看图:
实现代码
func (rw *RWMutex) TryRLock() bool {for {// 查看当前读者数量c := rw.readerCount.Load()if c < 0 {// 小于0表示有写者已经Penging,加锁失败return false}// 读者数量+1,加读锁成功if rw.readerCount.CompareAndSwap(c, c+1) {return true}}
}
COPY
读者释放读锁RUnlock()
RUnlock方法用于释放读锁。 当一个读者完成读操作并想要释放锁时,就可以调用这个方法。
实现代码
func (rw *RWMutex) RUnlock() {// 释放锁就是-1,// 如果readerCount小于0表示有写者Pending// 进入rUnlockSlowif r := rw.readerCount.Add(-1); r < 0 {rw.rUnlockSlow(r)}
}func (rw *RWMutex) rUnlockSlow(r int32) {// 边界问题处理// r+1 ==0 表示没有读者加锁,却调用了释放读锁// r+1 == -rwmutexMaxReaders表示没有读者加锁,有写者持有互斥锁却释放读锁if r+1 == 0 || r+1 == -rwmutexMaxReaders {race.Enable()fatal("sync: RUnlock of unlocked RWMutex")}// 这表示这是最后一个读者了,最后一个读者要发送信号量通知写者不用等了if rw.readerWait.Add(-1) == 0 {// The last reader unblocks the writer.runtime_Semrelease(&rw.writerSem, false, 1)}
}
COPY
写者加锁Lock()
实现代码
const rwmutexMaxReaders = 1 << 30func (rw *RWMutex) Lock() {// 先持有互斥锁,已经有其他写者持有了互斥锁则等待rw.w.Lock()// rw.readerCount.Add(-rwmutexMaxReaders)这个表示先将readerCount设置为负数表示有写者在等待// 再+rwmutexMaxReaders是为了求出当前reader的数量r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders// 将当前reader的数量加到readerWait表示要等待的读者完成的个数if r != 0 && rw.readerWait.Add(r) != 0 {// 阻塞等待万有的读者完成释放信号量了runtime_SemacquireRWMutex(&rw.writerSem, false, 0)}
}
COPY
写者加锁TryLock()
实现代码
func (rw *RWMutex) TryLock() bool {// 调用互斥锁的TryLock,互斥锁TryLock返回false这儿也直接返回falseif !rw.w.TryLock() {return false}// 加锁成功后// 如果当前还有写者,CompareAndSwap就返回失败if !rw.readerCount.CompareAndSwap(0, -rwmutexMaxReaders) {// 返回失败就释放互斥锁rw.w.Unlock()// 加锁失败return false}// 加锁成功return true
}
COPY
写者解锁Unlock()
实现代码
func (rw *RWMutex) Unlock() {// 这里是对Lock readerCount的逆向操作// 在Lock的时候对readerCount减去了rwmutexMaxReaders,这次加回来;这样就还原了readerCount,即使在Lock之后依然有读者加锁r := rw.readerCount.Add(rwmutexMaxReaders)if r >= rwmutexMaxReaders {race.Enable()fatal("sync: Unlock of unlocked RWMutex")}// 然后循环看当前有多少读者正在等待信号,就释放多少次心血号for i := 0; i < int(r); i++ {runtime_Semrelease(&rw.readerSem, false, 0)}// Allow other writers to proceed.rw.w.Unlock()
}
COPY
测试
package mutex_testimport ("sync""testing""time""github.com/stretchr/testify/assert"
)// 测试读写互斥锁在正常读锁定和解锁情况下的成功执行
func TestRWMutex_ShouldSuccess_WhenNormalReaderLockAndUnLock(t *testing.T) {// 初始化一个读写互斥锁rwmutex := sync.RWMutex{}// 获取读锁rwmutex.RLock()// 设置成功标志为true,使用defer确保在函数结束时释放读锁isSuccess := truedefer rwmutex.RUnlock()// 记录日志表示测试成功t.Log("success")// 断言成功标志为trueassert.True(t, isSuccess)
}// 测试RWMutex的写锁功能是否正常
func TestRWMutex_ShouldSuccess_WhenNormalWriterLockAndUnLock(t *testing.T) {rwmutex := sync.RWMutex{} // 创建一个sync.RWMutex类型的变量rwmutex.Lock() // 获取写锁isSuccess := true // 标记为成功状态defer rwmutex.Unlock() // 确保在函数退出时释放锁,避免死锁t.Log("success") // 记录测试日志assert.True(t, isSuccess) // 断言isSuccess为true,验证操作成功
}// 函数测试了在正常情况下,
// 读写锁(RWMutex)的读锁(RLock)和写锁(Lock)的加锁与解锁操作是否成功。
func TestRWMutex_ShouldSuccess_WhenNormalReaderWriterLockAndUnLock(t *testing.T) {// 初始化一个读写锁rwmutex := sync.RWMutex{}// 尝试获取读锁并立即释放rwmutex.RLock()rwmutex.RUnlock()// 尝试获取写锁并立即释放rwmutex.Lock()rwmutex.Unlock()// 标记测试为成功isSuccess := true// 记录测试成功日志t.Log("success")// 断言测试结果为真assert.True(t, isSuccess)
}// 测试读写锁在多协程情况下的读写互斥
func TestRWMutex_ShouldSuccess_WhenReaderAndWriterInDifferentRoutine(t *testing.T) {// 初始化一个读写锁和等待组,用于协调不同协程的操作。rwmutex := sync.RWMutex{}wg := sync.WaitGroup{}wg.Add(2) // 预期有两个协程完成操作// 启动一个协程作为读锁持有者go func() {rwmutex.RLock() // 获取读锁println("reader") // 打印读操作标识rwmutex.RUnlock() // 释放读锁wg.Done() // 表示读操作完成}()// 启动另一个协程作为写锁持有者go func() {rwmutex.Lock() // 获取写锁println("writer") // 打印写操作标识rwmutex.Unlock() // 释放写锁wg.Done() // 表示写操作完成}()wg.Wait() // 等待所有协程完成操作isSuccess := truet.Log("success") // 记录测试成功assert.True(t, isSuccess) // 断言测试结果为真
}// 测试读写锁在多个读锁情况下的读写互斥
func TestRWMutex_ShouldBlockWriter_WhenMultipleReader(t *testing.T) {rwmutex := sync.RWMutex{}ch := make(chan bool)wg := sync.WaitGroup{}wg.Add(2)for i := 0; i < 2; i++ {go func(i int) {wg.Done()rwmutex.RLock()println("reader Locked", i)time.Sleep(10 * time.Second)rwmutex.RUnlock()println("reader UnLocked", i)}(i)}go func() {wg.Wait()println("writer try to accquire wlock")rwmutex.Lock()println("writer has accquired wlock")defer rwmutex.Unlock()ch <- true}()<-chisSuccess := truet.Log("success")assert.True(t, isSuccess)
}// 测试读写锁在多个写锁情况下的读写互斥
func TestRWMutex_ShouldBlockReaders_WhenWriterIsPresent(t *testing.T) {rwmutex := sync.RWMutex{}wg := sync.WaitGroup{}wg.Add(1)go func() {println("writer try to accquire wlock")rwmutex.Lock()println("writer has accquired wlock")wg.Done()time.Sleep(10 * time.Second)defer rwmutex.Unlock()println("writer has released wlock")}()wg.Wait()wg.Add(2)for i := 0; i < 2; i++ {go func(i int) {println("reader try to lock", i)rwmutex.RLock()println("reader Locked", i)rwmutex.RUnlock()println("reader UnLocked", i)wg.Done()}(i)}wg.Wait()isSuccess := truet.Log("success")assert.True(t, isSuccess)
}// 测试读写锁在多个写锁情况下的读写互斥
func TestRWMutex_ShouldBlockConcurrentWriters(t *testing.T) {rwmutex := sync.RWMutex{}var blockedWriter boolch := make(chan bool)wg := sync.WaitGroup{}wg.Add(1)go func() {wg.Done()println("Writer 1 try to accquire wlock")rwmutex.Lock()println("Writer 1 has accquired wlock")defer rwmutex.Unlock()time.Sleep(15 * time.Second)}()go func() {wg.Wait()println("Writer 2 try to accquire wlock")rwmutex.Lock()println("Writer 2 has accquired wlock")ch <- truedefer rwmutex.Unlock()}()select {case <-ch:blockedWriter = falsecase <-time.After(20 * time.Second):blockedWriter = true}assert.True(t, blockedWriter)
}// 测试读写锁在多个读锁情况下的读写互斥
func TestRWMutex_ShouldLockSuccess_WhenTryingToReadLockTwice(t *testing.T) {rwmutex := sync.RWMutex{}writerWaitGroup := sync.WaitGroup{}writerWaitGroup.Add(1)go func() {rwmutex.RLock()println("readlock locked once")rwmutex.RLock()println("readlock locked twice")rwmutex.RUnlock()rwmutex.RUnlock()defer writerWaitGroup.Done()}()writerWaitGroup.Wait()isSuccess := trueassert.True(t, isSuccess)
}// 测试读写锁在多个写锁情况下的读写互斥
func TestRWMutex_ShouldBeBlocked_WhenTryingToWriteLockTwice(t *testing.T) {rwmutex := sync.RWMutex{}ch := make(chan bool)go func() {rwmutex.Lock()println("writelock locked once")rwmutex.Lock()println("writelock locked twice")rwmutex.Unlock()rwmutex.Unlock()ch <- true}()isBlocked := falseselect {case <-ch:println("should not execute this block")assert.False(t, isBlocked)case <-time.After(10 * time.Second):isSuccess := trueprintln("executed timeout block")assert.True(t, isSuccess)}}// 测试读写锁在多个读锁情况下的读写互斥
func TestRWMutex_ShouldBeBlocked_WhenAccquireWriteLockThenReadLock(t *testing.T) {rwmutex := sync.RWMutex{}ch := make(chan bool)go func() {rwmutex.Lock()println("writelock locked once")rwmutex.RLock()println("readlock locked twice")rwmutex.RUnlock()rwmutex.Unlock()ch <- true}()isBlocked := falseselect {case <-ch:println("should not execute this block")assert.False(t, isBlocked)case <-time.After(10 * time.Second):isSuccess := trueprintln("executed timeout block")assert.True(t, isSuccess)}}// 测试读写锁在多个读锁情况下的读写互斥
func TestRWMutex_ShouldBeBlocked_WhenAccquireReadLockThenWriteLock(t *testing.T) {rwmutex := sync.RWMutex{}ch := make(chan bool)go func() {rwmutex.RLock()println("readlock locked once")rwmutex.Lock()println("writelock locked twice")rwmutex.Unlock()rwmutex.RUnlock()ch <- true}()isBlocked := falseselect {case <-ch:println("should not execute this block")assert.False(t, isBlocked)case <-time.After(10 * time.Second):isSuccess := trueprintln("executed timeout block")assert.True(t, isSuccess)}}// 测试读写锁在多个读锁情况下的读写互斥
func TestRWMutex_ShouldDeadlockOrBlocked_WhenLockOneGoroutineAccquiredLockAndAnotherGoroutineAccquireLockAgain(t *testing.T) {var rwmutex1, rwmutex2 sync.RWMutexwg := sync.WaitGroup{}wg1 := sync.WaitGroup{}ch := make(chan bool)wg.Add(1)wg1.Add(1)go func() {rwmutex1.Lock()println("rwmutex1 locked")wg.Done()wg1.Wait()println("rwmutex2 try to accquire lock")rwmutex2.Lock()}()go func() {wg.Wait()rwmutex2.Lock()println("rwmutex2 locked")wg1.Done()println("rwmutex1 try to accquire lock")rwmutex1.Lock()ch <- true}()isBlocked := falseselect {case <-ch:println("should not execute this block")assert.False(t, isBlocked)case <-time.After(10 * time.Second):isSuccess := trueprintln("executed timeout block")assert.True(t, isSuccess)}}
参考
逐步学习Go-sync.RWMutex(读写锁)-深入理解与实战 – 小厂程序员