引言
在现代软件开发中,有效地利用并发是提高应用性能和响应速度的关键。随着多核处理器的普及,编程语言和框架如何高效、简便地支持并发编程,成为了软件工程师们评估和选择工具时的一个重要考量。在这方面,Go 语言凭借其创新的并发模型—GPM(Goroutine, P, M)—在众多编程语言中脱颖而出,为开发者提供了强大的工具,以简单、高效的方式实现并发。
自从 2009 年首次发布以来,Go 语言就以其出色的性能、简洁的语法和对并发的原生支持赢得了广泛的关注。尤其是其并发模型,被设计为能够充分利用现代多核处理器的能力,同时隐藏底层的线程管理和同步复杂性,让开发者能够以更直观、更高级的抽象来构建并发程序。GPM 模型,作为 Go 语言并发编程的核心,通过 Goroutine、P(processor)、M(machine)三者的协同工作,实现了一种高效且易于管理的并发机制。
本文将基于 Go1.21 深入浅出地探讨 Go 语言的 GPM 模型,主要分为几个部分:
- 首先从其设计理念出发,详细解析 Goroutine、P 和 M 三者的角色、工作原理及其相互之间的交互方式。
- 然后引入几个关键问题,我们会从结论上先总结 GPM 的核心要点,内容包括协程调度循环、调度策略和调度时机。
- 接着我们会深入源码,去一步步洞察 Go 语言设计者是如何实现 GPM 模型中的各个要点的,这个过程会比较繁琐,但其实也比较有趣,感兴趣的读者可以阅读这一块,若只是想对 GPM 模型有个大概了解,那么停留在上一步也足矣了。
- 最后我们基于前面的分析,总结 G、P、M 三大组件在 Go 程序运行过程中的状态流转图。
通过对 GPM 模型的探讨,我们不仅能够理解 Go 语言如何在众多现代编程语言中以其并发编程能力脱颖而出,还能够洞察其设计背后的智慧,以及这一模型如何随着 Go 语言版本的迭代而不断进化和优化。无论你是对 Go 语言充满好奇的新手,还是希望深化理解其并发模型的经验开发者,本文都将为你提供宝贵的视角和深刻的洞见。
结论先行
GPM 调度原理图
Goroutine 底层结构
调度器 P 底层结构
GPM 调度循环
GPM 协程调度优先级与顺序
寻找可执行 G 过程
协程切换时机
GPM 模型
1. 概览
这里有一张很流行的 Goroutine 调度原理图:
代号 | 名称 | 定义位置 | 作用 |
---|---|---|---|
Sched | 调度器 | proc.c | 维护有存储 M 和 G 的队列以及调度器的一些状态信息等。 |
M | Machine 系统线程 | runtime.h | 它由操作系统管理的,Goroutine 就是跑在 M 之上的;M 是一个很大的结构,里面维护小对象内存 cache(mcache)、当前执行的 Goroutine、随机数发生器等等非常多的信息。 |
P | Processor 处理器 | runtime.h | 它的主要用途就是用来执行 Goroutine 的,它维护了一个 Goroutine 队列,即 runqueue。Processor 是让我们从 N:1 调度到 M:N 调度的重要部分。所有的 P 都在程序启动时创建,并保存在数组中,最多有 GOMAXPROCS(可配置)个。 |
G | Goroutine 实现的核心结构 | runtime.h | 它包含了栈,指令指针,以及其他对调度 Goroutine 很重要的信息,例如其阻塞的 channel。 |
Global Queue | 全局队列 | proc.h | 存放等待运行的 G。全局队列可能被任意的 P 加锁去获取里面的 G。 |
P Local Queue | P 的本地队列 | proc.h | 同全局队列类似,存放的也是等待运行的 G,但存放的数据有限,不会超过 256 个。新建 G 时,G 会优先加入本地队列。如果队列满了,则会把本地队列中一半的 G 以及新 G 一起移动到全局队列。 |
通过这个原理图我们知道 Go 语言的 GPM 模型的作用非常简单,它就是一个“精打细算”的工具。以前单进程无法充分利用 CPU 资源,所以引入了多进程。又因为进程拥有的资源太多,其创建、切换和销毁都会占用很长时间,所以引入了更小粒度的线程。随着计算机科学的进步,现在看来,线程拥有的资源也是“比较多”的,所以线程的创建、切换和销毁代价也是“相对大”的。所以很多编程语言就引入了协程这个概念,其核心目的就是应用层自己抽象一个比线程更小粒度的调度单元,应用层结合操作系统的多线程能力,自己来管理“调度单元”的创建、切换和销毁,从而尽可能减少由线程切换带来的开销,以做到更轻量级的并发。
不同的编程语言可能有不同的实现,而关键就在于如何让调度更快、开销更小。这便是我们本文要探讨的主要内容。
{% note info %}
Go 语言的实现:
线程想运行任务就得获取 P,从 P 的本地队列获取 G,当 P 的本地队列为空时,M 会尝试从全局队列获得一批 G 放到 P 的本地队列,或者从其他 P 的本地队列中“偷”一半 G 放到自己的本地队列。然后 M 运行 G,G 执行之后,再从 P 获取下一个 G,如此不断重复下去。
{% endnote %}
在进入更加具体深入的讨论之前,我们需要重点思考以下几个问题:
- G 我们可以随便创建,可能有成千上万个,那 P 和 M 有多少个呢?
- P 和 M 什么时候被创建呢?
- 操作系统只知道线程,所以实际上还是线程在执行任务,那么 G 是如何调度到线程上并执行的呢?
- 如何防止协程饥饿?
- 如何减少频繁地创建和销毁线程?
- 多个线程从全局队列拿 G 如何解决并发问题?又如何减少这种数据竞争呢?
- 在整个 Go 调度协程的过程中,G、P、M 有哪些状态?它们又是如何轮转的呢?
如果你对这几个问题有兴趣,请继续阅读下文。
2. P 和 M 的个数问题
- P 的数量由启动时环境变量
$GOMAXPROCS
或者程序中runtime.GOMAXPROCS()
决定。这意味着在程序执行的任意时刻都只有GOMAXPROCS
个 Goroutine 在同时运行。 - M 的数量由 Go 语言本身的限制决定,Go 程序启动时会设置 M 的最大数量为 10000 个,但是内核很难支持这么多的线程数,所以这个限制可以忽略。可以使用
runtime.SetMaxThreads()
设置 M 的最大数量。
3. P 和 M 何时被创建
- P 的创建时机在确定了 P 的最大数量 n 后,runtime 会根据这个数量创建 n 个 P。
- M 创建的时机是在当没有足够的 M 来关联 P 并运行其中可运行的 G 的时候,如所有的 M 此时都阻塞住了,而 P 中还有很多就绪任务,就会去寻找空闲的 M,如果没有空闲的 M,就会去创建新的 M。
4. 调度循环
在讨论 G 是如何被调度到 M 去执行的时候,我们需要先介绍 GPM 模型中两个比较特殊的角色:m0
和 g0
。
4.1 m0
- 定义:m0 是 Go 程序启动时创建的第一个 M。它是由 Go 运行时系统直接从操作系统线程创建的,不是从线程池中获取的。
- 作用:m0 负责初始化和启动 Go 运行时环境,包括创建调度器、分配第一个 P(p0),并创建其他系统级别的资源。在程序的整个生命周期中,m0 会继续存在,即使它可能不执行任何 Go 代码。
- 特点:m0 不同于其他 M,因为它不是从线程池中获取的。它可能没有绑定任何 P,除非程序中只有一个 P(即 GOMAXPROCS 设置为 1)。
4.2 g0
- 定义:g0 是每个 M 的特殊 Goroutine,它不执行任何实际的 Go 代码。每个 M 在创建时都会分配一个 g0。
- 作用:g0 主要用于执行调度器代码和进行系统调用。当 M 需要执行这些非用户代码时,会切换到 g0 的栈上运行。
- 特点:g0 拥有自己的栈,这个栈用于存放调度器函数和系统调用的数据。这意味着当执行这些操作时,不会影响当前运行的用户 Goroutine 的栈。
4.3 协程栈切换
g0 是 M 中负责调度其他 g 的协程,所以一个 M 中的协程调度其实就是在 g 和 g0 之间不断切换:
大致过程如下:
- 当 M 执行一个 G(用户 Goroutine)时,它使用 G 的栈来运行用户代码。
- 当需要执行系统调用或调度器相关的代码时,M 会切换到 g0。g0 拥有自己的栈,专门用于执行系统调用和调度器代码,这样可以避免污染用户 Goroutine 的栈空间。在 g0 上,M 可以执行如内存分配、调度决策、处理 Goroutine 的创建和销毁等操作。
- 完成系统调用或调度器任务后,M 会切换回之前的 G,继续执行用户代码。这个过程会从 g0 的栈切换回 G 的栈。
详细细节我们留到后面的源码分析揭晓。
5. 调度策略
5.1 获取本地运行队列
P 会优先尝试从自己的本地队列中寻找就绪的 G,它一般会优先调度最近加入的。
因为这个时候可能由其他 P 来窃取 G,所以这里是需要同步机制的,Go 采用原子操作来降低同步开销。
本地队列 G 的个数不超过 256 个,如果在创建 G 的时候本地队列满了,会将本地队列中 1/2 的 G 连同新创建的 G 一起放入全局队列中。
5.2 获取全局运行队列
P 会优先本地队列,然后才全局队列,这有个好处:
- 如果只有全局队列,那么所有的 P 都需要去竞争全局队列中的 G,这个时候需要上锁,且数据竞争会比较激烈,性能较差。
- 通过每个 P 维护一个自己的本地队列可以减少并发冲突,如果实在需要去全局队列拿 G,也可以一次性拿多个,大大减少了并发冲突的情况发生。
但这又带来了一个问题,全局队列中协程的饥饿问题,因为 P 会优先调度最近加入到自己本地队列中的 G,那可能会一直有新的 G 被创建,导致全局队列中的 G 没有机会被调度到。Go 的解决思路是:
- P 每调度 61 次后,就会从全局队列中获取一个 G 来运行。
5.3 获取准备就绪的网络协程
如果本地队列和全局队列都找不到就绪的 G 可以执行的话。调度会通过 runtime.netpoll
获取可以运行的网络协程。
Go 语言的网络模型是对不同操作系统平台上 I/O 多路复用技术的封装。
当 Goroutine 在进行网络 I/O 时,它会被挂起,线程会去执行其他 Goroutine。一旦 I/O 操作完成,该 Goroutine 会被唤醒并重新排队等待执行。
5.4 系统调用
当一个 Goroutine 执行系统调用时,它可能会被阻塞,这时它的执行线程(M)可能会释放当前绑定的处理器(P),以便其他 Goroutine 可以在该 P 上运行。
5.5 协程窃取
空闲的 M 如果绑定了 P,那么它的 P 会一直尝试从其他 P 的队列中窃取 Goroutine,以平衡负载和避免空闲。这个时候为了让每个 P 都有可能被窃取,Go 没有直接顺序遍历 P 列表,而是采用了一种相对随机的方式去遍历 P 列表,直到找到可以运行的协程就返回。M 不断寻找可执行 G 的这段期间,它被称为自旋线程。
所以为减少创建、切换和销毁线程的开销,Go 做了至少两点努力:
-
偷取(Work Stealing)机制
当本线程无可运行的 G 时,它所绑定的 P 会尝试从其他线程绑定的 P 窃取 G,而不是销毁线程。
-
移交(Hand Off)机制
当本线程因为 G 进行系统调用而阻塞时,线程会释放绑定的 P,把 P 移交给其他空闲的线程执行。
6. 调度时机
Go 语言的调度器结合了抢占式调度和协作式调度,以下是 Go 中这两种调度方式的具体实现和特性:
6.1 协作式调度(Cooperative Scheduling)
阻塞操作:
- 当 Goroutine 执行阻塞操作(如通道操作、等待锁、系统调用等)时,它会主动放弃 CPU 控制权,允许调度器切换到其他 Goroutine。
显式调度:
- Goroutine 显式请求
runtime.Gosched()
调用,调度器进行调度。 - 这个时候回从当前协程切换到 g0 协程,取消 G 与 M 之间的绑定关系,把 G 放入全局队列中。
6.2 抢占式调度(Preemptive Scheduling)
基于时间的抢占:
- 从 Go 1.14 开始,调度器引入了基于时间的抢占机制。如果一个 Goroutine 运行时间超过 10 毫秒,或者在系统调用中超过了 20 微妙,调度器会在安全点(如函数调用、循环迭代、阻塞操作等)尝试暂停该 Goroutine。
- 这种抢占不依赖于 Goroutine 的显式放弃控制,而是由调度器主动触发。
- 安全点的选择旨在减少对 Goroutine 执行的干扰,同时确保调度的公平性和响应性。
基于信号的抢占:
- 当程序在执行过程中既无法主动挂起,也不能进行系统调用,且无法进行函数调用时,就可以使用信号来调度。
- 信号其实就是线程信号,在操作系统中有很多基于信号的底层通信方式(SIGPIPE / SIGURG / SIGHUP),而我们的线程可以注册对应信号的处理函数。
- 当线程接收到抢占信号时,会进入一个专门的信号处理器。这个处理器会检查是否处于安全点,如果是,则暂停当前 Goroutine 并进行上下文切换。
源码分析
前面我们对 Go 语言的 GPM 模型在基本概念、调度循环、调度策略和调度时机各个方面都进行了详细的阐述。如果读者只是想简单了解一下 GPM 模型的一些概念和设计思想,那么阅读到这里就基本足够了。如果对其源码实现有兴趣的话,那么请继续往下阅读~
接下来我们会从以下几个方面来对 Go 语言的 GPM 模型进行源码分析:
- G、P、M 在 Go 语言中的表示。
- G 的创建过程。
- g 和 g0 的切换过程。
- GPM 的调度机制。
1. G 的底层结构
G 在 Go 里面就是 runtime2.go 里面定义的 g
结构体:
type g struct {// 栈参数。// stack 描述实际的栈内存:[stack.lo, stack.hi)。// stackguard0 是在 Go 栈增长序言中比较的栈指针。// 通常是 stack.lo+StackGuard,但可以是 StackPreempt 来触发抢占。// stackguard1 是在 C 栈增长序言中比较的栈指针。// 在 g0 和 gsignal 栈上是 stack.lo+StackGuard。// 在其他 goroutine 栈上是 ~0,以触发对 morestackc 的调用(并崩溃)。stack stack // 运行时/CGO 已知的偏移stackguard0 uintptr // liblink 已知的偏移stackguard1 uintptr // liblink 已知的偏移_panic *_panic // 最内层的 panic - liblink 已知的偏移_defer *_defer // 最内层的 deferm *m // 当前 m;arm liblink 已知的偏移sched gobuf // 当前协程的运行现场syscallsp uintptr // 如果 status==Gsyscall, syscallsp = sched.sp 在 gc 期间使用syscallpc uintptr // 如果 status==Gsyscall, syscallpc = sched.pc 在 gc 期间使用stktopsp uintptr // 栈顶的预期 sp,用于回溯检查// param 是一个通用的指针参数字段,用于在特定上下文中传递值,// 其他存储参数的方式难以找到。目前有三种用途:// 1. 当通道操作唤醒一个阻塞的 goroutine 时,它将 param 设置为// 指向已完成阻塞操作的 sudog。// 2. 由 gcAssistAlloc1 使用,以向其调用者信号,表明 goroutine 完成了 GC 周期。// 以任何其他方式这样做是不安全的,因为此时 goroutine 的栈可能已经移动。// 3. 由 debugCallWrap 使用,以将参数传递给新的 goroutine,因为在运行时分配闭包是被禁止的。param unsafe.Pointeratomicstatus atomic.Uint32stackLock uint32 // sigprof/scang 锁;TODO: 合并到 atomicstatusgoid uint64schedlink guintptrwaitsince int64 // g 变为阻塞的大致时间waitreason waitReason // 如果 status==Gwaitingpreempt bool // 抢占信号,复制 stackguard0 = stackpreemptpreemptStop bool // 在抢占时转换为 _Gpreempted;否则,只是取消调度preemptShrink bool // 在同步安全点缩小栈// asyncSafePoint 设置为 true 表示 g 在异步安全点停止。// 这意味着栈上有没有精确指针信息的帧。asyncSafePoint boolpaniconfault bool // 在意外的故障地址上触发 panic(而不是崩溃)gcscandone bool // g 已扫描栈;由 _Gscan 位在状态中保护throwsplit bool // 必须不分割栈// activeStackChans 表示有未锁定的通道指向这个 goroutine 的栈。// 如果为 true,栈复制需要获取通道锁来保护这些栈区域。activeStackChans bool// parkingOnChan 表示 goroutine 即将在 chansend 或 chanrecv 上停车。// 用于标记栈缩小的不安全点。parkingOnChan atomic.Boolraceignore int8 // 忽略竞态检测事件tracking bool // 是否跟踪此 G 以获取调度延迟统计trackingSeq uint8 // 用于决定是否跟踪此 GtrackingStamp int64 // G 最后开始被跟踪的时间戳runnableTime int64 // 可运行时间,运行时清除,仅在跟踪时使用lockedm muintptrsig uint32writebuf []bytesigcode0 uintptrsigcode1 uintptrsigpc uintptrparentGoid uint64 // 创建此 goroutine 的 goroutine 的 goidgopc uintptr // 创建此 goroutine 的 go 语句的 pcancestors *[]ancestorInfo // 创建此 goroutine 的祖先 goroutine 的信息(仅在 debug.tracebackancestors 使用)startpc uintptr // goroutine 函数的 pcracectx uintptrwaiting *sudog // 此 g 正在等待的 sudog 结构(具有有效的 elem 指针);按锁顺序cgoCtxt []uintptr // cgo 回溯上下文labels unsafe.Pointer // 分析器标签timer *timer // 缓存的计时器,用于 time.SleepselectDone atomic.Uint32 // 我们是否参与 select 并且有人赢得了竞赛?// goroutineProfiled 指示当前 goroutine 的栈状态// 是否已经被记录在进行中的 goroutine 性能分析中。goroutineProfiled goroutineProfileStateHolder// 每个 G 的追踪状态。trace gTraceState// 每个 G 的 GC 状态// gcAssistBytes 是此 G 的 GC 协助信用,以分配的字节为单位。// 如果为正,则 G 有信用分配 gcAssistBytes 字节而不协助。// 如果为负,则 G 必须通过执行扫描工作来纠正这一点。// 我们以字节为单位跟踪这一点,以便在 malloc 热路径中快速更新和检查债务。// 协助比率决定了这如何对应于扫描工作债务。gcAssistBytes int64
}
可以看到 g
结构字段非常多,这个结构体的设计反映了 Go 语言对并发和协程管理的底层机制,包括栈管理、调度、垃圾回收、异常处理等多个方面。通过这种抽象,Go 语言能够有效地管理成千上万的 goroutine
,使得并发编程变得更加简单和高效。
这里我们只关注 GPM 模型相关的内容,需要重点关心以下几个字段:
type g struct {stack stack // 当前协程的协程栈m *m // 当前线程sched gobuf // 保存协程的运行现场atomicstatus atomic.Uint32 // 协程状态goid uint64 // 协程ID
}
1.1 协程栈 stack
其中 stack
结构如下,它存储了协程栈的低地址和高地址。
type stack struct {lo uintptr // 栈的低地址hi uintptr // 栈的高地址
}
1.2 线程抽象 m
而 m
就是 Go 语言对操作系统线程的抽象,这不是实际的线程,这只是 Go 语言对线程相关信息的抽象,以方便更好地调度协程。
type m struct {g0 *g // g0 协程,Go 中的主协程curg *g // 现在正在运行的协程id int64 // 线程IDmOS // 当前操作系统对线程的额外描述信息...
}
m
结构体包含了许多字段,这些字段涉及到线程管理、调度、信号处理、系统调用、锁管理等多个方面。这个结构体是 Go 并发模型的核心部分之一,它与 g
(goroutine)和 p
(processor)结构体一起,构成了 Go 的调度系统的基础。通过这种设计,Go 能够有效地在多个操作系统线程之间调度成千上万的 goroutine,实现高效的并发执行。
1.3 协程上下文 gobuf
gobuf
结构体在 Go 语言的运行时系统中用于保存 Goroutine
的执行上下文,特别是在调度和系统调用中。这个结构体保存了足够的信息以便在 Goroutine
被暂停后能够恢复执行。
下面是对 gobuf
结构体中各个字段的解释:
type gobuf struct {// sp, pc 和 g 的偏移量是已知的(在 libmach 中硬编码)。//// ctxt 在 GC 方面比较特殊:它可能是一个堆分配的 funcval,// 因此 GC 需要跟踪它,但它需要在汇编中设置和清除,// 在那里实现写屏障比较困难。然而,ctxt 实际上是一个保存的、活跃的寄存器,// 我们只在真实寄存器和 gobuf 之间交换它。因此,我们在栈扫描期间将其视为根,// 这意味着保存和恢复它的汇编不需要写屏障。它仍然被类型化为指针,// 以便任何其他从 Go 进行的写操作都会获得写屏障。sp uintptr // 栈指针pc uintptr // 程序计数器g guintptr // 指向当前 goroutine 的指针ctxt unsafe.Pointer // 上下文,用于保存额外的状态或信息ret uintptr // 用于保存函数返回值lr uintptr // 链接寄存器(在某些架构中用于函数调用)bp uintptr // 基指针(在启用帧指针的架构中使用)
}
我们重点需要关注 2 个字段:
sp
:栈指针,表示当前协程运行到栈中的哪个位置了。pc
:程序计数器,表示当前协程运行到哪一行代码了。
1.4 协程状态 atomicstatus
我记得在 Go1.16 版本中,这个字段的类型还是 uint32
:
atomicstatus uint32
现在 Go1.21 版本中,已经用了原子操作来减少并发冲突了:
atomicstatus atomic.Uint32
可以看到 Go 的底层也是随着版本更新不断优化中的。
runtime2.go 定义了 G 的各种状态,如:
_Gidle (0)
: 表示 G 刚刚被分配,尚未初始化。_Grunnable (1)
: 表示 G 在运行队列上。它当前没有执行用户代码。栈不被该goroutine
拥有。- …
后面我们会给出 G 状态的流转图。
1.5 举个例子
假设我们现在有以下 Go 代码:main() 调用 do1(),do1() 调用 do2(),do2() 调用 do3()。
func do3() {fmt.Println("here is do3")
}func do2() {do3() // <---------------
}func do1() {do2()
}func main() {do1()
}
那么当这段程序运行到第 6 行的时候,它的底层结构大概如下图所示:
至于为什么这里有个 goexit()
,其实就是为了可以跳回到 g0
协程,后面我们会具体分析到。
2. P 的底层结构
P 的本质是 runtime2.go 里面定义的 p
结构体:
type p struct {id int32 // P 的唯一标识符status uint32 // P 的状态,如 pidle/prunning/...link puintptr // P 链接schedtick uint32 // 每次调度器调用时递增syscalltick uint32 // 每次系统调用时递增sysmontick sysmontick // sysmon 观察到的最后一个 tickm muintptr // 关联的 M 的反向链接(如果空闲则为 nil)mcache *mcache // M 缓存pcache pageCache // 页面缓存raceprocctx uintptr // 用于竞态检测的上下文// 延迟结构体池deferpool []*_deferdeferpoolbuf [32]*_defer// Goroutine ID 缓存,减少对 runtime·sched.goidgen 的访问goidcache uint64goidcacheend uint64// 可运行 goroutine 队列,无锁访问runqhead uint32runqtail uint32runq [256]guintptrrunnext guintptr // 下一个要运行的 G// 空闲 G 的列表(状态 == Gdead)gFree struct {gListn int32}// sudog 缓存sudogcache []*sudogsudogbuf [128]*sudog// 堆上 mspan 对象的缓存mspancache struct {len intbuf [128]*mspan}// pinner 对象的缓存pinnerCache *pinner// P 状态跟踪trace pTraceState// 每个 P 的持久分配,避免互斥palloc persistentAlloc // 定时器相关字段timer0When atomic.Int64timerModifiedEarliest atomic.Int64timersLock mutextimers []*timernumTimers atomic.Uint32deletedTimers atomic.Uint32timerRaceCtx uintptr// GC 相关字段gcAssistTime int64gcFractionalMarkTime int64gcw gcWorkwbBuf wbBuf// 指示是否在下一个安全点运行特定的函数runSafePointFn uint32// 指示当前 P 是否正在写入任何统计数据。// 偶数时表示没有写入,奇数时表示正在写入。statsSeq atomic.Uint32// 指示当前的 P 应该尽快进入调度器,无论其上运行的是哪个 G。// 这是实现抢占式调度的一部分,允许调度器在必要时中断长时间运行的 goroutine,// 以便其他 goroutine 有机会运行。preempt bool// 记录页面分配、释放和清理跟踪信息的缓冲区。pageTraceBuf pageTraceBuf
}
p
结构体在 Go 语言的运行时系统中代表了一个处理器(processor),它是调度器的核心组成部分。每个 p
负责管理一组 goroutine
的运行。这个结构体包含了许多字段,涉及到 goroutine
的调度、内存分配、垃圾回收和其他系统级别的操作。
我们重点关注以下几个字段:
type p struct {m muintptr // 当前负责的线程// 本地可运行的协程的队列,可无锁访问runqhead uint32 // 队头runqtail uint32 // 队尾runq [256]guintptr // 长度为 256runnext guintptr // 下一个可用的协程的指针// 抢占标识,指示当前的 P 应该尽快进入调度器,无论其上运行的是哪个 G。preempt bool
}
3. Goroutine 的创建
Go 并发能力的优秀之处,就在于它启动一个新的协程实在是太方便了:
go func() { ... }()
那么底层究竟做了什么呢?
3.1 newproc()
Goroutine 通过 proc.go 中的 newproc()
创建:
func newproc(fn *funcval) {gp := getg()pc := getcallerpc()systemstack(func() {newg := newproc1(fn, gp, pc)pp := getg().m.p.ptr()runqput(pp, newg, true)if mainStarted {wakep()}})
}
- 获取当前 goroutine 和调用者 PC:
getg()
获取当前正在执行的goroutine
,getcallerpc()
获取调用者的程序计数器地址。 - 在系统栈上执行
newproc1
:systemstack
确保newproc1
在系统栈上执行,而不是当前goroutine
的栈。这是因为新的goroutine
可能需要更多的栈空间。 - 创建新的 goroutine:
newproc1
被调用来实际创建新的goroutine
。 - 将新的 goroutine 放入运行队列:
runqput
将新创建的goroutine
放入运行队列。 - 唤醒处理器: 如果主函数已经开始执行,
wakep
用于唤醒一个空闲的 P 来运行新的goroutine
。
3.2 newproc1()
func newproc1(fn *funcval, callergp *g, callerpc uintptr) *g {// ... (省略了错误检查和获取 M 的代码)// 尝试从 P 的空闲列表获取一个 G,如果没有则创建一个新的newg := gfget(pp)if newg == nil {newg = malg(stackMin)casgstatus(newg, _Gidle, _Gdead)allgadd(newg)}// 设置新 G 的栈totalSize := uintptr(4*goarch.PtrSize + sys.MinFrameSize)totalSize = alignUp(totalSize, sys.StackAlign)sp := newg.stack.hi - totalSizespArg := sp// 清空并设置新 G 的调度器相关字段memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))newg.sched.sp = spnewg.stktopsp = spnewg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantumnewg.sched.g = guintptr(unsafe.Pointer(newg))// 设置新 G 的其他字段gostartcallfn(&newg.sched, fn)newg.parentGoid = callergp.goidnewg.gopc = callerpcnewg.startpc = fn.fn// ... (省略了跟踪和调试相关的代码)casgstatus(newg, _Gdead, _Grunnable)return newg
}
- 创建或获取一个新的 goroutine:
gfget
尝试从 P 的空闲列表中获取一个goroutine
,如果没有可用的,则通过malg
分配一个新的。 - 初始化 goroutine 的栈和调度器: 设置新
goroutine
的栈、程序计数器、调用函数等。这里有个非常核心的点newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum
,我们前面留了个疑问,协程栈顶的goexit
是哪里来的,就是这里来的。这里设置新goroutine
的程序计数器(pc
)指向goexit
函数。goexit
是每个goroutine
在退出时必须调用的函数,用于执行清理工作并切换到 g0 栈。 - 设置父 goroutine ID 和创建点: 记录创建这个新
goroutine
的父goroutine
的 ID 和go
语句的位置。 - 更改 goroutine 状态: 将新
goroutine
的状态从_Gdead
改为_Grunnable
,使其准备好被调度。 - 返回新的 goroutine: 函数返回新创建的
goroutine
。
3.3 runqput()
// runqput tries to put g on the local runnable queue.
// If next is false, runqput adds g to the tail of the runnable queue.
// If next is true, runqput puts g in the pp.runnext slot.
// If the run queue is full, runnext puts g on the global queue.
// Executed only by the owner P.
func runqput(pp *p, gp *g, next bool) {if randomizeScheduler && next && fastrandn(2) == 0 {next = false}if next {retryNext:oldnext := pp.runnextif !pp.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {goto retryNext}if oldnext == 0 {return}gp = oldnext.ptr()}retry:h := atomic.LoadAcq(&pp.runqhead) t := pp.runqtailif t-h < uint32(len(pp.runq)) {pp.runq[t%uint32(len(pp.runq))].set(gp)atomic.StoreRel(&pp.runqtail, t+1) return}if runqputslow(pp, gp, h, t) {return}goto retry
}
- 随机调度器:如果启用了调度器的随机化(
randomizeScheduler
),并且next
为true
(newproc 调用的时候永远都是传的 true),则有一半的概率将next
设置为false
。这有助于防止调度器的行为过于可预测。 - 处理
runnext
槽:如果next
为true
,函数尝试将gp
放入pp.runnext
槽。如果该槽已被占用,则将原有的goroutine
移动到常规运行队列,并重试将新的gp
放入runnext
。 - 放入本地运行队列:如果
next
为false
或runnext
槽已满,函数尝试将gp
放入本地运行队列的尾部。如果队列未满,gp
将被成功添加。 - 处理队列满的情况:如果本地运行队列已满,
runqputslow
被调用,尝试将gp
连同自己队列中一半的 g 放入全局运行队列。如果这也失败了,函数会重试将gp
放入本地队列。 - 原子操作:函数使用原子操作来加载和存储队列头(
runqhead
)和尾(runqtail
)指针,以确保多线程环境下的数据一致性和线程安全。
3.4 runqputslow()
runqputslow
函数处理本地运行队列满的情况,将 goroutine
批量转移到全局队列。这个函数通过原子操作和锁来确保操作的原子性和线程安全。随机化调度器的使用增加了调度的随机性和公平性。
// Put g and a batch of work from local runnable queue on global queue.
// Executed only by the owner P.
func runqputslow(pp *p, gp *g, h, t uint32) bool {// 从 pp 的本地队列中获取一半的 goroutinevar batch [len(pp.runq)/2 + 1]*gn := t - hn = n / 2if n != uint32(len(pp.runq)/2) {throw("runqputslow: queue is not full")}for i := uint32(0); i < n; i++ {batch[i] = pp.runq[(h+i)%uint32(len(pp.runq))].ptr()}if !atomic.CasRel(&pp.runqhead, h, h+n) {return false}batch[n] = gp// 随机打乱 goroutine 的顺序,以增加调度的随机性if randomizeScheduler {for i := uint32(1); i <= n; i++ {j := fastrandn(i + 1)batch[i], batch[j] = batch[j], batch[i]}}// 串成队列for i := uint32(0); i < n; i++ {batch[i].schedlink.set(batch[i+1])}var q gQueueq.head.set(batch[0])q.tail.set(batch[n])// 放入全局队列中lock(&sched.lock)globrunqputbatch(&q, int32(n+1))unlock(&sched.lock)return true
}
- 创建批处理数组:函数首先创建一个
batch
数组,用于存储从本地队列中取出的goroutine
。 - 从本地队列中获取一批
goroutine
:函数计算出要从本地队列中取出多少个goroutine
(通常是队列长度的一半),并将它们添加到batch
数组中。 - 原子操作更新队列头部:使用原子操作
atomic.CasRel
更新本地运行队列的头部索引,这是一个释放(release)操作,确保之前的读取操作完成。 - 将当前
goroutine
添加到批处理中:将传入的gp
添加到batch
数组的末尾。 - 随机化调度器:如果启用了随机调度器,函数会随机打乱
batch
数组中的goroutine
顺序,以增加调度的随机性。 - 链接
goroutine
:将batch
数组中的goroutine
链接起来,形成一个队列。 - 准备全局队列:创建一个
gQueue
结构,并设置其头部和尾部指向batch
数组中的第一个和最后一个goroutine
。 - 将批处理放入全局队列:加锁访问全局调度器的锁,然后将整个
batch
队列放入全局运行队列。
4. 调度过程 schedule()
Go 的调度器核心执行逻辑都在 proc.go 的 schedule()
函数中。我们先不探讨过多的细节,我们先把整个大体脉络理清楚再说。
简化后的 schedule()
如下:
func schedule() {// 获取当前正在执行的 Mmp := getg().m...// 查找一个可运行的 G,会阻塞住直到返回。gp, inheritTime, tryWakeP := findRunnable() ...// 执行 gexecute(gp, inheritTime)
}
execute()
执行 g,它简化后如下:
func execute(gp *g, inheritTime bool) {mp := getg().m..gogo(&gp.sched)
}
它调用了 gogo()
:
func gogo(buf *gobuf)
一般这种格式说明函数是用汇编实现的,我们在 Goland 上可以双击 shift 然后搜索 runtime·gogo
:
不同的平台有不同的实现,但是核心逻辑都是一样的, 它直接操作处理器的寄存器和栈,以实现从一个 goroutine
切换到另一个 goroutine
的功能。
我们后面的发内心会发现 goexit()
最终会调用 schedule()
。
这就串起来了,Go 程序启动后会创建 m0 和 g0,所以第一个schedule()
是 g0 调用的,最后通过 gogo
切换到用户协程 g 上面执行业务方法,完事后 g 通过 goexit
回到 schedule()
,以此循环反复下去。
现在我们可以来总结一下 GPM 调度循环的过程,大概如下图表示:
下面我们再对这个过程中的关键函数进行细致分析:
schedule()
:调度入口。findRunnable()
:寻找可执行的 G。execute()
:执行 G。gogo()
:切换协程栈 g0 到 g。goexit()
:退出 g 协程,切换回 g0 栈。
4.1 schedule()
func schedule() {// 获取当前正在执行的 Mmp := getg().m// 有锁的话抛出异常,避免该情况下调度出现死锁或其他问题if mp.locks != 0 {throw("schedule: holding locks")}// M 被锁定了特定的 G,这个时候直接执行这个锁定的 G。if mp.lockedg != 0 {stoplockedm()execute(mp.lockedg.ptr(), false) // Never returns.}// CGO 调用需要 g0 栈,所以这个时候不继续调度了,抛出异常。if mp.incgo {throw("schedule: in cgo")}top:pp := mp.p.ptr()pp.preempt = false// 安全点检查:如果当前 M 在自旋的话,应该是没有可执行 G 的。if mp.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {throw("schedule: spinning with local work")}// 查找一个可运行的 G,会阻塞住直到返回。gp, inheritTime, tryWakeP := findRunnable() // 调试的时候系统会处于“冻结”状态,// 这里故意通过两次 lock 引入死锁使当前 M 陷入无限等待,// 以在调试时保持当前的调度器和运行时状态不变。if debug.dontfreezetheworld > 0 && freezing.Load() {lock(&deadlock)lock(&deadlock)}// 如果当前 M 之前是自旋的,但是现在要准备执行 G 了,那就不是自旋了。if mp.spinning {resetspinning()}// 当用户级调度被禁用时,采用双重检查后如果确实被禁用了,// 那么就把当前 g 放在 sched.disable.runnable 列表中,// 等待调度重启启用时再处理。// 在 gc 的时候会出现这种情况:// gcStart() -> schedEnableUser(false)// gcMarkDone() -> schedEnableUser(true)if sched.disable.user && !schedEnabled(gp) {lock(&sched.lock)if schedEnabled(gp) {unlock(&sched.lock)} else {sched.disable.runnable.pushBack(gp)sched.disable.n++unlock(&sched.lock)goto top}}// 检查是否需要唤醒一个 P。// 如果返回的 g 比较特殊,比如要负责 gc,那么这个值会是 true。if tryWakeP {wakep()}// 如果 g 已经绑定了 M,则直接启动该 M 去执行 g。if gp.lockedm != 0 {startlockedm(gp)goto top}// 执行 gexecute(gp, inheritTime)
}
schedule()
函数是 Go 调度器的核心,负责管理 goroutine
的执行。它包括多个步骤,如检查当前 M 的状态,处理特殊情况(如 goroutine
被锁定到特定的 M,或者 M 正在执行 CGO 调用),以及选择和执行可运行的 goroutine
。
4.2 findRunnable()
// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from local or global queue, poll network.
// tryWakeP indicates that the returned goroutine is not normal (GC worker, trace reader) so the caller should try to wake a P.
func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
通过注释就可以知道这个函数的作用:寻找一个可执行的 goroutine:
- 尝试从其他 P 窃取 g、从本地获取 g、从全局队列获取 g、从网络轮询器获取 g;
- 如果是一个特殊的 g,如要负责 gc 或 trace,那么会将
tryWakeP
置为true
,表示调度器需要尝试唤醒或启动一个新的 P 来运行这个 g,以确保了即使在系统负载较低时,这些特殊的g 也能得到及时处理。
我们只关心它的核心部分:
func findRunnable() (gp *g, inheritTime, tryWakeP bool) {// 获取当前 Mmp := getg().m
top:// 获取 M 绑定的 Ppp := mp.p.ptr()// 1. 每 61 次循环调度,就会去全局队列中获取一个 g 来执行if pp.schedtick%61 == 0 && sched.runqsize > 0 {lock(&sched.lock)gp := globrunqget(pp, 1)unlock(&sched.lock)if gp != nil {return gp, false, false}}// 2. 从本地队列中获取 gif gp, inheritTime := runqget(pp); gp != nil {return gp, inheritTime, false}// 3. 从全局队列中获取 gif sched.runqsize != 0 {lock(&sched.lock)gp := globrunqget(pp, 0)unlock(&sched.lock)if gp != nil {return gp, false, false}}// 4. 从网络轮询器中获取 gif netpollinited() && netpollWaiters.Load() > 0 && sched.lastpoll.Load() != 0 {if list := netpoll(0); !list.empty() { // non-blockinggp := list.pop()injectglist(&list)casgstatus(gp, _Gwaiting, _Grunnable)if traceEnabled() {traceGoUnpark(gp, 0)}return gp, false, false}}// 5. 自旋,从其他 P 窃取 g// mp.spinning 这个条件检查当前 M(操作系统线程)是否应该进入自旋状态。// 自旋状态意味着 M 会积极地寻找工作,而不是休眠。// 2*sched.nmspinning.Load() < gomaxprocs-sched.npidle.Load()// 这个条件确保系统中自旋的 M 的数量不会超过一定比例。// 这是为了防止在低并发情况下过多的 CPU 使用。if mp.spinning || 2*sched.nmspinning.Load() < gomaxprocs-sched.npidle.Load() {if !mp.spinning {mp.becomeSpinning()}gp, inheritTime, tnow, w, newWork := stealWork(now)if gp != nil {return gp, inheritTime, false}if newWork {goto top}now = tnowif w != 0 && (pollUntil == 0 || w < pollUntil) {pollUntil = w}}...goto top
}
这个过程涉及到几个重要的函数:
globrunqget()
:从全局队列中寻找可运行的 G。runqget()
:从本地队列中寻找可运行的 G。netpoll()
:寻找可以运行的网络协程。stealWork()
:从其他 P 窃取可运行的 G。
4.3 globrunqget()
func globrunqget(pp *p, max int32) *g {// 抢全局列表的锁assertLockHeld(&sched.lock)// 如果为空则直接返回if sched.runqsize == 0 {return nil}// 确定 n 的大小,即要从全局队列中获取的 g 的个数。// 这里会结合入参 max 对边界值进行判断,以获得一个合理的 n。// 一次性最多拿 len(pp.runq)/2 个 g。n := sched.runqsize/gomaxprocs + 1if n > sched.runqsize {n = sched.runqsize}if max > 0 && n > max {n = max}if n > int32(len(pp.runq))/2 {n = int32(len(pp.runq)) / 2}sched.runqsize -= n// 通过 pop() 从全局队列中弹出 ggp := sched.runq.pop()n--for ; n > 0; n-- {gp1 := sched.runq.pop()// 将 g 放入 pp 的本地队列中// runqput 在前面创建协程的地方已经介绍过了,这里不赘述。runqput(pp, gp1, false)}return gp
}
4.4 runqget()
func runqget(pp *p) (gp *g, inheritTime bool) {// runnext 的 g 会优先执行next := pp.runnextif next != 0 && pp.runnext.cas(next, 0) {return next.ptr(), true}for {// 原子操作获取队头指针h := atomic.LoadAcq(&pp.runqhead) t := pp.runqtailif t == h {return nil, false}// 从队头获取 g,并通过原子操作更新队头(即抢这个 g)gp := pp.runq[h%uint32(len(pp.runq))].ptr()if atomic.CasRel(&pp.runqhead, h, h+1) { return gp, false}}
}
runqget()
函数用于从本地运行队列中获取一个可运行的 goroutine
。这个函数只能由拥有该队列的处理器(P)执行。下面是对这个函数的详细解释:
1. 检查 runnext
:
runnext
是一个特殊的字段,用于存储下一个要运行的goroutine
。如果runnext
非零,并且能成功通过原子操作(CAS)将其设置为零,则直接返回这个goroutine
。- 如果成功获取
runnext
指向的goroutine
,inheritTime
被设置为true
,表示这个goroutine
应该继承当前时间片的剩余时间。 - 如果没成功,意味着 runnext 的这个 g 已经被其他 P 给抢了,因为我们可以发现本 P 只可能将其设置为 0,只有其他 P 才会将其设置以为非 0。
2. 从本地队列中获取 goroutine
:
- 使用原子操作加载
runqhead
(队列头指针),runqtail
(队列尾指针)。 - 如果
runqhead
等于runqtail
,表示队列为空,返回nil
。 - 否则,从队列中获取
runqhead
指向的goroutine
,并尝试通过原子操作(CAS)更新runqhead
。 - 如果更新成功,返回获取到的
goroutine
,inheritTime
被设置为false
,表示这个goroutine
应该开始一个新的时间片。
两个问题:
1. 为什么获取 runqhead 需要上锁,获取 runqtail 就不需要?
单一生产者:每个本地运行队列只有一个生产者,即与之关联的当前 P。只有这个 P 可以向队列尾部添加新的 goroutine
。由于不存在多个生产者的并发写入问题,因此不需要锁来保护队尾。
2. inheritTime 有什么用?
inheritTime
的主要作用是决定新调度的 goroutine
是否应该立即开始一个新的时间片,或者继续使用当前时间片的剩余部分。这在以下两种情况下尤为重要:
- 继承时间片 (
inheritTime == true
):当runqget
从runnext
字段获取goroutine
时,这个goroutine
被认为是特别优先的,因此它继承了当前时间片的剩余时间。这通常发生在goroutine
通过特定的同步机制(如通道操作)被明确唤醒时。 - 开始新的时间片 (
inheritTime == false
):当runqget
从本地运行队列中正常获取goroutine
时,这个goroutine
将开始一个全新的时间片。这确保了调度的公平性,使得每个goroutine
都有机会在给定的时间片内运行。
4.5 netpoll()
netpoll()
函数是 Go 语言运行时网络轮询机制的一部分,用于检查网络连接是否准备好进行非阻塞 I/O 操作。这个函数返回一组已经变为可运行状态的 goroutine
,这些 goroutine
之前可能因等待网络 I/O 而被挂起。
这里涉及到 Go 语言网络编程原理,在本文中不细究,就简单带过了。
func netpoll(delay int64) gList {// 检查轮询器是否初始化。if kq == -1 {return gList{}}// 设置轮询超时。var tp *timespecvar ts timespecif delay < 0 {tp = nil} else if delay == 0 {tp = &ts} else {ts.setNsec(delay)if ts.tv_sec > 1e6 {ts.tv_sec = 1e6}tp = &ts}// 使用 kevent 进行轮询操作,结果放在 events 中。var events [64]keventt
retry:n := kevent(kq, nil, 0, &events[0], int32(len(events)), tp)if n < 0 {if n != -_EINTR {println("runtime: kevent on fd", kq, "failed with", -n)throw("runtime: netpoll failed")}if delay > 0 {return gList{}}goto retry}// 遍历 events 处理轮询事件。var toRun gListfor i := 0; i < int(n); i++ {ev := &events[i]// netpollBreakRd 用于唤醒轮询,即唤醒等待中的 goroutine。if uintptr(ev.ident) == netpollBreakRd {if ev.filter != _EVFILT_READ {println("runtime: netpoll: break fd ready for", ev.filter)throw("runtime: netpoll: break fd ready for something unexpected")}if delay != 0 {var tmp [16]byteread(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))netpollWakeSig.Store(0)}continue}// 根据轮询事件的类型(读或写),唤醒相应等待网络 I/O 的 groutine。var mode int32switch ev.filter {case _EVFILT_READ:mode += 'r'if ev.flags&_EV_EOF != 0 {mode += 'w'}case _EVFILT_WRITE:mode += 'w'}if mode != 0 {var pd *pollDescvar tag uintptrif goarch.PtrSize == 4 {pd = (*pollDesc)(unsafe.Pointer(ev.udata))tag = 0} else {tp := taggedPointer(uintptr(unsafe.Pointer(ev.udata)))pd = (*pollDesc)(tp.pointer())tag = tp.tag()if pd.fdseq.Load() != tag {continue}}pd.setEventErr(ev.flags == _EV_ERROR, tag)// 标记 goroutine 可执行。netpollready(&toRun, pd, mode)}}// 返回可运行的 goroutine 列表。return toRun
}
4.6 stealWork()
stealWork()
函数用于尝试从其他处理器(P)窃取可运行的 goroutine
或定时器。
func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) {// 获取当前 M 绑定的 P。pp := getg().m.p.ptr()ranTimer := false// 尝试 4 次。const stealTries = 4for i := 0; i < stealTries; i++ {// 前 3 次尝试窃取 g。// 第 4 次尝试窃取 timer,并且尝试获取其他 P 的 runnext 中的 g。stealTimersOrRunNextG := i == stealTries-1// 随机选一个 P。for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {// 如果系统正在 GC,则可以直接返回 true,因为可能要负责 gc 了,有事干了。if sched.gcwaiting.Load() {return nil, false, now, pollUntil, true}// 获取选中的 P,如果是当前 P 则直接 continue,重试。p2 := allp[enum.position()]if pp == p2 {continue}// 第 4 次尝试去窃取 p2 的 timer。if stealTimersOrRunNextG && timerpMask.read(enum.position()) {// 检查并可能执行 timer。tnow, w, ran := checkTimers(p2, now)now = tnowif w != 0 && (pollUntil == 0 || w < pollUntil) {pollUntil = w}// 如果执行了 timer,则检查本地队列是否有 g 可以运行,// 因为 timer 会唤醒被挂起的 g。if ran {if gp, inheritTime := runqget(pp); gp != nil {return gp, inheritTime, now, pollUntil, ranTimer}ranTimer = true}}// 前 3 次尝试或者第 4 次尝试没有窃取到 timer 的时候,// 就从其他非空闲 P 的本地队列中尝试窃取 g。if !idlepMask.read(enum.position()) {// 如果 stealTimersOrRunNextG 为 true,// 那么会在窃取的时候,尝试窃取 p2 的 runnext。if gp := runqsteal(pp, p2, stealTimersOrRunNextG); gp != nil {return gp, false, now, pollUntil, ranTimer}}}}return nil, false, now, pollUntil, ranTimer
}
阅读源码的好处这就体现了,所有人都告诉我们 P 找不到可运行 G 的时候就会去窃取其他 P 的 G,但没人告诉我们,在这个过程还可能会去窃取其他 P 的 timer 和 runnext。
所谓 timer
,即定时器,用于在指定的时间后执行某些操作。这些操作通常包括唤醒等待特定时间的 goroutine
,或执行与时间相关的任务。定时器在 Go 的并发模型中扮演着重要的角色,特别是在涉及到时间延迟或周期性任务的场景中。在调度器层面,定时器的管理对于确保及时响应时间相关的事件和维持高效的调度至关重要。通过合理地处理定时器事件,Go 能够在保持高并发性的同时,有效地管理时间延迟和周期性任务。
在 Go 语言的调度器中,跨 P 的定时器窃取是一种优化机制,它有 2 个好处:
- 保持处理器活跃:当一个 P 没有足够的本地工作时,它可以尝试从其他 P 窃取定时器任务。这样做可以保持该 P 活跃,避免它进入休眠状态,从而提高整体系统的效率。
- 平衡系统负载:在多核系统中,不同的 P 可能会有不同的负载。跨 P 的定时器窃取有助于在 P 之间平衡负载,特别是在一些 P 非常忙碌而其他 P 相对空闲的情况下。
好的,回过头来,为什么我们会说窃取的时候会从队头窃取呢?为什么是窃取 p2 一半的 g 呢?这个过程就在 runqsteal()
中:
func runqsteal(pp, p2 *p, stealRunNextG bool) *g {t := pp.runqtail// 从 p2 中获取 n 个 g。n := runqgrab(p2, &pp.runq, t, stealRunNextG)if n == 0 {return nil}// 返回第 1 个 g,因为它可以直接执行了。n--gp := pp.runq[(t+n)%uint32(len(pp.runq))].ptr()if n == 0 {return gp}// 如果还有剩下的 g,那么就加入到本地队列中。// 这里可以看到是从队头加入的,所以需要使用原子操作获取队头。h := atomic.LoadAcq(&pp.runqhead)if t-h+n >= uint32(len(pp.runq)) {throw("runqsteal: runq overflow")}atomic.StoreRel(&pp.runqtail, t+n)return gp
}
runqgrab()
是窃取 n 个 g 的过程:
func runqgrab(pp *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {// 使用无限循环来尝试窃取工作,直到成功或确定没有可窃取的工作。for {h := atomic.LoadAcq(&pp.runqhead) t := atomic.LoadAcq(&pp.runqtail) n := t - h// 这里可以看到,要窃取的个数,就是 pp 本地队列中 g 个数的一半n = n - n/2// 如果 n 为 0,且 stealRunNextG == true,// 那么就尝试窃取 pp 的 runnext 中的 g。if n == 0 {if stealRunNextG {if next := pp.runnext; next != 0 {if !pp.runnext.cas(next, 0) {continue}batch[batchHead%uint32(len(batch))] = nextreturn 1}}return 0}// 如果 n 不为队列长度的一半,则说明队列发生了变化,// 这个时候重新尝试窃取。if n > uint32(len(pp.runq)/2) { continue}// 将要窃取的 g 从 pp.runq 中转移到 batch 中。for i := uint32(0); i < n; i++ {g := pp.runq[(h+i)%uint32(len(pp.runq))]batch[(batchHead+i)%uint32(len(batch))] = g}// 使用原子操作尝试更新 pp 的队列头部,即将 g 从 pp.runq 中移除。if atomic.CasRel(&pp.runqhead, h, h+n) { return n}}
}
findRunnable()
的全部过程我们总算是梳理完了,这个过程确实非常精彩,Go 调度器在提高调度性能、确保调度的公平性、平衡系统负载、降低同步开销、减少资源再分配等方面都做了很多的努力,这才让 Go 语言的并发又强大又易用。
下面是对 findRunnable()
一个简单的总结:
4.7 execute()
findRunnable()
之后就是 execute()
,它的核心过程如下(有删减):
func execute(gp *g, inheritTime bool) {mp := getg().m// 将 g0 d 线程信息复制到即将要调用的协程 gp 中。mp.curg = gpgp.m = mp// 修改 gp 的状态为 _Grunning,即运行中。casgstatus(gp, _Grunnable, _Grunning)gp.waitsince = 0// 标记为非抢占gp.preempt = false// 用于栈保护,检测栈溢出gp.stackguard0 = gp.stack.lo + stackGuard// gogo 会完成 g0 到 g 的协程栈的切换,并从 gp.sched 开始执行。// sched 字段我们前面介绍过,它是 gobuf 结构体,存储了 sp 和 pc。gogo(&gp.sched)
}
所以 execute()
的工作非常简单,其实就是将 g0 的线程信息复制到 gp 上,并修改状态和一些元数据,核心部分其实在 gogo()
中。
4.8 gogo()
前面我们说过,gogo()
会完成 g0 栈到 g 栈的切换,且在不同平台下有不同的视线,这里我们以 asm_arm64.s 为代表来看一下 gogo()
的汇编实现:
TEXT runtime·gogo(SB), NOSPLIT|NOFRAME, $0-8MOVD buf+0(FP), R5MOVD gobuf_g(R5), R6MOVD 0(R6), R4 // make sure g != nilB gogo<>(SB)TEXT gogo<>(SB), NOSPLIT|NOFRAME, $0MOVD R6, gBL runtime·save_g(SB)MOVD gobuf_sp(R5), R0MOVD R0, RSPMOVD gobuf_bp(R5), R29MOVD gobuf_lr(R5), LRMOVD gobuf_ret(R5), R0MOVD gobuf_ctxt(R5), R26MOVD $0, gobuf_sp(R5)MOVD $0, gobuf_bp(R5)MOVD $0, gobuf_ret(R5)MOVD $0, gobuf_lr(R5)MOVD $0, gobuf_ctxt(R5)CMP ZR, ZR // set condition codes for == test, needed by stack splitMOVD gobuf_pc(R5), R6B (R6)
具体过程如下:
runtime·gogo
函数:这个函数用于设置新的goroutine
上下文。它接收一个指向gobuf
结构的指针(buf+0(FP)
),该结构包含了goroutine
的上下文信息。- 加载
gobuf
并检查g
:加载gobuf
结构,并检查g
是否为nil
。 - 跳转到
gogo<>
:执行无条件跳转到gogo<>
函数。 gogo<>
函数:这个函数实际上完成了上下文切换。- 设置当前
goroutine
:将R6
寄存器中的值(新的goroutine
)设置为当前goroutine
。 - 保存当前
goroutine
:调用runtime·save_g
保存当前goroutine
的状态。 - 恢复栈指针和其他寄存器:从
gobuf
结构中恢复栈指针(RSP
)、基指针(R29
)、链接寄存器(LR
)、返回值(R0
)和上下文(R26
)。 - 清空
gobuf
结构:将gobuf
结构中的字段清零。 - 准备跳转到新的程序计数器位置:从
gobuf
中加载新的程序计数器地址(gobuf_pc(R5)
)到R6
。 - 跳转执行:通过
B (R6)
跳转到新的程序计数器地址,继续执行新goroutine
的代码。
- 设置当前
这段汇编代码是 Go 运行时中处理 goroutine
上下文切换的关键部分。它直接操作处理器的寄存器和栈,以实现从一个 goroutine
切换到另一个 goroutine
的功能。
在 execute()
中是这么调用 gogo()
的:
gogo(&gp.sched)
所以完成栈的切换后会从 gp.sched
开始,执行代码,前面我们介绍过 sched
是一个 gobuf
结构体:
type gobuf struct {sp uintptrpc uintptrg guintptrctxt unsafe.Pointerret uintptrlr uintptrbp uintptr
}
所以会从 pc 处开始执行业务代码,前面在 newproc()
的时候,我们提过一行代码:
newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum
这行代码的作用,是在协程创建的时候插入一个 goexit
函数的地址,因为这个时候 g
刚创建,所以其实就是往协程栈顶插入了 goexit
的地址。所以当 g
执行完业务代码后,当栈中元素不断弹出后,最终就会弹出 goexit
的地址,然后执行 goexit()
函数,退出当前 g
,切换回 g0
。
4.9 goexit()
goexit
定义在 runtime/stubs.go 中:
// goexit is the return stub at the top of every goroutine call stack.
// Each goroutine stack is constructed as if goexit called the
// goroutine's entry point function, so that when the entry point
// function returns, it will return to goexit, which will call goexit1
// to perform the actual exit.
//
// This function must never be called directly. Call goexit1 instead.
// gentraceback assumes that goexit terminates the stack. A direct
// call on the stack will cause gentraceback to stop walking the stack
// prematurely and if there is leftover state it may panic.
func goexit(neverCallThisFunction)
通过注释我们可以得到 2 个信息:
goexit
的位于每个 goroutine 调用栈的顶部。每个 goroutine 的栈被构造得好像goexit
调用了 goroutine 的入口函数。这意味着当入口函数返回时,它实际上返回到goexit
。- 不要直接调用
goexit
,应该调用goexit1
。
goexit1
位于 runtime/proc.go 中:
// Finishes execution of the current goroutine.
func goexit1() {if raceenabled {racegoend()}if traceEnabled() {traceGoEnd()}mcall(goexit0)
}
好吧,它调用了 goexit0
,原来这才是真正的退出入口,它也位于 runtime/proc.go 中:
func goexit0(gp *g) {// 获取当前的 M 和 Pmp := getg().mpp := mp.p.ptr()// 修改 gp 的状态为 _Gdead,标志它的终止casgstatus(gp, _Grunning, _Gdead)// 标记 gc 的栈内存是可以进行 gc 扫描的gcController.addScannableStack(pp, -int64(gp.stack.hi-gp.stack.lo))// 如果 gp 是系统 goroutine,则将系统 goroutine 的计数减少if isSystemGoroutine(gp, false) {sched.ngsys.Add(-1)}// 清理 gp 的状态gp.m = nillocked := gp.lockedm != 0gp.lockedm = 0mp.lockedg = 0gp.preemptStop = falsegp.paniconfault = falsegp._defer = nil gp._panic = nilgp.writebuf = nilgp.waitreason = waitReasonZerogp.param = nilgp.labels = nilgp.timer = nil// 如果启用了垃圾回收(GC)并且 gp.gcAssistBytes 大于 0,// 则将辅助信用归还给全局池。这有助于更好地控制垃圾回收进程。if gcBlackenEnabled != 0 && gp.gcAssistBytes > 0 {assistWorkPerByte := gcController.assistWorkPerByte.Load()scanCredit := int64(assistWorkPerByte * float64(gp.gcAssistBytes))gcController.bgScanCredit.Add(scanCredit)gp.gcAssistBytes = 0}// 将当前 g 从 P 的运行队列中移除dropg()// WebAssembly 平台特殊处理if GOARCH == "wasm" { gfput(pp, gp)schedule() }// 将 gp 放回处理器的可用队列中,这样可以复用 ggfput(pp, gp)// 如果 goroutine 被绑定到当前线程上,// 那可能是在做系统调用,cgo 调用或其他特殊任务,// 那么就需要切到 g0,让 g0 来完成后面的调度。if locked {// 如果 goroutine 在终止前曾锁定当前线程,// 则根据不同的操作系统执行不同的处理。// 在大多数操作系统上,会跳转到 mstart 函数,释放 P 并退出线程。// 但在 Plan 9 操作系统上,会清除 lockedExt。if GOOS != "plan9" { // See golang.org/issue/22227.gogo(&mp.g0.sched)} else {mp.lockedExt = 0}}// 继续调度// 如果执行了 gogo,那就是 g0 在调度。// 如果没有执行 gogo,那就是 gp 在调度。schedule()
}
到这里我们就完成了对 GPM 调度循环的全过程源码分析了,你可以回到 [4. 调度过程 schedule()](##4. 调度过程 schedule()) 看一下我总结的那张图,这回你应该会有更加深入的理解了。
5. 协程切换
如果要一个协程要一直到执行完毕才退出的话,那很可能会造成其他协程饥饿的问题。所以 Go 其实会在一些特殊的时机对协程进行切换,这个过程有抢占式调度,也有协作式的调度。
协程切换的时候,最核心的就是要保存当前协程的现场,以方便回到该协程的时候继续执行剩下的内容。大致过程如下:
有哪些时机会触发切换呢,这里我直接给出结论:
基于协作的抢占式调度
- 主动挂起:
runtime.gopark()
- 系统调用结束时:
exitsyscall()
- 函数跳转时:
morestack()
基于信号的抢占式调度
- 信号调度:
doSigPreempt()
5.1 主动挂起 runtime.gopack()
这个函数位于 runtime/proc.go 中:
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceReason traceBlockReason, traceskip int) {if reason != waitReasonSleep {checkTimeouts() }mp := acquirem()gp := mp.curgstatus := readgstatus(gp)if status != _Grunning && status != _Gscanrunning {throw("gopark: bad g status")}mp.waitlock = lockmp.waitunlockf = unlockfgp.waitreason = reasonmp.waitTraceBlockReason = traceReasonmp.waitTraceSkip = traceskipreleasem(mp)mcall(park_m)
}
gopark
函数的主要目的是使 G 进入休眠状态,等待被唤醒。
最后它调用了 mcall()
:
// mcall switches from the g to the g0 stack and invokes fn(g)
// mcall 切换到 g0,并执行 fn。
func mcall(fn func(*g))
所以这里切换回 g0
,并执行了 pack_m
:
func park_m(gp *g) {...schedule()
}
pack_m()
其实就是调用了 schedule()
去进行下一轮调度,这就完成了协程的切换。
当协程被阻塞的时候,就会去调用 runtime.gopark()
主动让出 CPU,切回 g0
,等待被唤醒,以此保证最大化利用 CPU 资源。比如以下几种情况:
- 休眠
- channel 通道阻塞
- 网络 I/O 阻塞
- 因为执行垃圾回收而暂停
5.2 系统调用结束时 exitsyscall()
Go 通过 entersyscall()
进行系统调用,完事后会执行 exitsyscall()
,它也位于 runtime/proc.go 中:
func exitsyscall() {...mcall(exitsyscall0)
}
其实它最终也是调用 mcall()
切换到 g0
, 我们不难猜出,它这里让 g0
去执行 exitsyscall0
函数,做完系统调用的善后后,肯定还是会执行 schedule()
函数进行协程调度。
func exitsyscall0(gp *g) {...schedule()
}
5.3 函数跳转时 morestack()
因为函数跳转意味着“压栈”,函数跳转时都会调用这个方法,它的本意在于检查当前协程栈空间是否有足够内存,如果不够就要扩大该栈空间。
为了让每个协程都有执行的机会,并且最大化利用 CPU 资源,Go 语言在初始化时会启动一个特殊的线程来执行系统监控任务,系统监控在一个独立的 M 上运行,不用绑定逻辑处理器 P。当系统监控到协程运行超过 10ms
,就将 g.stackguard0
置为 stackPreempt
(该值是一个抢占标志)。
const forcePreemptNS = 10 * 1000 * 1000 // 10msfunc retake(now int64) uint32 {// 遍历所有的 Pfor i := 0; i < len(allp); i++ {pp := allp[i]pd := &pp.sysmonticks := pp.statussysretake := falseif s == _Prunning || s == _Psyscall {t := int64(pp.schedtick)if int64(pd.schedtick) != t {pd.schedtick = uint32(t)pd.schedwhen = now} else if pd.schedwhen+forcePreemptNS <= now {// 如果 G 运行时间过长,超过了 forcePreemptNS(10ms),// 则标记抢占preemptone(pp)sysretake = true}}if s == _Psyscall {// 如果是系统调用,且已经超过了一个系统监控的 tick(20us),// 则从系统调用中抢占 p。t := int64(pp.syscalltick)if !sysretake && int64(pd.syscalltick) != t {pd.syscalltick = uint32(t)pd.syscallwhen = nowcontinue}...}}
}// 标记抢占
func preemptone(pp *p) bool {mp := pp.m.ptr()gp := mp.curggp.preempt = truegp.stackguard0 = stackPreemptreturn true
}
巧的就是,Go 设计者,让程序在执行 morestack()
函数时顺便判断一下 g
中的 stackguard
是否已经被置为抢占 stackPreempt
,如果的确被标记抢占,就回到 schedule()
方法,并将当前协程放回队列中。
morestack
是汇编实现:
TEXT runtime·morestack(SB),NOSPLIT|NOFRAME,$0-0...BL runtime·newstack(SB)
它最终会调用 newstack()
:
func newstack() {thisg := getg()gp := thisg.m.curg// 1. 判断 gp.stackguard0 是否被标记为抢占stackguard0 := atomic.Loaduintptr(&gp.stackguard0)preempt := stackguard0 == stackPreempt// 2. 如果被标记位抢占,调用 gopreempt_m()if preempt {// 3. 最终会去调用 schedule() 去调新的协程执行gopreempt_m(gp) // never return}
}func gopreempt_m(gp *g) {if traceEnabled() {traceGoPreempt()}goschedImpl(gp)
}func goschedImpl(gp *g) {...schedule()
}
5.4 信号调度 doSigPreempt()
当程序在执行过程中既无法主动挂起,也不能进行系统调用,且无法进行函数调用时,就可以使用信号来调度。
信号其实就是线程信号,在操作系统中有很多基于信号的底层通信方式(SIGPIPE / SIGURG / SIGHUP),而我们的线程可以注册对应信号的处理函数。
Go 中是注册了 SIGURG
信号的处理函数 doSigPreempt()
,在 GC 工作时,向目标线程发送信号。线程收到信号后,会触发调度。
doSigPreempt
位于 runtime.signal_unix.go 中:
func doSigPreempt(gp *g, ctxt *sigctxt) {// 检查此 g 是否要被抢占并且安全抢占if wantAsyncPreempt(gp) {if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {// 2. 调整程序计数器 PC 并异步调用 asyncPreemptctxt.pushCall(abi.FuncPCABI0(asyncPreempt), newpc)}}
}
其中 asyncPreempt
的实现如下:
func asyncPreempt()//
func asyncPreempt2() {gp := getg()gp.asyncSafePoint = true// if gp.preemptStop {mcall(preemptPark)} else {mcall(gopreempt_m)}gp.asyncSafePoint = false
}
asyncPreempt
是汇编实现,最终是调的 asyncPreempt2
,它会调用 mcall
切回 g0
,并执行 preemptPark
或 gopreempt_m
, gopreempt_m
就是前面 morestack
最后调的!不出意外,preemptPack
最后肯定还是调的 schedule()
。
func preemptPark(gp *g) {...schedule()
}
5.5 runtime.Gosched()
在我们实际编程中,你可以通过显式调用 runtime.Gosched()
来主动让出 CPU,促进 Go 的下一轮调度,我们来看它的具体实现,肯定还是调的 schedule()
,没有意外!
func Gosched() {checkTimeouts()mcall(gosched_m)
}func gosched_m(gp *g) {if traceEnabled() {traceGoSched()}goschedImpl(gp)
}func goschedImpl(gp *g) {status := readgstatus(gp)if status&^_Gscan != _Grunning {dumpgstatus(gp)throw("bad g status")}casgstatus(gp, _Grunning, _Grunnable)dropg()lock(&sched.lock)globrunqput(gp)unlock(&sched.lock)schedule()
}
5.6 总结
Go 语言为了确保 P 不会因为 G 运行时间过长或系统调用阻塞时间过长而导致性能下降。它会尝试进行协程切换,以确保任务可以适时地被分配和执行。这有助于保持Go程序的并发性能和响应性。而协程切换的方式有基于协作的抢占式调度(主动挂起 runtime.gopark()
,系统调用结束时exitsyscall()
,函数跳转时 morestack()
),也有基于信号的抢占式调度 doSigPreempt()
,他们都无一例外的最终调用了 schedule()
。
所以总结下来其实还是这张图:
G、P、M 状态流转
经过我们前面的分析,你可以自行整理 G、P、M 状态的流转,这里我给出几张图供你参考:
总结
以上便是对 Go 语言 GPM 模型的全部分享啦!GPM 模型使得 Go 语言能够并发执行成千上万个协程。
为了减少线程“相对昂贵”的切换代价,Go 引入了 GPM,将大量的 Goroutine 分配到少量的系统线程上去执行,并利用多核并行,实现更强大的并发。
为了减小并发冲突,Go 在全局队列的基础上引入了本地队列。
为了避免协程饥饿,Go 又引入了多种协程调度的策略。
为了避免协程阻塞浪费 CPU,Go 引入了多种协程切换的方式。
Go 语言设计者进行了如此复杂的调度器实现,最终交付给 Gopher 的,仅仅是一个 go
关键字这么简单,真的是大道至简,这也是充分印证了那句话:“Go 为并发而生”。
希望本文能对你有所帮助,enjoy~ happy coding~
参考
- 深入理解 Go 语言
- Go 语言底层原理剖析
- 深入 Go 底层原理
- ChatGPT4
作图工具
- excalidraw
- whimsical