【Go】锁相关

文章目录

  • Mutex锁
    • mutex源码分析
      • Lock
      • UnLock
    • mutex两种运行模式
      • mutex normal 正常模式
        • 自旋
      • mutex starvation 饥饿模式
    • 锁的底层实现类型
  • RWMutex
    • RWMutex 实现
      • 其他共享内存线程安全的方式
  • 思考
    • 如何设计一个并发更高的锁?

Mutex锁

mutex源码分析

Locker接口:

type Locker interface {Lock()Unlock()
}

Mutex 就实现了这个接口,Lock请求锁,Unlock释放锁

type Mutex struct {state int32   //锁状态,保护四部分含义sema  uint32  //信号量,用于阻塞等待或者唤醒
}

在这里插入图片描述

  • Locked:表示该 mutex 是否被锁定,0 表示没有,1 表示处于锁定状态;

  • Woken:表示是否有协程被唤醒,0 表示没有,1 表示有协程处于唤醒状态,并且在加锁过程中;

  • Starving:Go1.9 版本之后引入,表示 mutex 是否处于饥饿状态,0 表示没有,1 表示有协程处于饥饿状态;

  • Waiter: 等待锁的协程数量。

方法解析

const (// mutex is locked ,在低位,值 1mutexLocked = 1 << iota//标识有协程被唤醒,处于 state 中的第二个 bit 位,值 2mutexWoken//标识 mutex 处于饥饿模式,处于 state 中的第三个 bit 位,值 4mutexStarving// 值 3,state 值通过右移三位可以得到 waiter 的数量// 同理,state += 1 << mutexWaiterShift,可以累加 waiter 的数量mutexWaiterShift = iota// 标识协程处于饥饿状态的最长阻塞时间,当前被设置为 1msstarvationThresholdNs = 1e6
)

Lock

func (m *Mutex) Lock() {// Fast path: grab unlocked mutex. //运气好,直接加锁成功if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {if race.Enabled {race.Acquire(unsafe.Pointer(m))}return}// Slow path (outlined so that the fast path can be inlined)//内联,加锁失败,就得去自旋竞争或者饥饿模式下竞争m.lockSlow()
}
func (m *Mutex) lockSlow() {var waitStartTime int64// 标识是否处于饥饿模式starving := false// 唤醒标记awoke := false// 自旋次数iter := 0old := m.statefor {// 非饥饿模式下,开启自旋操作// 从 runtime_canSpin(iter) 的实现中(runtime/proc.sync_runtime_canSpin)可以知道,// 如果 iter 的值大于 4,将返回 falseif old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {// 如果没有其他 waiter 被唤醒,那么将当前协程置为唤醒状态,同时 CAS 更新 mutex 的 Woken 位if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {awoke = true}// 开启自旋runtime_doSpin()iter++// 重新检查 state 的值old = m.statecontinue}new := old// 非饥饿状态if old&mutexStarving == 0 {// 当前协程可以直接加锁new |= mutexLocked}// mutex 已经被锁住或者处于饥饿模式// 那么当前协程不能获取到锁,将会进入等待状态if old&(mutexLocked|mutexStarving) != 0 {// waiter 数量加 1,当前协程处于等待状态new += 1 << mutexWaiterShift}// 当前协程处于饥饿状态并且 mutex 依然被锁住,那么设置 mutex 为饥饿模式if starving && old&mutexLocked != 0 {new |= mutexStarving}if awoke {if new&mutexWoken == 0 {throw("sync: inconsistent mutex state")}// 清除唤醒标记// &^ 与非操作,mutexWoken: 10 -> 01// 此操作之后,new 的 Locked 位值是 1,如果能够成功写入到 m.state 字段,那么当前协程获取锁成功new &^= mutexWoken}// CAS 设置新状态成功if atomic.CompareAndSwapInt32(&m.state, old, new) {// 旧的锁状态已经被释放并且处于非饥饿状态// 这个时候当前协程正常请求到了锁,就可以直接返回了if old&(mutexLocked|mutexStarving) == 0 {break}// 处理当前协程的饥饿状态// 如果之前已经处于等待状态了(已经在队列里面),那么将其加入到队列头部,从而可以被高优唤醒queueLifo := waitStartTime != 0if waitStartTime == 0 {// 阻塞开始时间waitStartTime = runtime_nanotime()}// P 操作,阻塞等待runtime_SemacquireMutex(&m.sema, queueLifo, 1)// 唤醒之后,如果当前协程等待超过 1ms,那么标识当前协程处于饥饿状态starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNsold = m.state// mutex 已经处于饥饿模式if old&mutexStarving != 0 {// 1. 如果当前协程被唤醒但是 mutex 还是处于锁住状态// 那么 mutex 处于非法状态//// 2. 或者如果此时 waiter 数量是 0,并且 mutex 未被锁住// 代表当前协程没有在 waiters 中,但是却想要获取到锁,那么 mutex 状态非法if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {throw("sync: inconsistent mutex state")}// delta 代表加锁并且将 waiter 数量减 1 两步操作delta := int32(mutexLocked - 1<<mutexWaiterShift)// 非饥饿状态 或者 当前只剩下一个 waiter 了(就是当前协程本身)if !starving || old>>mutexWaiterShift == 1 {// 那么 mutex 退出饥饿模式delta -= mutexStarving}// 设置新的状态atomic.AddInt32(&m.state, delta)break}awoke = trueiter = 0} else {old = m.state}}if race.Enabled {race.Acquire(unsafe.Pointer(m))}
}

解锁操作会根据 Mutex.state 的状态来判断需不需要去唤醒其他等待中的协程。

func (m *Mutex) unlockSlow(new int32) {// new - state 字段原子减 1 之后的值,如果之前是处于加锁状态,那么此时 new 的末位应该是 0// 此时 new+mutexLocked 正常情况下会将 new 末位变成 1// 那么如果和 mutexLocked 做与运算之后的结果是 0,代表 new 值非法,解锁了一个未加锁的 mutexif (new+mutexLocked)&mutexLocked == 0 {throw("sync: unlock of unlocked mutex")}// 如果不是处于饥饿状态if new&mutexStarving == 0 {old := newfor {// old>>mutexWaiterShift == 0 代表没有等待加锁的协程了,自然不需要执行唤醒操作// old&mutexLocked != 0 代表已经有协程加锁成功,此时没有必要再唤醒一个协程(因为它不可能加锁成功)// old&mutexWoken != 0 代表已经有协程被唤醒并且在加锁过程中,此时不需要再执行唤醒操作了// old&mutexStarving != 0 代表已经进入了饥饿状态,// 以上四种情况,皆不需要执行唤醒操作if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {return}// 唤醒一个等待中的协程,将 state woken 位置为 1// old - 1<<mutexWaiterShift waiter 数量减 1new = (old - 1<<mutexWaiterShift) | mutexWokenif atomic.CompareAndSwapInt32(&m.state, old, new) {runtime_Semrelease(&m.sema, false, 1)return}old = m.state}} else {// 饥饿模式// 将 mutex 的拥有权转移给下一个 waiter,并且交出 CPU 时间片,从而能够让下一个 waiter 立刻开始执行runtime_Semrelease(&m.sema, true, 1)}
}

UnLock

// 解锁操作
func (m *Mutex) Unlock() {if race.Enabled {_ = m.staterace.Release(unsafe.Pointer(m))}// mutexLocked 位设置为 0,解锁new := atomic.AddInt32(&m.state, -mutexLocked)// 如果此时 state 值不是 0,代表其他位不是 0(或者出现异常使用导致 mutexLocked 位也不是 0)// 此时需要进一步做一些其他操作,比如唤醒等待中的协程等if new != 0 {m.unlockSlow(new)}
}

mutex两种运行模式

饥饿模式是对公平性和性能的一种平衡,它避免了某些 goroutine 长时间的等待锁。在饥饿模式下,优先对待的是那些一直在等待的 waiter。

mutex normal 正常模式

默认情况下,Mutex的模式为normal。

该模式下,协程如果加锁不成功不会立即转入阻塞排队,而是判断是否满足自旋的条件,如果满足则会启动自旋过程,尝试抢锁。

正常模式 高吞吐量
在这里插入图片描述

自旋

自旋是一种多线程同步机制,当前的进程在进入自旋的过程中会一直保持 CPU 的占用,持续检查某个条件是否为真。
在多核的 CPU 上,自旋可以避免 Goroutine 的切换,使用恰当会对性能带来很大的增益,但是使用的不恰当就会拖慢整个程序,所以 Goroutine 进入自旋的条件非常苛刻:

  • 互斥锁只有在普通模式才能进入自旋;
  • runtime.sync_runtime_canSpin 需要返回 true:
    运行在多 CPU 的机器上
  • 当前 Goroutine 为了获取该锁进入自旋的次数小于四次;
  • 当前机器上至少存在一个正在运行的处理器 P 并且处理的运行队列为空;
    https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-sync-primitives/

mutex starvation 饥饿模式

自旋过程中能抢到锁,一定意味着同一时刻有协程释放了锁,我们知道释放锁时如果发现有阻塞等待的协程,还会释放一个信号量来唤醒一个等待协程,被唤醒的协程得到CPU后开始运行,此时发现锁已被抢占了,自己只好再次阻塞,不过阻塞前会判断自上次阻塞到本次阻塞经过了多长时间,如果超过1ms的话,会将Mutex标记为"饥饿"模式,然后再阻塞。

处于饥饿模式下,不会启动自旋过程,也即一旦有协程释放了锁,那么一定会唤醒协程,被唤醒的协程将会成功获取锁,同时也会把等待计数减1。

在饥饿模式下,Mutex 的拥有者将直接把锁交给队列最前面的 waiter。新来的 goroutine 不会尝试获取锁,即使看起来锁没有被持有,它也不会去抢,也不会 spin(自旋),它会乖乖地加入到等待队列的尾部。

如果拥有 Mutex 的 waiter 发现下面两种情况的其中之一,它就会把这个 Mutex 转换成正常模式:

  • 此 waiter 已经是队列中的最后一个 waiter 了,没有其它的等待锁的 goroutine 了;
  • 此 waiter 的等待时间小于 1 毫秒(ms)。

锁的底层实现类型

锁内存总线,针对内存的读写操作,在总线上控制,限制程序的内存访问

锁缓存行,同一个缓存行的内容读写操作,CPU内部的高速缓存保证一致性

锁,作用在一个对象或者变量上。现代CPU会优先在高速缓存查找,如果存在这个对象、变量的缓存行数据,会使用锁缓存行的方式。否则,才使用锁总线的方式。

RWMutex

RWMutex 实现

type RWMutex struct {w           Mutex  // 复用互斥锁能力//写锁信号量   当阻塞写操作的读操作goroutine释放读锁时,通过该信号量通知阻塞的写操作的goroutine;writerSem   uint32 
// 读锁信号量 当写操作goroutine释放写锁时,通过该信号量通知阻塞的读操作的goroutinereaderSem   uint32 // 当前读操作的数量,包含所有已经获取到读锁或者被写操作阻塞的等待获取读锁的读操作数量readerCount int32  // 获取写锁需要等待读锁释放的数量readerWait  int32 
}

通过记录 readerCount 读锁的数量来进行控制,当有一个写锁的时候,会将读 锁数量设置为负数 1<<30。目的是让新进入的读锁等待之前的写锁释放通知读 锁。同样的当有写锁进行抢占时,也会等待之前的读锁都释放完毕,才会开始 21 进行后续的操作。 而等写锁释放完之后,会将值重新加上 1<<30, 并通知刚才 新进入的读锁(rw.readerSem),两者互相限制。

const rwmutexMaxReaders = 1 << 30
func (rw *RWMutex) Lock() {// First, resolve competition with other writers.// 写锁也就是互斥锁,复用互斥锁的能力来解决与其他写锁的竞争// 如果写锁已经被获取了,其他goroutine在获取写锁时会进入自旋或者休眠rw.w.Lock()// 将readerCount设置为负值,告诉读锁现在有一个正在等待运行的写锁(获取互斥锁成功)r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders// 获取互斥锁成功并不代表goroutine获取写锁成功,我们默认最大有2^30的读操作数目,减去这个最大数目// 后仍然不为0则表示前面还有读锁,需要等待读锁释放并更新写操作被阻塞时等待的读操作goroutine个数;if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {runtime_SemacquireMutex(&rw.writerSem, false, 0)}
}
func (rw *RWMutex) Unlock() {// Announce to readers there is no active writer.// 将readerCount的恢复为正数,也就是解除对读锁的互斥r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)if r >= rwmutexMaxReaders {race.Enable()throw("sync: Unlock of unlocked RWMutex")}// 如果后面还有读操作的goroutine则需要唤醒他们for i := 0; i < int(r); i++ {runtime_Semrelease(&rw.readerSem, false, 0)}// 释放互斥锁,写操作的goroutine和读操作的goroutine同时竞争rw.w.Unlock()
}

读锁


func (rw *RWMutex) RLock() {// 原子操作readerCount 只要值不是负数就表示获取读锁成功if atomic.AddInt32(&rw.readerCount, 1) < 0 {// 有一个正在等待的写锁,为了避免饥饿后面进来的读锁进行阻塞等待runtime_SemacquireMutex(&rw.readerSem, false, 0)}
}
func (rw *RWMutex) RUnlock() {// 将readerCount的值减1,如果值等于等于0直接退出即可;否则进入rUnlockSlow处理if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {// Outlined slow-path to allow the fast-path to be inlinedrw.rUnlockSlow(r)}
}

其他共享内存线程安全的方式

官方不太推荐使用锁,更多的是通过channel做数据交换

思考

如何设计一个并发更高的锁?

在Go语言中,使用切片来设计并发更高效的锁是一种常见的做法,通常被称为"分段锁"或"分片锁"。

这种技术可以在一定程度上减小锁的粒度,从而提高并发性能。

package mainimport ("fmt""sync""hash/fnv"
)const numSegments = 16type ConcurrentMap struct {segments []sync.Mutexdata     map[interface{}]interface{}
}func NewConcurrentMap() *ConcurrentMap {segments := make([]sync.Mutex, numSegments)data := make(map[interface{}]interface{})return &ConcurrentMap{segments: segments, data: data}
}func (cm *ConcurrentMap) getSegment(key interface{}) *sync.Mutex {hash := hashFunction(key) % numSegmentsreturn &cm.segments[hash]
}func (cm *ConcurrentMap) Get(key interface{}) interface{} {segment := cm.getSegment(key)segment.Lock()defer segment.Unlock()return cm.data[key]
}func (cm *ConcurrentMap) Set(key, value interface{}) {segment := cm.getSegment(key)segment.Lock()defer segment.Unlock()cm.data[key] = value
}// 假设的哈希函数,仅用于示例目的
func hashFunction(key interface{}) int {h := fnv.New32a()// 将键的字节表示写入哈希函数_, _ = h.Write([]byte(fmt.Sprintf("%v", key)))return int(h.Sum32())
}func main() {concurrentMap := NewConcurrentMap()var wg sync.WaitGroupnumItems := 1000for i := 0; i < numItems; i++ {wg.Add(1)go func(index int) {defer wg.Done()key := fmt.Sprintf("key%d", index)concurrentMap.Set(key, index)}(i)}wg.Wait()// 输出结果for i := 0; i < numItems; i++ {key := fmt.Sprintf("key%d", i)fmt.Printf("%s: %v\n", key, concurrentMap.Get(key))}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/45161.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【python知识点】锦集

【版权声明】未经博主同意&#xff0c;谢绝转载&#xff01;&#xff08;请尊重原创&#xff0c;博主保留追究权&#xff09; https://blog.csdn.net/m0_69908381/article/details/132368704 出自【进步*于辰的博客】 相关博文&#xff1a;【python细节、经验】锦集。 注&#…

MyBatis入门配置及CURD实现

目录 一、MyBatis简介 1. 什么是 MyBatis ? 2. MyBatis的特性 3. 什么是持久层框架&#xff1f; 二、MyBatis环境配置 2.1 创建maven工程 2.2 导入相关pom依赖 2.3 导入jdbc配置文件 2.4 Mybatis相关插件安装 3.5 Mybatis-cfg.xml 核心配置 2.6 引入Log4j2日志文件…

Vue2-配置脚手架、分析脚手架、render函数、ref属性、props配置项、mixin配置项、scoped样式、插件

&#x1f954;:总有一段付出了没有回报的日子 是在扎根 更多Vue知识请点击——Vue.js VUE2-Day6 配置脚手架脚手架结构render函数vue.js与vue.runtime.xxx.js的区别引入render函数为什么要引入残缺的vue呢&#xff1f; 脚手架默认配置ref属性props配置项传递数据接收数据注意点…

NLP序列标注问题,样本不均衡怎么解决?

【学而不思则罔&#xff0c;思而不学则殆】 1.问题 NLP序列标注问题&#xff0c;样本不均衡怎么解决&#xff1f; 2.解释 以命名实体识别&#xff08;NER&#xff09;为例&#xff0c;这个样本不均衡有两种解释&#xff1a; &#xff08;1&#xff09;实体间类别数量不均衡…

华为网络篇 RIP的负载均衡-29

难度2复杂度2 目录 一、实验原理 二、实验拓扑 三、实验步骤 四、实验过程 总结 一、实验原理 RIP是使用跳数&#xff08;经过路由的数量&#xff09;作为metric值的&#xff0c;当网络上存在去往目标的路由有两条以上都是相同metric时&#xff0c;就出现了流量负载均衡。…

未来网络的选择:100G光模块与400G光模块的对比

随着互联网的快速发展和数据传输需求的不断增长&#xff0c;光通信技术在网络领域中扮演着至关重要的角色。光模块是光通信系统中的核心组件之一&#xff0c;而100G光模块和400G光模块是目前应用广泛的两种主要类型。本文将对这两种光模块进行详细的区别对比。 一、传输速率 …

【周末闲谈】关于“数据库”你又知道多少?

个人主页&#xff1a;【&#x1f60a;个人主页】 系列专栏&#xff1a;【❤️周末闲谈】 系列目录 ✨第一周 二进制VS三进制 ✨第二周 文心一言&#xff0c;模仿还是超越&#xff1f; ✨第二周 畅想AR 文章目录 系列目录前言数据库数据库的五大特点数据库介绍数据库管理系统&a…

34.Netty源码之Netty如何处理网络请求

highlight: arduino-light 通过前面两节源码课程的学习&#xff0c;我们知道 Netty 在服务端启动时会为创建 NioServerSocketChannel&#xff0c;当客户端新连接接入时又会创建 NioSocketChannel&#xff0c;不管是服务端还是客户端 Channel&#xff0c;在创建时都会初始化自己…

Python web实战之细说 Django 的单元测试

关键词&#xff1a; Python Web 开发、Django、单元测试、测试驱动开发、TDD、测试框架、持续集成、自动化测试 大家好&#xff0c;今天&#xff0c;我将带领大家进入 Python Web 开发的新世界&#xff0c;深入探讨 Django 的单元测试。通过本文的实战案例和详细讲解&#xff…

SystemVerilog之接口详解

1.入门实例 测试平台连接到 arbiter的例子&#xff1a;包括测试平台, arbiter仲裁器, 时钟发生器 和连接的信号。 ㅤㅤㅤ ㅤ ㅤㅤㅤㅤㅤ Arbiter里面可以自定义发送的权重&#xff0c; 是轮询还是自定义 grant表示仲裁出来的是哪一个&#xff0c;也即只有0&#xff0c;1&am…

C#程序配置读写例子 - 开源研究系列文章

今天讲讲关于C#的配置文件读写的例子。 对于应用程序的配置文件&#xff0c;以前都是用的ini文件进行读写的&#xff0c;这个与现在的json类似&#xff0c;都是键值对应的&#xff0c;这次介绍的是基于XML的序列化和反序列化的读写例子。对于ini文件&#xff0c;操作系统已经提…

python采集京东商品详情页面数据,京东API接口,京东h5st签名(2023.08.20)

一、原理与分析 1、目标页面 https://item.jd.com/6515029.html 在chrome中打开&#xff0c;按f12键进入开发者模式&#xff0c;找到商品详情数据接口&#xff0c;如下&#xff1a; 2、URL链接&#xff1a; https://api.m.jd.com/?appidpc-item-soa&functionIdpc_detail…

Axios跨域请求处理

问题背景&#xff1a; vue 项目用 axios 进行请求的时候&#xff0c;总是报“Access to XMLHttpRequest at ‘http://localhost:8889/api/login’ from origin ‘http://localhost:8080……’”的错误 实际上就是前后端分离的情况下&#xff0c;发生了跨域的问题 跨域定义&…

【Linux取经路】解析环境变量,提升系统控制力

文章目录 一、进程优先级1.1 什么是优先级&#xff1f;1.2 为什么会有优先级&#xff1f;1.3 小结 二、Linux系统中的优先级2.1 查看进程优先级2.2 PRI and NI2.3 修改进程优先级2.4 进程优先级的实现原理2.5 一些名词解释 三、环境变量3.1 基本概念3.2 PATH&#xff1a;Linux系…

APSIM模型参数优化 批量模拟丨气象数据准备、物候发育和光合生产、物质分配与产量模拟、土壤水分平衡算法、土壤碳氮平衡模块、农田管理模块等

随着数字农业和智慧农业的发展&#xff0c;基于过程的农业生产系统模型在模拟作物对气候变化的响应与适应、农田管理优化、作物品种和株型筛选、农田固碳和温室气体排放等领域扮演着越来越重要的作用。APSIM (Agricultural Production Systems sIMulator)模型是世界知名的作物生…

JDK中的Timer总结

目录 一、背景介绍二、思路&方案三、过程1.Timer关键类图2.Timer的基本用法3.结合面向对象的角度进行分析总结 四、总结五、升华 一、背景介绍 最近业务中使用了jdk中的Timer&#xff0c;通过对Timer源码的研究&#xff0c;结合对面向对象的认识&#xff0c;对Timer进行针…

pytorch 42 C#使用onnxruntime部署内置nms的yolov8模型

在进行目标检测部署时,通常需要自行编码实现对模型预测结果的解码及与预测结果的nms操作。所幸现在的各种部署框架对算子的支持更为灵活,可以在模型内实现预测结果的解码,但仍然需要自行编码实现对预测结果的nms操作。其实在onnx opset===11版本以后,其已支持将nms操作嵌入…

小程序体验版不存在 无法体验

1、权限问题&#xff1a; 1、开发者有所有权限。 2、小程序访问路径也是正确的。 该有的权限都有了。 2、解决办法&#xff1a; 打开微信公众平台&#xff0c;左侧菜单【设置】- 【第三方设置】&#xff0c;取消授权即可。

数据结构 - 语句的频度和时间复杂度

一、语句频度&#xff1a; 算法的运行时间 Σ每条语句的执行次数X该语句执行一次所需的时间每条语句的执行次数&#xff0c;也称为&#xff1a;语句的频度结合上面两点&#xff0c;可知&#xff1a;算法的运行时间 Σ每条语句的频度X该语句执行一次所需的时间 二、语句执行…

Linux内核源码分析-内存管理

Linux内核内存布局 64位Linux系统一般使用48位表示虚拟地址空间&#xff0c;45位表示物理地址。通过命令&#xff1a;cat /proc/cpuinfo。查看Linux内核位数和proc文件系统输出系统软硬件信息如下&#xff1a; x86_64架构体系内核分布情况 通过 cat /proc/meminfo 输出系统架…