文章目录
- 前言
- 1、线程实现模型
- 1.1、用户级线程与内核级线程
- 1.2、内核级线程模型
- 1.3、用户级线程模型
- 1.3、两级线程模型
- 2、GMP模型
- 2.1、GMP模型概述
- 2.1、GMP v1版本 - GM模型
- 2.2、GMP v2版本 - GMP模型
- 2.3、GMP相关源码
- 2.4 调度流程
- 2.5 设计思想
- 3.总结
前言
并发(并行)一直是在编程领域中一直受关注的一个核心主题。而go语言在诞生之初就带着「 高并发 」的明星光环,进入开发者的视野。而golang的高并发则是由goroutine与GMP调度模型实现的,因此探究goroutine与GMP的实变得尤为重要。
1、线程实现模型
在聊聊goroutine之前,先来聊一下线程的实现模型,因为goroutine的实现也是基于线程的实现模型,而更好的探究goroutine属于哪一种。
简要的来说线程的实现模型总共有一下三种:
- 内核级线程模型
- 用户级线程模型
- 两级线程模型(也称混合型线程模型)
它们之间最大的差异就在于用户线程与内核调度实体(KSE,Kernel Scheduling Entity)之间的对应关系上。而KSE则是内核级线程,是操作系统内核的最小调度单元。 一般所说的协程都是用户级的线程模型,虽然goroutine被称作go语言版本实现的协程,但是它其实是第三种:两级线程模型。
1.1、用户级线程与内核级线程
进程是操作系统中资源分配和拥有的单位,同一个进程内的线程共享进程的资源。线程是处理器调度的基本单位在同一进程中,同一个进程中线程的切换不会引起进程切换。在不同进程中进行线程切换,如从一个进程内的线程切换到另一个进程中的线程时,会引起进程切换。
而根据操作系统内核是否对线程可感知,可以把线程分为内核线程和用户线程:
- 用户级线程: 由应用程序所支持的线程实现, 内核意识不到用户级线程的实现
- 内核级线程: 内核级线程又称为内核支持的线程
1.2、内核级线程模型
在内核级线程模型中,用户线程与内核线程KSE是一对一(1:1)的映射模型。一个用户线程绑定一个实际的内核线程。内核线程建立和销毁都是由操作系统负责、通过系统调用完成的。在内核的支持下运行,无论是用户进程的线程,或者是系统进程的线程,他们的创建、撤销、切换都是依靠内核实现的。所以也因此内核级线程模型的特点是:当某个线程希望创建一个新线程或撤销一个已有线程时,它会进行一个系统调用。
-
Java SE最常用的JVM是Oracle/Sun研发的HotSpot VM,而在这个JVM上基本默认的就是1:1的内核线程模型,除了Solaris上可以支持M:N,以及C++11的 std::thread也是,都是对操作系统的线程的一层封装,创建出来的线程再与每个KSE进行静态绑定。
-
优点: 借助操作系统内核的线程及调度器,实现相对用户线程模型简单,cpu可以快速切换调度线程。在多处理器的情况下多个线程可以同时运行,相较于用户线程模型真正实现了并行处理。
-
缺点: 由于直接借助了操作系统内核来创建、销毁和以及多个线程之间的上下文切换和调度,因此资源成本大幅上涨,对性能影响大。(用户态切换成内核态)。
1.3、用户级线程模型
在用户级线程模型中,用户线程与内核线程KSE是一对一(N:1)的映射模型。多个用户线程的一般从属于单个进程并且多线程的调度是由用户自己的线程库来完成,线程的创建、销毁以及多线程之间的协调等操作都是由用户自己的线程库来负责而无须借助系统调用来实现。一个进程中所有创建的线程都只和同一个 KSE 在运行时动态绑定,也就是说,操作系统只知道用户进程而对其中的线程是无感知的,内核的所有调度都是基于用户进程。
- 很多语言的协程库都是以此模型进行实现的,如Java的Quasar,python的gevent,js的async/await和Promise。
- 优点:由于线程调度是在用户层面完成的,所以对于不需要进行内核态与用户态进行切换。大量减少了上下文的开销,这种实现方式也比内核级线程更加的轻量,对系统资源的消耗会小很多。
- 缺点:不能做到真正意义的并发,一个线程的 I/O 操作会导致整个进程被挂起的主要原因是因为用户线程的调度和管理是由用户空间的库或运行时环境来完成,而不涉及操作系统内核。因此,操作系统并不了解用户线程的存在,也无法对用户线程的 I/O 操作进行直接管理和调度。当一个用户线程发起了一个阻塞式的 I/O 操作(如读取文件、网络通信等),由于用户线程模型的特性,这个线程会被阻塞,无法继续执行。由于操作系统无法感知到这种阻塞,因此整个进程也会被挂起,因为操作系统认为进程中的所有线程都无法继续执行。
1.3、两级线程模型
在两级线程模型中,用户线程与内核线程KSE是多对多(N:M)的映射模型。 吸收了用户级线程与内核级线程的优点,一个进程内的多个线程可以分别绑定一个自己的KSE,这点与内核级线程模型相似,但是用户线程并不与KSE唯一绑定,可以多个用户线程映射到同一个KSE。当某个KSE因为其绑定的线程的阻塞操作被内核调度出CPU时,其关联的进程中其余用户线程可以重新与其他KSE绑定运行。 两级代表的是用户调度器实现用户线程到KSE的调度,而内核调度器实现KSE到cpu上的调度。
因此两级线程模型既不是用户级线程模型完全靠自己调度,也不是内核级线程模型完全靠操作系统调度,而是自身调度与系统调度协同进行。但因为这种模型高度复杂,所以更多作为第三方库进行形式。Go语言中的runtime调度器就是采用的这种实现方案。实现了Goroutine与KSE的动态关联。
2、GMP模型
2.1、GMP模型概述
OS 线程都有一个固定大小的内存块(一般会是 2MB)来做栈,这个栈会用来存储当前正在被调用或挂起(指在调用其它函数时)的函数的内部变量。因为操作系统线程是由操作系统内核管理和调度的,因此需要更多的内存来存储线程的状态信息、调度信息等。而 Go 的 goroutine 是由 Go 运行时环境(runtime)自行管理和调度的,因此2MB的初始内存对goroutine又显得太大,但是对于复杂的递归计算的任务又会显得太小。因此go语言则设计了自己的线程:gorountine。
在 Go 语言中,每一个 goroutine 是一个独立的执行单元,相较于每个 OS 线程固定分配 2M 内存的模式,goroutine 的栈采取了动态扩容方式, 初始时仅为 2KB,随着任务执行按需增长,最大可达 1GB(64 位机器最大是 1G,32 位机器最大是 256M),且完全由 golang 自己的调度器 Go Scheduler 来调度。此外,GC 还会周期性地将不再使用的内存回收,收缩栈空间。 因此,Go 程序可以同时并发成千上万个 goroutine 是得益于它强劲的调度器和高效的内存模型。
在GMP中:
- G: 表示 Goroutine,每个 Goroutine 对应一个 G 结构体,G 存储 Goroutine 的运行堆栈、状态以及任务函数,可重用。G 并非执行体,每个 G 需要绑定到 P 才能被调度执行。
- P: Processor,表示逻辑处理器, 对 G 来说,P 相当于 CPU 核,G 只有绑定到 P(在 P 的 local runq 中)才能被调度。对 M 来说,P 提供了相关的执行环境(Context),如内存分配状态(mcache),任务队列(G)等,P 的数量决定了系统内最大可并行的 G 的数量(前提:物理 CPU 核数 >= P 的数量),P 的数量由用户设置的 GOMAXPROCS 决定,但是不论 GOMAXPROCS 设置为多大,P 的数量最大为 256,而默认的为CPU的核数。
- M: Machine,OS 线程抽象,代表着真正执行计算的资源,在绑定有效的 P 后,进入 schedule 循环;而 schedule 循环的机制大致是从 Global 队列、P 的 Local 队列以及 wait 队列中获取 G,切换到 G 的执行栈上并执行 G 的函数,调用 goexit 做清理工作并回到 M,如此反复。M 并不保留 G 状态,这是 G 可以跨 M 调度的基础,M 的数量是不定的,由 Go Runtime 调整,为了防止创建过多 OS 线程导致系统调度不过来,目前默认最大限制为 10000 个。
用户线程最终肯定都是要交由 OS 线程来执行的,goroutine(称为 G)也不例外,但是 G 并不直接绑定 OS 线程运行,而是由 Goroutine Scheduler 中的 P - Logical Processor (逻辑处理器)来作为两者的『中介』,P 可以看作是一个抽象的资源或者一个上下文,一个 P 绑定一个 OS 线程,在 golang 的实现里把 OS 线程抽象成一个数据结构:M,G 实际上是由 M 通过 P 来进行调度运行的,但是在 G 的层面来看,P 提供了 G 运行所需的一切资源和环境,因此在 G 看来 P 就是运行它的 “CPU”,由 G、P、M 这三种由 Go 抽象出来的实现。也因此从这可以看出来GMP模型就是两级线程模型,而p就是内核调度器+用户调度器结合的中间态。
2.1、GMP v1版本 - GM模型
在早期的版本中(1.1前),并不是GMP模型,而是GM模型,GM模型可以看做用户线程模型。
M 想要执行、放回 G 都必须访问全局 G 队列,并且 M 有多个,即多线程访问同一资源需要加锁进行保证互斥 / 同步,所以全局 G 队列是有互斥锁进行保护的。因此GM模型拥有以下几个缺点:
- 单一全局互斥锁(Sched.Lock)和集中状态存储的存在导致所有 goroutine 相关操作,比如:创建、重新调度等都要上锁;
- goroutine 传递问题:M 经常在 M 之间传递『可运行』的 goroutine,这导致调度延迟增大以及额外的性能损耗;
- 每个 M 做内存缓存,导致内存占用过高,数据局部性较差;
- 由于 syscall 调用而形成的剧烈的 worker thread 阻塞和解除阻塞,导致额外的性能损耗。
2.2、GMP v2版本 - GMP模型
因此golang后面基于work-stealing算法,在GM模型的基础上加入了P,分别去协调G与M。
work-stealing调度算法:
- 每个 P 维护一个 G 的本地队列;
- 当一个 G 被创建出来,或者变为可执行状态时,就把他放到 P 的可执行队列中;
- 当一个 G 在 M 里执行结束后,P 会从队列中把该 G 取出;如果此时 P 的队列为空,即没有其他 G 可以执行, P就随机选择另外一个 P,从其可执行的 G 队列中取走一半。
整体的调度模型如下图:
当通过 go 关键字创建一个新的 goroutine 的时候,它会优先被放入 P 的本地队列。为了运行 goroutine,M 需要持有(绑定)一个 P,接着 M 会启动一个 OS 线程,循环从 P 的本地队列里取出一个 goroutine 并执行。当 M 执行完了当前 P 的 Local 队列里的所有 G 后,P的本地队列会从全局队列中取一个出来。而全局队列也空了的时候则会像上文所说的work-stealing算法,P会从别的P的本地队列steal gorountine来进行任务。
P.runnext是什么?
每个P上都有一个runnext字段,类型是guintptr,语义为下一个优先执行的goroutine但是只能存储一个,如果有多个,被抢占掉的goroutine会被放到可执行队列的队尾,继续等待正常调度。
//$GOROOT/src/runtime/proc.go`// runqput tries to put g on the local runnable queue.
// If next is false, runqput adds g to the tail of the runnable queue.
// If next is true, runqput puts g in the _p_.runnext slot.
// If the run queue is full, runnext puts g on the global queue.
// Executed only by the owner P.
func runqput(_p_ *p, gp *g, next bool) {if next {retryNext:oldnext := _p_.runnext// 将 _p_.runnext 的旧值和当前 goroutine 进行交换if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {goto retryNext}// 如果 _p_.runnext 的旧值为空,则直接返回if oldnext == 0 {return}// Kick the old runnext out to the regular run queue.// 如果 _p_.runnext 的旧值不为空,获取其对应的 goroutine 指针,gp 为_p_.runnext 的旧值gp = oldnext.ptr()}
// 如果 next 为 false ,则 gp 仍为新创建的 goroutine
// 如果 next 为 true ,则 gp 为_p_.runnext 的旧值
// 将 gp 放置到 P 本地队列的队尾,如果 P 的本地队列已满,则放置到全局队列上,等待调度器进行调度
retry:h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumerst := _p_.runqtailif t-h < uint32(len(_p_.runq)) {_p_.runq[t%uint32(len(_p_.runq))].set(gp)atomic.StoreRel(&_p_.runqtail, t+1) // store-release, makes the item available for consumptionreturn}if runqputslow(_p_, gp, h, t) {return}// the queue is not full, now the put above must succeedgoto retry
}
2.3、GMP相关源码
源码版本为: go sdk-1.21.5
这部分的源码主要涉及三个文件:
- runtime/amd_64.s 涉及到进程启动以及对CPU执行指令进行控制的汇编代码,进程的初始化部分也在这里面
- runtime/runtime2.go 这里主要是运行时中一些重要数据结构的定义,比如g、m、p以及涉及到接口、defer、panic、map、slice等核心类型
- runtime/proc.go 一些核心方法的实现,涉及gmp调度等核心代码在这里
G的结构体:
type g struct {// Stack parameters.// stack describes the actual stack memory: [stack.lo, stack.hi).// stackguard0 is the stack pointer compared in the Go stack growth prologue.// It is stack.lo+StackGuard normally, but can be StackPreempt to trigger a preemption.// stackguard1 is the stack pointer compared in the //go:systemstack stack growth prologue.// It is stack.lo+StackGuard on g0 and gsignal stacks.// It is ~0 on other goroutine stacks, to trigger a call to morestackc (and crash).// (记录协程栈的栈顶和栈底位置)stack stack // offset known to runtime/cgo// (主要作用是参与一些比较计算,当发现容量要超过栈分配空间后,可以进行扩容或者收缩)stackguard0 uintptr // offset known to liblinkstackguard1 uintptr // offset known to liblink_panic *_panic // 最内层的panic_defer *_defer // 最内层的defer函数m *m // current m; offset known to arm liblink,当前g绑定的m(逻辑处理器)sched gobufsyscallsp uintptr // if status==Gsyscall, syscallsp = sched.sp to use during gcsyscallpc uintptr // if status==Gsyscall, syscallpc = sched.pc to use during gcstktopsp uintptr // expected sp at top of stack, to check in traceback// 用于做参数传递,睡眠时其他goroutine可以设置param,唤醒时该g可以读取这些param// in four ways:// 1. When a channel operation wakes up a blocked goroutine, it sets param to// point to the sudog of the completed blocking operation.// 2. By gcAssistAlloc1 to signal back to its caller that the goroutine completed// the GC cycle. It is unsafe to do so in any other way, because the goroutine's// stack may have moved in the meantime.// 3. By debugCallWrap to pass parameters to a new goroutine because allocating a// closure in the runtime is forbidden.// 4. When a panic is recovered and control returns to the respective frame,// param may point to a savedOpenDeferState.param unsafe.Pointer// 记录当前gorountine的状态atomicstatus atomic.Uint32stackLock uint32 // sigprof/scang lock; TODO: fold in to atomicstatus// gorountine 唯一的标识goid uint64schedlink guintptrwaitsince int64 // approx time when the g become blockedwaitreason waitReason // if status==Gwaiting// 标记是否可以呗抢占preempt bool // preemption signal, duplicates stackguard0 = stackpreemptpreemptStop bool // transition to _Gpreempted on preemption; otherwise, just deschedulepreemptShrink bool // shrink stack at synchronous safe point// asyncSafePoint is set if g is stopped at an asynchronous// safe point. This means there are frames on the stack// without precise pointer information.asyncSafePoint boolpaniconfault bool // panic (instead of crash) on unexpected fault addressgcscandone bool // g has scanned stack; protected by _Gscan bit in statusthrowsplit bool // must not split stack// activeStackChans indicates that there are unlocked channels// pointing into this goroutine's stack. If true, stack// copying needs to acquire channel locks to protect these// areas of the stack.activeStackChans bool// parkingOnChan indicates that the goroutine is about to// park on a chansend or chanrecv. Used to signal an unsafe point// for stack shrinking.parkingOnChan atomic.Bool// inMarkAssist indicates whether the goroutine is in mark assist.// Used by the execution tracer.inMarkAssist boolcoroexit bool // argument to coroswitch_mraceignore int8 // ignore race detection eventsnocgocallback bool // whether disable callback from Ctracking bool // whether we're tracking this G for sched latency statisticstrackingSeq uint8 // used to decide whether to track this GtrackingStamp int64 // timestamp of when the G last started being trackedrunnableTime int64 // the amount of time spent runnable, cleared when running, only used when trackinglockedm muintptrsig uint32writebuf []bytesigcode0 uintptrsigcode1 uintptrsigpc uintptrparentGoid uint64 // goid of goroutine that created this goroutinegopc uintptr // pc of go statement that created this goroutineancestors *[]ancestorInfo // ancestor information goroutine(s) that created this goroutine (only used if debug.tracebackancestors)startpc uintptr // pc of goroutine functionracectx uintptrwaiting *sudog // sudog structures this g is waiting on (that have a valid elem ptr); in lock ordercgoCtxt []uintptr // cgo traceback contextlabels unsafe.Pointer // profiler labelstimer *timer // cached timer for time.SleepsleepWhen int64 // when to sleep untilselectDone atomic.Uint32 // are we participating in a select and did someone win the race?coroarg *coro // argument during coroutine transfers// goroutineProfiled indicates the status of this goroutine's stack for the// current in-progress goroutine profilegoroutineProfiled goroutineProfileStateHolder// Per-G tracer state.trace gTraceState// Per-G GC state// gcAssistBytes is this G's GC assist credit in terms of// bytes allocated. If this is positive, then the G has credit// to allocate gcAssistBytes bytes without assisting. If this// is negative, then the G must correct this by performing// scan work. We track this in bytes to make it fast to update// and check for debt in the malloc hot path. The assist ratio// determines how this corresponds to scan work debt.gcAssistBytes int64
}
其中比较关键的字段有:
- stack: 描述了当前 Goroutine 的栈内存范围 [stack.lo, stack.hi);
- stackguard0: 可以用于调度器抢占式调度;preempt,preemptStop,preemptShrink跟抢占相关;
- _defer 和 _panic: 分别记录这个 G 最内侧的panic和 _defer结构体;
- m: 记录当前 G 占用的线程 M,可能为空;
- atomicstatus: 表示G 的状态;
- sched: 存储 G 的调度相关的数据;
- goid: 表示 G 的 ID,对开发者不可见;
Go语言刻意没有提供goid的原因是为了避免被滥用。因为大部分用户在轻松拿到goid之后,在之后的编程中会不自觉地编写出强依赖goid的代码。强依赖goid将导致这些代码不好移植,同时也会导致并发模型复杂化。同时,Go语言中可能同时存在海量的Goroutine,但是每个Goroutine何时被销毁并不好实时监控,这也会导致依赖goid的资源无法很好地自动回收(需要手工回收)。——《Go语言高级编程》
以及一个相关的字段runtime.gobuf结构体:
type gobuf struct {sp uintptr // 栈指针pc uintptr // 程序计数器,记录G要执行的下一条指令位置g guintptr // 持有 runtime.gobuf 的 Gret uintptr // 系统调用的返回值......
}
而GMP说的G则是来自于这里,但是这里的G的分类却不仅仅是在GMP中,除GMP协程以外还包括了:
- sysmon协程: sysmon协程也是runtime的一部分,sysmon协程直接运行在M不需要P,主要做一些检查工作如:检查死锁、检查计时器获取下一个要被触发的计时任务、检查是否有ready的网络调用以恢复用户G的工作、检查一个G是否运行时间太长进行抢占式调度。
- g0协程:G0 主要负责调度和执行用户级线程,包括创建、销毁和切换用户级线程,以及管理用户级线程的执行。
- 每次启动一个m,都会第一个创建的goroutine,就是g0(g0不是整个进程唯一的,而是一个线程中唯一的;
- g0比较特殊, 每一个m都会有且只有一个自己的g0, g0不指向任何可执行的函数;g0是一个全局的变量,定义在 runtime.g0中,g0分配在系统栈中,通过汇编赋值完成,拥有着比较大的栈空间,而其他的g,则分配在用户栈中,初始化只有2K的栈空间,通过newg函数分配;
- g0仅用于负责调度其他的g(m可能会有很多的g,然后g0用来保持调度栈的信息,当一个m执行从g1切换到g2,首先应该切换到g0,通过g0把g1干掉,把g2加进来 , 即 g0 就是其他g的指挥官,起着桥梁的作用。
创建G的过程:
- 当我们使用go关键字新建一个goroutine时,编译器会编译为runtime中对应的函数调用(newproc,而go 关键字后面的函数成为协程的任务函数),进行创建:
// Create a new g running fn.
// Put it on the queue of g's waiting to run.
// The compiler turns a go statement into a call to this.
func newproc(fn *funcval) {// 获取当前的 goroutine(G)gp := getg()// 获取调用者的程序计数器(program counter)pc := getcallerpc()// 调用 systemstack 函数,该函数用于在系统栈上执行指定的函数。在这里,它用于执行后续的 goroutine 创建和调度操作systemstack(func() {// systemstack 中,调用 newproc1 函数创建一个新的 goroutine,并将其放入等待运行的队列中。newproc1 函数用于执行实际的 goroutine 创建操作newg := newproc1(fn, gp, pc, false, waitReasonZero)// 获取当前的 P(处理器),并调用 runqput 将新创建的 goroutine 放入等待运行的队列中pp := getg().m.p.ptr()runqput(pp, newg, true)// 如果 mainStarted 为真(表示主函数已经开始执行),则调用 wakep 函数唤醒等待的 Pif mainStarted {wakep()}})
}
而其中具体的newproc1的代码流程为:
// Create a new g in state _Grunnable (or _Gwaiting if parked is true), starting at fn.
// callerpc is the address of the go statement that created this. The caller is responsible
// for adding the new g to the scheduler. If parked is true, waitreason must be non-zero.
func newproc1(fn *funcval, callergp *g, callerpc uintptr, parked bool, waitreason waitReason) *g {if fn == nil {fatal("go of nil func value")}// 获取当前的 M(操作系统线程),并将其保存在局部变量 mp 中。这一步禁用抢占,因为在本地变量中持有了 M 和 Pmp := acquirem() // disable preemption because we hold M and P in local vars.// 获取当前的 P(处理器),并尝试从全局的空闲 G 列表中获取一个可用的 G 对象。如果获取不到,//则通过 malg 函数创建一个新的 G 对象,并将其状态设置为 _Gidlepp := mp.p.ptr()newg := gfget(pp)if newg == nil {newg = malg(stackMin)// 初始化完成后设置为 _Gdeadcasgstatus(newg, _Gidle, _Gdead)allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.}// 确保新创建的 G 对象的栈不为空,如果为空则抛出异常if newg.stack.hi == 0 {throw("newproc1: newg missing stack")}if readgstatus(newg) != _Gdead {throw("newproc1: new g is not Gdead")}// 为新创建的 G 对象分配栈空间,并设置栈指针、调用栈帧、函数指针等属性totalSize := uintptr(4*goarch.PtrSize + sys.MinFrameSize) // extra space in case of reads slightly beyond frametotalSize = alignUp(totalSize, sys.StackAlign)sp := newg.stack.hi - totalSizeif usesLR {// caller's LR*(*uintptr)(unsafe.Pointer(sp)) = 0prepGoExitFrame(sp)}if GOARCH == "arm64" {// caller's FP*(*uintptr)(unsafe.Pointer(sp - goarch.PtrSize)) = 0}memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))newg.sched.sp = spnewg.stktopsp = spnewg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same functionnewg.sched.g = guintptr(unsafe.Pointer(newg))gostartcallfn(&newg.sched, fn)// 设置新创建的 G 对象的父 GID、调用者 PC、起始 PC 等属性,并根据是否为系统 goroutine 进行相应的处理newg.parentGoid = callergp.goidnewg.gopc = callerpcnewg.ancestors = saveAncestors(callergp)newg.startpc = fn.fnif isSystemGoroutine(newg, false) {sched.ngsys.Add(1)} else {// Only user goroutines inherit pprof labels.if mp.curg != nil {newg.labels = mp.curg.labels}if goroutineProfile.active {// A concurrent goroutine profile is running. It should include// exactly the set of goroutines that were alive when the goroutine// profiler first stopped the world. That does not include newg, so// mark it as not needing a profile before transitioning it from// _Gdead.newg.goroutineProfiled.Store(goroutineProfileSatisfied)}}// Track initial transition?newg.trackingSeq = uint8(cheaprand())if newg.trackingSeq%gTrackingPeriod == 0 {newg.tracking = true}gcController.addScannableStack(pp, int64(newg.stack.hi-newg.stack.lo))// Get a goid and switch to runnable. Make all this atomic to the tracer.trace := traceAcquire()var status uint32 = _Grunnableif parked {status = _Gwaitingnewg.waitreason = waitreason}casgstatus(newg, _Gdead, status)if pp.goidcache == pp.goidcacheend {// Sched.goidgen is the last allocated id,// this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch].// At startup sched.goidgen=0, so main goroutine receives goid=1.pp.goidcache = sched.goidgen.Add(_GoidCacheBatch)pp.goidcache -= _GoidCacheBatch - 1pp.goidcacheend = pp.goidcache + _GoidCacheBatch}// 分配gidnewg.goid = pp.goidcachepp.goidcache++newg.trace.reset()if trace.ok() {trace.GoCreate(newg, newg.startpc, parked)traceRelease(trace)}// 如果启用了竞争检测,设置竞争检测上下文,并进行相关的处理.if raceenabled {newg.racectx = racegostart(callerpc)newg.raceignore = 0if newg.labels != nil {// See note in proflabel.go on labelSync's role in synchronizing// with the reads in the signal handler.racereleasemergeg(newg, unsafe.Pointer(&labelSync))}}// 释放之前获取的 Mreleasem(mp)return newg
}
具体的流程注释补充到代码中,总结一下:newproc1 函数用于创建一个新的 goroutine,并初始化其相关属性,以便将其添加到调度器中等待执行。这个函数涉及了对 goroutine 的栈空间、调度状态、跟踪信息和竞争检测等方面的处理,确保了新创建的 goroutine 能够被正确地加入到调度器中,并在适当的时机被调度执行。
补充一下代码中出现的几个go的流程状态
- _Gidle:代表协程刚开始创建时的状态,当新创建的协程初始化后,为变为_Gdead状态
- _Gwaiting:代表协程正在运行队列中,等待被运行
- _Grunnable: 表示当前协程在运行时被锁定,陷入阻塞,不能执行用户代码
- _Gsyscall: 当前 G 正在被系统调用;
- _ Grunning:当前 G 正在被运行;
- _Gdead: 初始化后协程的状态/协程被销毁后的状态。
而第一个状态流转到_Gdead的原因是为了以确保新创建的 G 对象处于一个初始的空闲状态,同时避免了在初始化过程中因为状态不一致而导致的问题。这样的状态转换也可以避免在初始化过程中触发 GC 扫描等操作,因为处于 _Gdead 状态的 G 对象不会被 GC 扫描器处理。
- G从创建到结束的生命周期中经历的各种状态变化过程:
M的结构体:
type m struct {// 每个m绑定的一个对应的g0线程,用来调度g0 *g // goroutine with scheduling stackmorebuf gobuf // gobuf arg to morestackdivmod uint32 // div/mod denominator for arm - known to liblink_ uint32 // align next field to 8 bytes// Fields not known to debuggers.procid uint64 // for debuggers, but offset not hard-codedgsignal *g // signal-handling ggoSigStack gsignalStack // Go-allocated signal handling stacksigmask sigset // storage for saved signal mask// 线程的本地存储,可以从任意时刻访问当前线程上的协程g、结构体m、逻辑处理器p等tls [tlsSlots]uintptr // thread-local storage (for x86 extern register)mstartfn func()// 正在m上运行的gcurg *g // current running goroutinecaughtsig guintptr // goroutine running during fatal signal// 当前m上绑定的pp puintptr // attached p for executing go code (nil if not executing go code)nextp puintptroldp puintptr // the p that was attached before executing a syscallid int64mallocing int32throwing throwType// 用于让当前m上的协程继续运行,禁止被抢占,保证curg的运行preemptoff string // if != "", keep curg running on this mlocks int32dying int32profilehz int32// spinning为true代表当前m正在自旋寻找work// 会先检查本地队列,然后全局队列,然后检查network poller尝试执行一些网络 I/O 相关的任务,m则会可能发生阻塞,并尝试gc// 或者偷一点任务,如果都没有则会休眠spinning bool // m is out of work and is actively looking for workblocked bool // m is blocked on a notenewSigstack bool // minit on C thread called sigaltstackprintlock int8incgo bool // m is executing a cgo callisextra bool // m is an extra misExtraInC bool // m is an extra m that is not executing Go codeisExtraInSig bool // m is an extra m in a signal handlerfreeWait atomic.Uint32 // Whether it is safe to free g0 and delete m (one of freeMRef, freeMStack, freeMWait)needextram booltraceback uint8ncgocall uint64 // number of cgo calls in totalncgo int32 // number of cgo calls currently in progresscgoCallersUse atomic.Uint32 // if non-zero, cgoCallers in use temporarilycgoCallers *cgoCallers // cgo traceback if crashing in cgo call// 没有gorountine需要运行,休眠在park上park note// 记录所有工作线程的一个链表alllink *m // on allmschedlink muintptrlockedg guintptrcreatestack [32]uintptr // stack that created this thread, it's used for StackRecord.Stack0, so it must align with it.lockedExt uint32 // tracking for external LockOSThreadlockedInt uint32 // tracking for internal lockOSThreadnextwaitm muintptr // next m waiting for lockmLockProfile mLockProfile // fields relating to runtime.lock contention// wait* are used to carry arguments from gopark into park_m, because// there's no stack to put them on. That is their sole purpose.waitunlockf func(*g, unsafe.Pointer) boolwaitlock unsafe.PointerwaitTraceBlockReason traceBlockReasonwaitTraceSkip intsyscalltick uint32freelink *m // on sched.freemtrace mTraceState// these are here because they are too large to be on the stack// of low-level NOSPLIT functions.libcall libcalllibcallpc uintptr // for cpu profilerlibcallsp uintptrlibcallg guintptrwinsyscall winlibcall // stores syscall parameters on windowsvdsoSP uintptr // SP for traceback while in VDSO call (0 if not in call)vdsoPC uintptr // PC for traceback while in VDSO call// preemptGen counts the number of completed preemption// signals. This is used to detect when a preemption is// requested, but fails.preemptGen atomic.Uint32// Whether this is a pending preemption signal on this M.signalPending atomic.Uint32// pcvalue lookup cachepcvalueCache pcvalueCachedlogPerMmOSchacha8 chacha8rand.Statecheaprand uint64// Up to 10 locks held by this m, maintained by the lock ranking code.locksHeldLen intlocksHeld [10]heldLockInfo
}
主要的有:
- g0: Go 运行时系统在启动之初创建的,用来调度其他 G 到 M 上;
- mstartfn: 表示M的起始函数,go 语句携带的那个函数;
- curg: m 启动时调用的函数,通常是 runtime.startTheWorld 或类似的启动函数,负责初始化新线程(M)并开始执行 Go 代码。
- p: 表示当前 m 绑定的处理器(P)。如果 m 正在执行 Go 代码,这个字段将指向一个有效的 p;否则,如果 m 正在执行系统调用或非 Go 代码,这个字段可能为 nil。
- nextp: 这个字段用于在系统调用期间临时存储 p。当 m 由于系统调用而阻塞时,它的 p 可能会被传递给另一个 m,此时 nextp 会存储这个 p。
- spinning: 表示当前 M 是否正在寻找 G,在寻找过程中 M 处于自旋状态;
- lockedg: 这个字段指向一个与当前 m 锁定的 goroutine。当一个 goroutine 通过运行时系统调用(如锁的获取)与 m 锁定时,lockedg 会指向这个 goroutine。锁定期间,其他 goroutine 无法在该 m 上执行。
跟G一样,m也有另外两类:
- m0: Go程序是一个进程,进程都有一个主线程,m0就是Go程序的主线程,通过一个与其绑定的G0来执行runtime启动加载代码;一个Go程序只有一个m0。
- 运行sysmon的M: 主要用来运行sysmon协程。定义在全局变量runtime.m0中,不需要在heap(堆)上分配;
负责执行初始化操作和启动第一个g( g0);
例如在proc.go文件中就是:
var (m0 mg0 gmcache0 *mcacheraceprocctx0 uintptrraceFiniLock mutex
)
在这里中m0、g0是全局的。但是虽然这里看来g0是全局一个,但是这里的g0并不是抽象的g0,这里的g0仅代表和m0绑定的g0。 我们从创建m的代码看起:
// Create a new m. It will start off with a call to fn, or else the scheduler.
// fn needs to be static and not a heap allocated closure.
// May run with m.p==nil, so write barriers are not allowed.
//
// id is optional pre-allocated m ID. Omit by passing -1.
//
//go:nowritebarrierrec
func newm(fn func(), pp *p, id int64) {// allocm adds a new M to allm, but they do not start until created by// the OS in newm1 or the template thread.//// doAllThreadsSyscall requires that every M in allm will eventually// start and be signal-able, even with a STW.//// Disable preemption here until we start the thread to ensure that// newm is not preempted between allocm and starting the new thread,// ensuring that anything added to allm is guaranteed to eventually// start.acquirem()// 这里实现m的创建mp := allocm(pp, fn, id)mp.nextp.set(pp)mp.sigmask = initSigmaskif gp := getg(); gp != nil && gp.m != nil && (gp.m.lockedExt != 0 || gp.m.incgo) && GOOS != "plan9" {// We're on a locked M or a thread that may have been// started by C. The kernel state of this thread may// be strange (the user may have locked it for that// purpose). We don't want to clone that into another// thread. Instead, ask a known-good thread to create// the thread for us.//// This is disabled on Plan 9. See golang.org/issue/22227.//// TODO: This may be unnecessary on Windows, which// doesn't model thread creation off fork.lock(&newmHandoff.lock)if newmHandoff.haveTemplateThread == 0 {throw("on a locked thread with no template thread")}mp.schedlink = newmHandoff.newmnewmHandoff.newm.set(mp)if newmHandoff.waiting {newmHandoff.waiting = falsenotewakeup(&newmHandoff.wake)}unlock(&newmHandoff.lock)// The M has not started yet, but the template thread does not// participate in STW, so it will always process queued Ms and// it is safe to releasem.releasem(getg().m)return}// 这里分配真正的操作系统newm1(mp)releasem(getg().m)
}
然后看真正的创建m的func:
func allocm(pp *p, fn func(), id int64) *m {// 通过allocmLock.rlock() 获取 allocmLock 的读锁,以确保在创建 M 的过程中不会被其他线程中断。allocmLock.rlock()// The caller owns pp, but we may borrow (i.e., acquirep) it. We must// disable preemption to ensure it is not stolen, which would make the// caller lose ownership.// 通过调用 acquirem() 禁用抢占,以确保当前的 P(处理器)不会被窃取。这是因为在接下来的操作中,可能会临时借用 P 进行内存分配。acquirem()gp := getg()// 如果当前的 G 所属的 M 没有关联的 P,则通过调用 acquirep(pp) 临时借用 P 进行内存分配。if gp.m.p == 0 {acquirep(pp) // temporarily borrow p for mallocs in this function}// Release the free M list. We need to do this somewhere and// this may free up a stack we can use.// 释放空闲 M 列表。这一步可能会释放可用的栈资源。if sched.freem != nil {lock(&sched.lock)var newList *mfor freem := sched.freem; freem != nil; {// Wait for freeWait to indicate that freem's stack is unused.wait := freem.freeWait.Load()if wait == freeMWait {next := freem.freelinkfreem.freelink = newListnewList = freemfreem = nextcontinue}// Drop any remaining trace resources.// Ms can continue to emit events all the way until wait != freeMWait,// so it's only safe to call traceThreadDestroy at this point.if traceEnabled() || traceShuttingDown() {traceThreadDestroy(freem)}// Free the stack if needed. For freeMRef, there is// nothing to do except drop freem from the sched.freem// list.if wait == freeMStack {// stackfree must be on the system stack, but allocm is// reachable off the system stack transitively from// startm.systemstack(func() {stackfree(freem.g0.stack)})}freem = freem.freelink}sched.freem = newListunlock(&sched.lock)}mp := new(m)mp.mstartfn = fnmcommoninit(mp, id)// In case of cgo or Solaris or illumos or Darwin, pthread_create will make us a stack.// Windows and Plan 9 will layout sched stack on OS stack.// 根据操作系统的不同,为新的 M 创建一个初始的 G0(用于执行该 M 上的 goroutines)。如果是 cgo、Solaris、illumos 或 Darwin 系统,会通过 malg(-1) 创建一个栈;否则会通过 malg(16384 * sys.StackGuardMultiplier) 创建一个指定大小的栈。if iscgo || mStackIsSystemAllocated() {mp.g0 = malg(-1)} else {mp.g0 = malg(16384 * sys.StackGuardMultiplier)}mp.g0.m = mp// 如果之前临时借用了 P,则通过 releasep() 归还 Pif pp == gp.m.p.ptr() {releasep()}// 释放当前 G 所属的 M,并释放 allocmLock 的读锁releasem(gp.m)allocmLock.runlock()return mp
}
可以看出在如下代码:
if iscgo || mStackIsSystemAllocated() {mp.g0 = malg(-1)} else {mp.g0 = malg(16384 * sys.StackGuardMultiplier)}
会根据创建操作系统不同创建不同的g0。所以总结m0只有一个,但是g0有很多个,直接与m0绑定,它的内存是操作系统提供的虚拟内存空间中分配的 。当然这从结构体上也可以看出来,m结构体设计中有一个单独的g0。这里也细看了下mac os在源码中分配g0的stack内存大小为:16384byte,也就是16kb。
从m的结构体的spinning也可以看出来m主要就两个状态自旋中,与非自旋中:
- 自旋状态: 表示 M 绑定了 P 又没有获取 G;
- 非自旋状态: 表示正在执行 Go 代码中,或正在进入系统调用,或空闲;
p结构体:
type p struct {// 全局变量allp中的索引位置id int32// p的状态标识status uint32 // one of pidle/prunning/...link puintptr// 用于sysmon协程记录记录被监控的p的系统调用时间和运行时间schedtick uint32 // incremented on every scheduler callsyscalltick uint32 // incremented on every system callsysmontick sysmontick // last tick observed by sysmon// 指向绑定的m,idle状态下为nilm muintptr // back-link to associated m (nil if idle)// 用于分配微小对象和小对象的一个块的缓存空间,含有各种不同等级的spanmcache *mcache// 一个chunk大小(512kb)的内存空间,用来对堆上内存分配的缓存优化,达到无锁缓存pcache pageCacheraceprocctx uintptrdeferpool []*_defer // pool of available defer structs (see panic.go)deferpoolbuf [32]*_defer// Cache of goroutine ids, amortizes accesses to runtime·sched.goidgen.// 可以分配给g的id的缓存goidcache uint64goidcacheend uint64// 本地可运行的G队列的头部和尾部// Queue of runnable goroutines. Accessed without lock.runqhead uint32runqtail uint32runq [256]guintptr// runnext, if non-nil, is a runnable G that was ready'd by// the current G and should be run next instead of what's in// runq if there's time remaining in the running G's time// slice. It will inherit the time left in the current time// slice. If a set of goroutines is locked in a// communicate-and-wait pattern, this schedules that set as a// unit and eliminates the (potentially large) scheduling// latency that otherwise arises from adding the ready'd// goroutines to the end of the run queue.//// Note that while other P's may atomically CAS this to zero,// only the owner P can CAS it to a valid G.// 下一个待运行的g,这个g的优先级最高// 如果当前g运行完后还有剩余可用时间,那么就会运行runnext的grunnext guintptr// Available G's (status == Gdead)// p上的空闲队列列表gFree struct {gListn int32}sudogcache []*sudogsudogbuf [128]*sudog// Cache of mspan objects from the heap.mspancache struct {// We need an explicit length here because this field is used// in allocation codepaths where write barriers are not allowed,// and eliminating the write barrier/keeping it eliminated from// slice updates is tricky, more so than just managing the length// ourselves.len intbuf [128]*mspan}// Cache of a single pinner object to reduce allocations from repeated// pinner creation.pinnerCache *pinnertrace pTraceStatepalloc persistentAlloc // per-P to avoid mutex// Per-P GC stategcAssistTime int64 // Nanoseconds in assistAllocgcFractionalMarkTime int64 // Nanoseconds in fractional mark worker (atomic)// limiterEvent tracks events for the GC CPU limiter.limiterEvent limiterEvent// gcMarkWorkerMode is the mode for the next mark worker to run in.// That is, this is used to communicate with the worker goroutine// selected for immediate execution by// gcController.findRunnableGCWorker. When scheduling other goroutines,// this field must be set to gcMarkWorkerNotWorker.gcMarkWorkerMode gcMarkWorkerMode// gcMarkWorkerStartTime is the nanotime() at which the most recent// mark worker started.gcMarkWorkerStartTime int64// gcw is this P's GC work buffer cache. The work buffer is// filled by write barriers, drained by mutator assists, and// disposed on certain GC state transitions.gcw gcWork// wbBuf is this P's GC write barrier buffer.//// TODO: Consider caching this in the running G.wbBuf wbBufrunSafePointFn uint32 // if 1, run sched.safePointFn at next safe point// statsSeq is a counter indicating whether this P is currently// writing any stats. Its value is even when not, odd when it is.statsSeq atomic.Uint32// Timer heap.timers timers// maxStackScanDelta accumulates the amount of stack space held by// live goroutines (i.e. those eligible for stack scanning).// Flushed to gcController.maxStackScan once maxStackScanSlack// or -maxStackScanSlack is reached.maxStackScanDelta int64// gc-time statistics about current goroutines// Note that this differs from maxStackScan in that this// accumulates the actual stack observed to be used at GC time (hi - sp),// not an instantaneous measure of the total stack size that might need// to be scanned (hi - lo).// 当前goroutines的gc时间统计信息scannedStackSize uint64 // stack size of goroutines scanned by this PscannedStacks uint64 // number of goroutines scanned by this P// preempt is set to indicate that this P should be enter the// scheduler ASAP (regardless of what G is running on it).// 判断是否被抢占preempt bool// gcStopTime is the nanotime timestamp that this P last entered _Pgcstop.gcStopTime int64// pageTraceBuf is a buffer for writing out page allocation/free/scavenge traces.//// Used only if GOEXPERIMENT=pagetrace.pageTraceBuf pageTraceBuf// Padding is no longer needed. False sharing is now not a worry because p is large enough// that its size class is an integer multiple of the cache line size (for any of our architectures).
}
- p的状态
_Pidle: 表示 P 目前没有执行任何用户代码,也没有参与调度工作。它的本地运行队列是空的,没有 goroutine 可以运行。P 在这种状态下等待新的任务或者返回到空闲 P 的集合中。
_Prunning: 表示P 的正常运行状态,表示它当前绑定到一个线程(M),并且正在执行用户代码或者调度器的代码。在这种状态下,P 会积极地从它的本地运行队列中取出 goroutine 并执行它们。
_Psyscall: 当 P 绑定的 M 执行系统调用时,P 会进入这个状态。这意味着尽管 M 正在执行代码,但它并没有执行 Go 语言层面的用户代码,而是在执行操作系统层面的调用(syscall)。
_Pgcstop: 此状态与垃圾回收(GC)相关。当垃圾回收需要协助或者 P 需要在垃圾回收期间暂停时,P 会进入这个状态。在 _Pgcstop 状态下,P 不会执行任何用户代码,直到垃圾回收完成。
_Pdead: 这个状态表示 P 已经不再被调度器使用,并且已经被回收或者移除了。在程序的生命周期中,P 的数量可能会动态变化,当 P 不再需要时,它们会被标记为 _Pdead 并最终被销毁。
它们之间的关系为如下图(来源腾讯技术工程公众号):
- P的创建
P的初始化是在schedinit函数中调用的,schedinit函数是在runtime的汇编启动代码里调用的。
func schedinit() {// 初始化各种锁lockInit(&sched.lock, lockRankSched)lockInit(&sched.sysmonlock, lockRankSysmon)lockInit(&sched.deferlock, lockRankDefer)lockInit(&sched.sudoglock, lockRankSudog)lockInit(&deadlock, lockRankDeadlock)lockInit(&paniclk, lockRankPanic)lockInit(&allglock, lockRankAllg)lockInit(&allpLock, lockRankAllp)lockInit(&reflectOffs.lock, lockRankReflectOffs)lockInit(&finlock, lockRankFin)lockInit(&cpuprof.lock, lockRankCpuprof)allocmLock.init(lockRankAllocmR, lockRankAllocmRInternal, lockRankAllocmW)execLock.init(lockRankExecR, lockRankExecRInternal, lockRankExecW)traceLockInit()// Enforce that this lock is always a leaf lock.// All of this lock's critical sections should be// extremely short.lockInit(&memstats.heapStats.noPLock, lockRankLeafRank)// raceinit must be the first call to race detector.// In particular, it must be done before mallocinit below calls racemapshadow.gp := getg()if raceenabled {gp.racectx, raceprocctx0 = raceinit()}sched.maxmcount = 10000crashFD.Store(^uintptr(0))// The world starts stopped.worldStopped()ticks.init() // run as early as possiblemoduledataverify()stackinit()mallocinit()godebug := getGodebugEarly()initPageTrace(godebug) // must run after mallocinit but before anything allocatescpuinit(godebug) // must run before alginitrandinit() // must run before alginit, mcommoninitalginit() // maps, hash, rand must not be used before this callmcommoninit(gp.m, -1)modulesinit() // provides activeModulestypelinksinit() // uses maps, activeModulesitabsinit() // uses activeModulesstkobjinit() // must run before GC startssigsave(&gp.m.sigmask)initSigmask = gp.m.sigmaskgoargs()goenvs()secure()checkfds()parsedebugvars()gcinit()// Allocate stack space that can be used when crashing due to bad stack// conditions, e.g. morestack on g0.gcrash.stack = stackalloc(16384)gcrash.stackguard0 = gcrash.stack.lo + 1000gcrash.stackguard1 = gcrash.stack.lo + 1000// if disableMemoryProfiling is set, update MemProfileRate to 0 to turn off memprofile.// Note: parsedebugvars may update MemProfileRate, but when disableMemoryProfiling is// set to true by the linker, it means that nothing is consuming the profile, it is// safe to set MemProfileRate to 0.if disableMemoryProfiling {MemProfileRate = 0}lock(&sched.lock)sched.lastpoll.Store(nanotime())procs := ncpuif n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {procs = n}if procresize(procs) != nil {throw("unknown runnable goroutine during bootstrap")}unlock(&sched.lock)// World is effectively started now, as P's can run.worldStarted()if buildVersion == "" {// Condition should never trigger. This code just serves// to ensure runtime·buildVersion is kept in the resulting binary.buildVersion = "unknown"}if len(modinfo) == 1 {// Condition should never trigger. This code just serves// to ensure runtime·modinfo is kept in the resulting binary.modinfo = ""}
}
可以看出具体创建的调度器的个数取决于GOMAXPROCS参数,而如果没有默认的则取决于当初cpu的核数。
procs := ncpuif n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {procs = n}if procresize(procs) != nil {throw("unknown runnable goroutine during bootstrap")}
一般来讲,程序运行时就将GOMAXPROCS大小设置为CPU核数,可让Go程序充分利用CPU。在某些IO密集型的应用
里,这个值可能并不意味着性能最好。 理论上当某个Goroutine进入系统调用时,会有一个新的M被启用或创建,继
续占满CPU。但由于Go调度器检测到M被阻塞是有一定延迟的,也即旧的M被阻塞和新的M得到运行之间是有一定间隔
的,所以在IO密集型应用中不妨把GOMAXPROCS设置的大一些,或许会有好的效果。
然后具体初始化,以及后续p的调整都在对应的procresize这个函数里:
// Change number of processors.
//
// sched.lock must be held, and the world must be stopped.
//
// gcworkbufs must not be being modified by either the GC or the write barrier
// code, so the GC must not be running if the number of Ps actually changes.
//
// Returns list of Ps with local work, they need to be scheduled by the caller.
func procresize(nprocs int32) *p {// 为了确保在调整处理器数量时系统调度是安全的assertLockHeld(&sched.lock)assertWorldStopped()// 验证arg 合法性old := gomaxprocsif old < 0 || nprocs <= 0 {throw("procresize: invalid arg")}// 如果开启了trace则记录当前Gomaxprocs的变化trace := traceAcquire()if trace.ok() {trace.Gomaxprocs(nprocs)traceRelease(trace)}// 更新统计数据(处理器数量调整的时间等)now := nanotime()if sched.procresizetime != 0 {sched.totaltime += int64(old) * (now - sched.procresizetime)}sched.procresizetime = nowmaskWords := (nprocs + 31) / 32// 根据新的处理器数量调整全局 allp 数组的大小,这个数组保存了所有处理器(P)的引用if nprocs > int32(len(allp)) {// Synchronize with retake, which could be running// concurrently since it doesn't run on a P.lock(&allpLock)if nprocs <= int32(cap(allp)) {allp = allp[:nprocs]} else {nallp := make([]*p, nprocs)// Copy everything up to allp's cap so we// never lose old allocated Ps.copy(nallp, allp[:cap(allp)])allp = nallp}if maskWords <= int32(cap(idlepMask)) {idlepMask = idlepMask[:maskWords]timerpMask = timerpMask[:maskWords]} else {nidlepMask := make([]uint32, maskWords)// No need to copy beyond len, old Ps are irrelevant.copy(nidlepMask, idlepMask)idlepMask = nidlepMaskntimerpMask := make([]uint32, maskWords)copy(ntimerpMask, timerpMask)timerpMask = ntimerpMask}unlock(&allpLock)}// 初始化pfor i := old; i < nprocs; i++ {pp := allp[i]if pp == nil {pp = new(p)}pp.init(i)atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))}// check当前的goroutine是否需要绑定新的p,还是使用旧的gp := getg()if gp.m.p != 0 && gp.m.p.ptr().id < nprocs {// continue to use the current Pgp.m.p.ptr().status = _Prunninggp.m.p.ptr().mcache.prepareForSweep()} else {// release the current P and acquire allp[0].//// We must do this before destroying our current P// because p.destroy itself has write barriers, so we// need to do that from a valid P.if gp.m.p != 0 {trace := traceAcquire()if trace.ok() {// Pretend that we were descheduled// and then scheduled again to keep// the trace consistent.trace.GoSched()trace.ProcStop(gp.m.p.ptr())traceRelease(trace)}gp.m.p.ptr().m = 0}gp.m.p = 0pp := allp[0]pp.m = 0pp.status = _Pidleacquirep(pp)trace := traceAcquire()if trace.ok() {trace.GoStart()traceRelease(trace)}}// g.m.p is now set, so we no longer need mcache0 for bootstrapping.mcache0 = nil// 释放不在试用的pfor i := nprocs; i < old; i++ {pp := allp[i]pp.destroy()// can't free P itself because it can be referenced by an M in syscall}// 调整 allp 数组大小,如果 allp 数组的大小大于新的处理器数量,将其截断到合适的大小。if int32(len(allp)) != nprocs {lock(&allpLock)allp = allp[:nprocs]idlepMask = idlepMask[:maskWords]timerpMask = timerpMask[:maskWords]unlock(&allpLock)}// 构建一个链表,包含所有处于空闲状态但有本地工作队列的 P,这些 P 需要由调用者调度。var runnablePs *pfor i := nprocs - 1; i >= 0; i-- {pp := allp[i]if gp.m.p.ptr() == pp {continue}pp.status = _Pidleif runqempty(pp) {pidleput(pp, now)} else {pp.m.set(mget())pp.link.set(runnablePs)runnablePs = pp}}stealOrder.reset(uint32(nprocs))var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))if old != nprocs {// Notify the limiter that the amount of procs has changed.gcCPULimiter.resetCapacity(now, nprocs)}return runnablePs
}
然后p会在pp.init(i)进一步的初始化:
// init initializes pp, which may be a freshly allocated p or a
// previously destroyed p, and transitions it to status _Pgcstop.
func (pp *p) init(id int32) {pp.id = idpp.status = _Pgcstop // // 表示这个处理器目前处于垃圾收集停止状态pp.sudogcache = pp.sudogbuf[:0] // 存储等待运行的 goroutine(称为 sudog)的缓存。pp.deferpool = pp.deferpoolbuf[:0] // 存储延迟函数调用(defer)的池pp.wbBuf.reset() // 用于跟踪在并发执行期间对内存的写入操作/**为 p 分配或关联一个 mcache(内存缓存),mcache 是用来缓存分配的内存块以加速内存分配过程的。如果 id 为 0,使用一个特殊的 mcache 实例 mcache0,这是启动时用于引导的内存缓存。否则,为这个 p 分配一个新的 mcache。*/if pp.mcache == nil {if id == 0 {if mcache0 == nil {throw("missing mcache?")}// Use the bootstrap mcache0. Only one P will get// mcache0: the one with ID 0.pp.mcache = mcache0} else {pp.mcache = allocmcache()}}// 关联race上下文if raceenabled && pp.raceprocctx == 0 {if id == 0 {pp.raceprocctx = raceprocctx0raceprocctx0 = 0 // bootstrap} else {pp.raceprocctx = raceproccreate()}}lockInit(&pp.timers.mu, lockRankTimers)// This P may get timers when it starts running. Set the mask here// since the P may not go through pidleget (notably P 0 on startup).timerpMask.set(id)// Similarly, we may not go through pidleget before this P starts// running if it is P 0 on startup.idlepMask.clear(id)
}
所以从代码层面可以再总结下:
-
g (goroutine): 表示一个 Go 协程,是 Go 并发的执行单位。每个 g 代表一个可运行的程序计数器(pc)和栈,它包含了执行 Go 代码所需的状态。g 可以被阻塞(如等待 channel 操作)、运行或处于其他状态。
-
m (machine): 表示一个操作系统线程,是实际执行 g 的载体。每个 m 都有一个 g0 协程,这个 g0 用于调度,也就是说,当 m 执行调度操作时(如创建新的 g 或者进行垃圾回收),它使用的是 g0。m 负责执行 g 的代码,并且管理与操作系统线程相关的操作。
-
p (processor): 是 Go 运行时中的逻辑处理器,它作为 m 和 g 之间的桥梁。每个 p 都有一个运行队列(run queue),包含多个 g,m 可以执行绑定在 p 上的 g。p 还管理着内存分配,每个 p 都有一个本地的 mcache,用于优化内存分配。
三者关系如下:
- m 与 g 的关系: 一个 m 可以执行多个 g,但它在任一时刻只能执行一个 g。m 通过其 curg 字段跟踪当前正在执行的 g。
- m 与 p 的关系: 一个 m 可以绑定到一个 p 上,以执行该 p 上的 g。m 通过 p 字段来引用当前绑定的 p。
- p 与 g 的关系: 一个 p 包含一个运行队列,其中包含多个 g。p 负责调度这些 g 的执行,并且通过 runq(一个固定大小的数组)来管理它们。
P与M创建的实际
-
创建 P(Processor):
-
程序启动时: 在 Go 程序启动时,运行时系统会根据 GOMAXPROCS 环境变量的值或通过 runtime.GOMAXPROCS 函数设置的值来创建相应数量的 P。GOMAXPROCS 决定了程序运行时最多可以使用的处理器数量,从而影响并行执行的 goroutine 的数量。没有参数则通过cpu数量设定默认的值。
-
运行时调整: 虽然 P 的数量通常在程序启动时确定,但可以通过改变 runtime.GOMAXPROCS 的值来动态调整。这会导致运行时系统根据新的 GOMAXPROCS 值增加或减少 P 的数量。
-
-
创建 M (Machine, 即 OS Thread):
-
需要执行 Go 代码时: 当系统中没有足够的 M 来关联 P 并执行其中的可运行的 G(goroutine)时,会创建新的 M。例如,当所有的 M 都阻塞在系统调用上,而 P 中还有许多可运行的 G 时,系统会寻找空闲的 M,如果没有找到,就会创建新的 M。
-
M 阻塞时: 如果一个 M 在执行系统调用时阻塞,它可能不会立即创建新的 M,而是尝试将当前 P 转移给其他空闲的 M。如果找不到空闲的 M,则会创建新的 M。
-
初始化和辅助 M: 程序启动时会创建一个特殊的 M(通常称为 M0),用于执行程序的初始化和启动工作。此外,还有一些辅助 M,如运行时系统内部的系统监控器(sysmon)和垃圾回收标记辅助器(gcMarkWorker),它们在需要时会被创建。
-
垃圾回收: 在执行垃圾回收时,可能需要额外的 M 来帮助完成标记和清扫工作。
-
2.4 调度流程
从总的调度流程上可以参考刘丹冰老师的《Golang的协程调度器原理及GMP设计思想》的图:
- 从图中可以得出开始一个go func的流程为:
- 通过 go func()来创建⼀个goroutine;
- 有两个存储G的队列,⼀个是局部调度器P的本地队列、⼀个是全局G队列。新创建的G会先
保存在P的本地队列中,如果P的本地队列已经满了就会保存在全局的队列中;- G只能运⾏在M中,⼀个M必须持有⼀个P,M与P是1:1的关系。M会从P的本地队列弹出⼀个可执
⾏状态的G来执⾏,如果P的本地队列为空,就会想其他的MP组合偷取⼀个可执⾏的G来执⾏;- ⼀个M调度G执⾏的过程是⼀个循环机制;
- 当M执⾏某⼀个G时候如果发⽣了syscall或则其余阻塞操作,M会阻塞,如果当前有⼀些G在执⾏,
runtime会把这个线程M从P中摘除(detach),然后再创建⼀个新的操作系统的线程(如果有空闲的线程可
⽤就复⽤空闲线程)来服务于这个P;- 当M系统调⽤结束时候,这个G会尝试获取⼀个空闲的P执⾏,并放⼊到这个P的本地队列。如果获取不
到P,那么这个线程M变成休眠状态, 加⼊到空闲线程中,然后这个G会被放⼊全局队列中。
- 调度的生命周期:
2.5 设计思想
复用线程: 避免频繁的创建、销毁线程,减少这部分的开销。
- work stealing机制:Go 调度器中的 work-stealing(工作窃取)机制是一种用于提高并发性能的策略,它允许一个处理器(P)在没有可运行的 goroutine 时,从其他处理器的本地队列中窃取(steal)一些 goroutine 来执行。这种机制有助于实现负载均衡,避免某些处理器过载而其他处理器空闲的情况。 主要体现在调度器的 runqget 和 runqsteal 函数中。
// 从本地可运行队列中获取 goroutine。
// 如果 inheritTime 为 true,则 gp 应继承当前时间片的剩余时间。
// 否则,它应该开始一个新的时间片。
// 仅由当前 P 的所有者执行。
func runqget(pp *p) (gp *g, inheritTime bool) {// 如果存在 runnext,则它是下一个要运行的 G。next := pp.runnext// 如果 runnext 非零并且 CAS 失败,那么它只能是被另一个 P 偷走了。if next != 0 && pp.runnext.cas(next, 0) {return next.ptr(), true // 返回 runnext G 并继承剩余时间片}for {h := atomic.LoadAcq(&pp.runqhead) // 原子加载头部索引t := pp.runqtail// 如果头部和尾部相等,队列为空if t == h {return nil, false}// 从本地队列头部取出 Ggp := pp.runq[h%uint32(len(pp.runq))].ptr()// 尝试原子地将头部索引向后移动if atomic.CasRel(&pp.runqhead, h, h+1) {return gp, false // 返回取出的 G 并开始新的时间片}}
}
// 从 p2 的本地可运行队列中偷取一半的 goroutine,
// 并将它们放入 p 的本地可运行队列。
// 返回偷取的其中一个 goroutine(如果失败则返回 nil)。
func runqsteal(pp, p2 *p, stealRunNextG bool) *g {t := pp.runqtail// 从 p2 偷取 goroutine 并放入 pp 的本地队列n := runqgrab(p2, &pp.runq, t, stealRunNextG)if n == 0 {return nil}n--// 获取偷取的 goroutinegp := pp.runq[(t+n)%uint32(len(pp.runq))].ptr()if n == 0 {return gp}// 原子加载 p 的本地队列头部索引h := atomic.LoadAcq(&pp.runqhead)// 检查是否溢出if t-h+n >= uint32(len(pp.runq)) {throw("runqsteal: runq overflow")}// 原子存储新的尾部索引,使偷取的 goroutine 可供消费atomic.StoreRel(&pp.runqtail, t+n)return gp
}// runqgrab 从给定处理器 pp 的本地可运行队列中批量获取(或“窃取”)goroutine。
// batch 是一个环形缓冲区,起始于 batchHead。
// 返回获取到的 goroutine 数量。
// 可以由任何 P 执行。
func runqgrab(pp *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {for {// 使用 load-acquire 操作加载 pp 的 runq 队列头部索引,与其他消费者同步。h := atomic.LoadAcq(&pp.runqhead)// 使用 load-acquire 操作加载 pp 的 runq 队列尾部索引,与生产者同步。t := atomic.LoadAcq(&pp.runqtail)// 计算可偷取的 goroutine 数量。n := t - h// 减半以确保不会取走所有 goroutine,留一半给原处理器。n = n - n/2// 如果没有可偷取的 goroutine 或者 stealRunNextG 为真且 pp.runnext 有值。if n == 0 {if stealRunNextG {// 尝试从 pp.runnext 偷取 goroutine。if next := pp.runnext; next != 0 {if pp.status == _Prunning {// 如果 pp 正在运行,则睡眠一小会,确保 pp 不会立即运行我们即将偷取的 goroutine。// 这种后退策略可以减少在不同 Ps 之间频繁交换 goroutine 的情况。if !osHasLowResTimer {usleep(3) // 如果系统不支持低分辨率定时器,则睡眠 3 微秒。} else {osyield() // 否则,进行线程让步(yield),放弃 CPU。}}// 使用 CAS 尝试将 pp.runnext 设置为 0,如果成功则表示偷取成功。if !pp.runnext.cas(next, 0) {continue}// 将偷取的 goroutine 放入 batch 缓冲区。batch[batchHead%uint32(len(batch))] = nextreturn 1 // 返回偷取的 goroutine 数量。}}return 0 // 如果没有可偷取的 goroutine,返回 0。}// 如果 h 和 t 读取不一致,说明队列状态可能已经改变,继续循环。if n > uint32(len(pp.runq)/2) {continue}// 将 pp 的 runq 队列中的 goroutine 复制到 batch 缓冲区。for i := uint32(0); i < n; i++ {g := pp.runq[(h+i)%uint32(len(pp.runq))]batch[(batchHead+i)%uint32(len(batch))] = g}// 使用 cas-release 操作尝试提交这次获取,如果成功则返回获取的 goroutine 数量。if atomic.CasRel(&pp.runqhead, h, h+n) {return n}}
}
- Hand off(传递)机制是 Go 调度器中用于处理线程(M)阻塞时的一种策略。当一个线程因为系统调用或其他原因阻塞时,它不会让绑定的处理器(P)空闲,而是将当前的 P 传递给另一个线程,以便新线程可以继续执行 P 上的 goroutine。 这有助于减少因线程阻塞导致的上下文切换开销,并保持程序的并发性。
1. 线程阻塞: 线程 M 在执行系统调用或其他阻塞操作时被阻塞。
2. 寻找新线程: 调度器寻找一个新线程 M’,它可能是一个刚刚完成工作的线程,或者是新创建的线程。
3. 传递 P: 阻塞的线程 M 将其绑定的 P 传递给新线程 M’。
4. 新线程执行: 新线程 M’ 开始执行 P 上的 goroutine,而阻塞的线程 M 等待其阻塞操作完成。
5. 返回执行: 一旦阻塞操作完成,线程 M 可以重新获取一个 P 并继续执行 goroutine。
- 利⽤并⾏ :GOMAXPROCS设置P的数量,最多有GOMAXPROCS个线程分布在多个CPU上同时运⾏。
- 全局 G 队列:在新的调度器中依然有全局 G 队列,但功能已经被弱化了,当 M 执行 work stealing 从其他 P 偷不到 G 时,它可以从全局 G 队列获取 G。
- 抢占调度: 在coroutine中要等待⼀个协程主动让出CPU才执⾏下⼀个协程也就是协调调度,在Go中,抢占调度是一种确保公平性的机制,它允许长时间运行的 goroutine 被中断,从而给其他 goroutine 运行的机会。 在go1.14 版本引入了基于系统信号的异步抢占调度(在此之前也是协调),使得即使在没有函数调用的循环中,goroutine 也可以被抢占。这种机制是通过在运行时的每个函数的序言(function prologue)中插入检查代码来实现的,以便在适当的时候进行抢占。
// 信号处理函数,用于执行抢占
func doSigPreempt(gp *g, ctxt *sigctxt) {...if wantAsyncPreempt(gp) {if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {ctxt.pushCall(funcPC(asyncPreempt), newpc)}}...
}
具体的测试可以通过以下的例子然后通过trace追踪测试下,这里不再赘述。
package mainimport ("fmt""os""runtime""runtime/trace""sync"
)func main() {runtime.GOMAXPROCS(1)f, _ := os.Create("trace.output")defer f.Close()_ = trace.Start(f)defer trace.Stop()var wg sync.WaitGroupfor i := 0; i < 10; i++ {wg.Add(1)go func() {defer wg.Done()t := 0for i:=0;i<1e8;i++ {t+=1}fmt.Println("total:", t)}()}wg.Wait()
}
3.总结
- 为了解决 Go 早期(v1.1前)多线程 M 对应多协程 G 调度器的全局锁、中心化状态带来的锁竞争导致的性能下降等问题,Go 开发者引入了处理器 P 结构,形成了当前经典的 GMP 调度模型;
- Go的调度机制: 本质上是两级线程模型,为了运行时在用户态提供的多个函数组成的一种机制,目的是高效地调度 G 到 M上去执行。
- G 在运行时中的状态可以简化成三种:等待中_Gwaiting、可运行_Grunnable、运行中_Grunning,运行期间大部分情况是在这三种状态间来回切换;
- M 的状态可以简化为只有两种:自旋和非自旋; 自旋状态,表示 M 绑定了 P 又没有获取 G;非自旋状态,表示正在执行 Go 代码中,或正在进入系统调用,或空闲;
- P的状态可以简化为:空闲中 _Pidle、运行中_Prunning、系统调用中 _Psyscall、GC中_Pgcstop、死亡 _Pdead ;
- Go调度的核心思想: 尽可能的确保线程的复用(work Stealing、Hand off机制)、给予协作的抢占机制、利用多核并行能力,限制同时运行(不包含阻塞)的 M 线程数 等于 CPU 的核心数目。
- 调度器的启动逻辑是: 初始化 g0 和 m0,并将二者互相绑定, m0 是程序启动后的初始线程,g0 是 m0 线程的系统栈代表的 G 结构体,负责普通 G 在 M 上的调度切换 --> runtime.schedinit():负责M、P 的初始化过程,分别调用runtime.mcommoninit() 初始化 M 的全局队列allm 、调用 runtime.procresize() 初始化全局 P 队列 allp --> runtime.newproc():负责获取空闲的 G 或创建新的 G --> runtime.mstart() 启动调度循环;
- 调度器的循环逻辑是: 运行函数 schedule() --> 通过 runtime.globrunqget() 从全局队列、通过 runtime.runqget() 从 P 本地队列、 runtime.findrunnable 从各个地方,获取一个可执行的 G --> 调用 runtime.execute() 执行 G --> 调用 runtime.gogo() 在汇编代码层面上真正执行G --> 调用 runtime.goexit0() 执行 G 的清理工作,重新将 G 加入 P 的空闲队列 --> 调用 runtime.schedule() 进入下一次调度循环。
相关参考:
- 《Golang的协程调度器原理及GMP设计思想》
- 《深入Go语言之旅》
- 《goland专家编程》
- 《Go语言高级编程》
- https://strikefreedom.top/archives/high-performance-implementation-of-goroutine-pool#toc-head-10
- https://zhuanlan.zhihu.com/p/586236582
- https://www.cnblogs.com/dojo-lzz/p/16342622.html
https://blog.leonard.wang/archives/golangprunnext%E6%98%AF%E4%BB%80%E4%B9%88- https://www.topgoer.com/%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B/GMP%E5%8E%9F%E7%90%86%E4%B8%8E%E8%B0%83%E5%BA%A6.html
- https://blog.csdn.net/apple_56973763/article/details/127045808