golang并发安全-select

前面说了golang的channel, 今天我们看看golang select 是怎么实现的。

数据结构

type scase struct {c    *hchan         // chanelem unsafe.Pointer // 数据
}

select 非默认的case 中都是处理channel 的 接受和发送,所有scase 结构体中c是用来存储select 的case中使用的channel

处理流程

select case 场景

编译器在中间代码生成期间会根据 select 中 case 的不同对控制语句进行优化,这一过程都发生在cmd/compile/internal/walk/select.go 中,下面会根据不同的场景进行分析代码。

没有case

代码示例

func main() {select {}
}

如果是空的select语句,程序会被阻塞,golang 带有死锁监测机制:如果当前写成无法被唤醒,则会panic

源码解读

在runtime/select.go中可以看到:如果cases为空直接调用gopark函数以waitReasonSelectNoCases的原因挂起当前的协程,并且无法被唤醒,golang监测到直接panic。

同样我们在walk/select.go的walkSelectCases函数中可以看到,如果case为空直接调用runtime.block函数

只有一个case

代码示例

func main() {ch := make(chan int)go func() {ch <- 1}()select {case data := <-ch:fmt.Println("ch data:", data)}
}

如果有输入直接打印ch data : 1 , 没有的话会被检测出all goroutines are asleep - deadlock!(和没有case的一样)

源码解读

如果一个非default case ,将读写转换成 ch <- 或 <- ch, 正常的channel读写

func walkSelectCases(cases []*ir.CommClause) []ir.Node {// optimization: one-case select: single op.if ncas == 1 {cas := cases[0] //获取caseir.SetPos(cas)l := cas.Init()if cas.Comm != nil { // 不是默认n := cas.Comm // 获取case的条件语句l = append(l, ir.TakeInit(n)...)switch n.Op() {default:base.Fatalf("select %v", n.Op())case ir.OSEND: // 如果是 send, 无须处理// already okcase ir.OSELRECV2:r := n.(*ir.AssignListStmt)// 如果不是 data, ok := <- ch 类型,处理成<- chif ir.IsBlank(r.Lhs[0]) && ir.IsBlank(r.Lhs[1]) {n = r.Rhs[0]break}// 是的话, op设置成data, ok := <- ch形式r.SetOp(ir.OAS2RECV)}l = append(l, n)}// 将case 条件后要执行的语句加入带执行的列表l = append(l, cas.Body...)// 加入 break类型,跳出select-case l = append(l, ir.NewBranchStmt(base.Pos, ir.OBREAK, nil))return l}// convert case value arguments to addresses.// this rewrite is used by both the general code and the next optimization.var dflt *ir.CommClausefor _, cas := range cases {ir.SetPos(cas)n := cas.Commif n == nil {dflt = cascontinue}switch n.Op() {case ir.OSEND:n := n.(*ir.SendStmt)n.Value = typecheck.NodAddr(n.Value)n.Value = typecheck.Expr(n.Value)case ir.OSELRECV2:n := n.(*ir.AssignListStmt)if !ir.IsBlank(n.Lhs[0]) {n.Lhs[0] = typecheck.NodAddr(n.Lhs[0])n.Lhs[0] = typecheck.Expr(n.Lhs[0])}}}
}

两个case(一个default)

代码示例

func main() {ch := make(chan int)select {case data := <-ch:fmt.Println("ch data:", data)default:fmt.Println("default")}
}

如果写入就走<- 读取,反之走默认

源码解读

如果是两个case,其中一个是default,非default的会根据send还是recv 调用channel的selectnbsend和 selectnbrecv。这两个方法是非阻塞的

func walkSelectCases(cases []*ir.CommClause) []ir.Node){// optimization: two-case select but one is default: single non-blocking op.if ncas == 2 && dflt != nil {cas := cases[0]if cas == dflt { // 如果是default 放在 cases[1]cas = cases[1]}n := cas.Commir.SetPos(n)r := ir.NewIfStmt(base.Pos, nil, nil, nil)r.SetInit(cas.Init())var cond ir.Nodeswitch n.Op() {default:base.Fatalf("select %v", n.Op())case ir.OSEND:// 调用selectnbsend(c, v)// if selectnbsend(c, v) { body } else { default body }n := n.(*ir.SendStmt)ch := n.Chancond = mkcall1(chanfn("selectnbsend", 2, ch.Type()), types.Types[types.TBOOL], r.PtrInit(), ch, n.Value)case ir.OSELRECV2:n := n.(*ir.AssignListStmt)recv := n.Rhs[0].(*ir.UnaryExpr)ch := recv.Xelem := n.Lhs[0]if ir.IsBlank(elem) { //空的话 elem= NodNilelem = typecheck.NodNil()}cond = typecheck.Temp(types.Types[types.TBOOL])// 调用 selectnbrecvfn := chanfn("selectnbrecv", 2, ch.Type())call := mkcall1(fn, fn.Type().Results(), r.PtrInit(), elem, ch)as := ir.NewAssignListStmt(r.Pos(), ir.OAS2, []ir.Node{cond, n.Lhs[1]}, []ir.Node{call})r.PtrInit().Append(typecheck.Stmt(as))}r.Cond = typecheck.Expr(cond)r.Body = cas.Bodyr.Else = append(dflt.Init(), dflt.Body...)return []ir.Node{r, ir.NewBranchStmt(base.Pos, ir.OBREAK, nil)}}    
}

每次尝试从channel读/写值,如果不成功则直接返回,不会阻塞。从selectnbsend和selectnbrecv看出,最后转换成if-else

// compiler implements
//
//	select {
//	case c <- v:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selectnbsend(c, v) {
//		... foo
//	} else {
//		... bar
//	}
//
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {// block:false// chan将 select准换if-elsereturn chansend(c, elem, false, getcallerpc())
}// compiler implements
//
//	select {
//	case v, ok = <-c:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selected, ok = selectnbrecv(&v, c); selected {
//		... foo
//	} else {
//		... bar
//	}
//
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {// block:false// chan将 select准换if-elsereturn chanrecv(c, elem, false)
}

多个case

代码示例

func main() {ch := make(chan int)go func() {tempArr := []int{1,2,3,4,5,6}for i := range tempArr {ch <- i}}()go func() {for {select {case i := <-ch:println("first: ", i)case i := <-ch:println("second", i)}}}()time.Sleep(3 * time.Second)}

可以看到多个case,会随机选取一个case执行

源码解读

func walkSelectCases(cases []*ir.CommClause) []ir.Node {ncas := len(cases)sellineno := base.Posif dflt != nil {ncas--}// 定义casorder为ncas大小的case语句的数组casorder := make([]*ir.CommClause, ncas)// 分别定义nsends为发送channel的case个数,nrecvs为接收channel的case个数nsends, nrecvs := 0, 0// 多case编译后待执行的语句列表var init []ir.Node// generate sel-structbase.Pos = sellineno// 定义selv为长度为ncas的scase类型的数组// scasetype()函数返回的就是scase结构体,包含c和elem两个字段selv := typecheck.Temp(types.NewArray(scasetype(), int64(ncas)))init = append(init, typecheck.Stmt(ir.NewAssignStmt(base.Pos, selv, nil)))// No initialization for order; runtime.selectgo is responsible for that.// 定义order为2倍的ncas长度的TUINT16类型的数组// 注意:selv和order作为runtime.selectgo()函数的入参,前者存放scase列表内存地址,后者用来做scase排序使用,排序是为了便于挑选出待执行的caseorder := typecheck.Temp(types.NewArray(types.Types[types.TUINT16], 2*int64(ncas)))var pc0, pcs ir.Nodeif base.Flag.Race {pcs = typecheck.Temp(types.NewArray(types.Types[types.TUINTPTR], int64(ncas)))pc0 = typecheck.Expr(typecheck.NodAddr(ir.NewIndexExpr(base.Pos, pcs, ir.NewInt(0))))} else {pc0 = typecheck.NodNil()}// register cases 遍历case生成scase对象放到selv中for _, cas := range cases {ir.SetPos(cas)init = append(init, ir.TakeInit(cas)...)n := cas.Commif n == nil { // default:continue}var i intvar c, elem ir.Nodeswitch n.Op() { // 根据类型获取chan, elem的值default:base.Fatalf("select %v", n.Op())case ir.OSEND: // 发送chan类型,i从0开始递增n := n.(*ir.SendStmt)i = nsendsnsends++c = n.Chanelem = n.Valuecase ir.OSELRECV2: // 接收chan,i从ncas开始递减n := n.(*ir.AssignListStmt)nrecvs++i = ncas - nrecvsrecv := n.Rhs[0].(*ir.UnaryExpr)c = recv.Xelem = n.Lhs[0]}casorder[i] = cas// 定义一个函数,写入c或elem到selv数组setField := func(f string, val ir.Node) {// 放到selv数组r := ir.NewAssignStmt(base.Pos, ir.NewSelectorExpr(base.Pos, ir.ODOT, ir.NewIndexExpr(base.Pos, selv, ir.NewInt(int64(i))), typecheck.Lookup(f)), val)// 添加到带执行列表init = append(init, typecheck.Stmt(r))}c = typecheck.ConvNop(c, types.Types[types.TUNSAFEPTR])setField("c", c)if !ir.IsBlank(elem) {elem = typecheck.ConvNop(elem, types.Types[types.TUNSAFEPTR])setField("elem", elem)}// TODO(mdempsky): There should be a cleaner way to// handle this.if base.Flag.Race {r := mkcallstmt("selectsetpc", typecheck.NodAddr(ir.NewIndexExpr(base.Pos, pcs, ir.NewInt(int64(i)))))init = append(init, r)}}// 如果发送chan和接收chan的个数不等于ncas,直接报错if nsends+nrecvs != ncas {base.Fatalf("walkSelectCases: miscount: %v + %v != %v", nsends, nrecvs, ncas)}// run the select  开始执行select动作base.Pos = sellineno// 定义chosen, recvOK作为selectgo()函数的两个返回值// chosen 表示被选中的case的索引,recvOK表示对于接收操作,是否成功接收chosen := typecheck.Temp(types.Types[types.TINT])recvOK := typecheck.Temp(types.Types[types.TBOOL])r := ir.NewAssignListStmt(base.Pos, ir.OAS2, nil, nil)r.Lhs = []ir.Node{chosen, recvOK}// 调用runtime.selectgo()函数作为运行时实际执行多case的select动作的函数fn := typecheck.LookupRuntime("selectgo")var fnInit ir.Nodesr.Rhs = []ir.Node{mkcall1(fn, fn.Type().Results(), &fnInit, bytePtrToIndex(selv, 0), bytePtrToIndex(order, 0), pc0, ir.NewInt(int64(nsends)), ir.NewInt(int64(nrecvs)), ir.NewBool(dflt == nil))}init = append(init, fnInit...)init = append(init, typecheck.Stmt(r))// selv and order are no longer alive after selectgo.// 执行完selectgo()函数后,销毁selv和order数组.init = append(init, ir.NewUnaryExpr(base.Pos, ir.OVARKILL, selv))init = append(init, ir.NewUnaryExpr(base.Pos, ir.OVARKILL, order))if base.Flag.Race {init = append(init, ir.NewUnaryExpr(base.Pos, ir.OVARKILL, pcs))}// dispatch cases //定义一个函数,根据chosen确定的case分支生成if语句,执行该分支的语句dispatch := func(cond ir.Node, cas *ir.CommClause) {cond = typecheck.Expr(cond)cond = typecheck.DefaultLit(cond, nil)r := ir.NewIfStmt(base.Pos, cond, nil, nil)if n := cas.Comm; n != nil && n.Op() == ir.OSELRECV2 {n := n.(*ir.AssignListStmt)if !ir.IsBlank(n.Lhs[1]) {x := ir.NewAssignStmt(base.Pos, n.Lhs[1], recvOK)r.Body.Append(typecheck.Stmt(x))}}r.Body.Append(cas.Body.Take()...)r.Body.Append(ir.NewBranchStmt(base.Pos, ir.OBREAK, nil))init = append(init, r)}// 如果多case中有default分支,并且chosen小于0,执行该default分支if dflt != nil {ir.SetPos(dflt)dispatch(ir.NewBinaryExpr(base.Pos, ir.OLT, chosen, ir.NewInt(0)), dflt)}// 如果有chosen选中的case分支,即chosen等于i,则执行该分支for i, cas := range casorder {ir.SetPos(cas)dispatch(ir.NewBinaryExpr(base.Pos, ir.OEQ, chosen, ir.NewInt(int64(i))), cas)}return init
}

从上面代码可以看出:

1- 初始化过程: 生成scase数组,定义selv 存放scase数组内存地址,定义order 来给scase排序

2- 遍历所有的case ,将case放到带执行列表(不包括default)

3- 调用runtime。selectgo并将selv和order作为入参传入selectgo

4- 根据selectgo返回的chosen来生成if语句,执行对应的case

解锁加锁

加锁的顺序和解锁的顺序相反。


func sellock(scases []scase, lockorder []uint16) {var c *hchanfor _, o := range lockorder {c0 := scases[o].cif c0 != c {c = c0lock(&c.lock)}}
}func selunlock(scases []scase, lockorder []uint16) {// 我们必须非常小心,在解锁最后一把锁后不要触摸sel,因为sel可以在最后一次解锁后立即释放。//考虑以下情况。第一个M调用runtime·park()在runtime·selectgo()中传递sel。//一旦runtime·park()解锁了最后一个锁,另一个M会使调用select的G再次可运行,//并安排其执行。当G在另一个M上运行时,它锁定所有锁并释放sel。现在,如果第一个M触摸sel,它将访问释放的内存。for i := len(lockorder) - 1; i >= 0; i-- {c := scases[lockorder[i]].cif i > 0 && c == scases[lockorder[i-1]].c {continue // will unlock it on the next iteration}unlock(&c.lock)}
}

selectgo

selectgo 处理逻辑

// cas0指向[ncases]scase类型的数组,order0指向[2*ncases]uint16类型的数组(其中ncases必须<=65536)。
// 返回值有两个, chosen 和 recvOK,分别表示选中的case的序号,和对接收操作是否接收成功的布尔值
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {if debugSelect {print("select: cas0=", cas0, "\n")}//==== 执行必要的初始化操作,并生成处理case的两种顺序:轮询顺序polIorder和加锁顺序lockorder。// 为了将scase分配到栈上,这里直接给cas1分配了64KB大小的数组,同理, 给order1分配了128KB大小的数组// NOTE: In order to maintain a lean stack size, the number of scases// is capped at 65536.cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))// ncases个数 = 发送chan个数+ 接收chan个数ncases := nsends + nrecvs// scases是cas1数组的前ncases个元素scases := cas1[:ncases:ncases]// 顺序列表pollorder是order1的0- ncases个元素pollorder := order1[:ncases:ncases]// 加锁列表lockorder是order1的ncase到 2 ncases 个元素lockorder := order1[ncases:][:ncases:ncases]// NOTE: 编译器初始化的pollorder/lockorder的基础数组不是零。// Even when raceenabled is true, there might be select// statements in packages compiled without -race (e.g.,// ensureSigM in runtime/signal_unix.go).var pcs []uintptrif raceenabled && pc0 != nil {pc1 := (*[1 << 16]uintptr)(unsafe.Pointer(pc0))pcs = pc1[:ncases:ncases]}casePC := func(casi int) uintptr {if pcs == nil {return 0}return pcs[casi]}var t0 int64if blockprofilerate > 0 {t0 = cputicks()}// 生成排列顺序norder := 0for i := range scases {cas := &scases[i]// Omit cases without channels from the poll and lock orders.// 处理case中channel为空的情况if cas.c == nil {cas.elem = nil // 便于GCcontinue}// 通过fastrandn函数引入随机性,确定pollorder列表中case的随机顺序索引j := fastrandn(uint32(norder + 1))pollorder[norder] = pollorder[j]pollorder[j] = uint16(i)norder++}// 重新生成列表pollorder = pollorder[:norder]lockorder = lockorder[:norder]// 根据chan地址确定lockorder加锁排序列表的顺序// 简单的堆排序,以保证nlogn时间复杂度完成排序for i := range lockorder {j := i// 从轮询顺序开始,在同一channel上排序。c := scases[pollorder[i]].cfor j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {k := (j - 1) / 2lockorder[j] = lockorder[k]j = k}lockorder[j] = pollorder[i]}for i := len(lockorder) - 1; i >= 0; i-- {o := lockorder[i]c := scases[o].clockorder[i] = lockorder[0]j := 0for {k := j*2 + 1if k >= i {break}if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {k++}if c.sortkey() < scases[lockorder[k]].c.sortkey() {lockorder[j] = lockorder[k]j = kcontinue}break}lockorder[j] = o}if debugSelect {for i := 0; i+1 < len(lockorder); i++ {if scases[lockorder[i]].c.sortkey() > scases[lockorder[i+1]].c.sortkey() {print("i=", i, " x=", lockorder[i], " y=", lockorder[i+1], "\n")throw("select: broken sort")}}}// 锁定select中涉及的所有channelsellock(scases, lockorder)var (gp     *gsg     *sudogc      *hchank      *scasesglist *sudogsgnext *sudogqp     unsafe.Pointernextp  **sudog)// === pass 1 - 查找可以等待处理的channelvar casi intvar cas *scasevar caseSuccess boolvar caseReleaseTime int64 = -1var recvOK boolfor _, casei := range pollorder {casi = int(casei) // case的索引cas = &scases[casi]  c = cas.cif casi >= nsends { // 处理接收channel的casesg = c.sendq.dequeue()if sg != nil {// 如果当前channel的sendq上有等待的goroutine,// 跳到recv代码 并从缓冲区读取数据后将等待goroutine中的数据放入到缓冲区中相同的位置goto recv}if c.qcount > 0 {//如果当前channel的缓冲区不为空,就会跳到bufrecv标签处从缓冲区获取数据;goto bufrecv}if c.closed != 0 {//如果当前channel已经被关闭,就会跳到rclose读取末尾数据和收尾工作;goto rclose}} else { // 处理发送channel的caseif raceenabled {racereadpc(c.raceaddr(), casePC(casi), chansendpc)}if c.closed != 0 {// 如果当前channel已经被关闭就会直接跳到sclose标签(panic中止程序)goto sclose}sg = c.recvq.dequeue()if sg != nil {// 如果当前channel的recvq上有等待的goroutine,就会跳到 send标签向channel发送数据;goto send}if c.qcount < c.dataqsiz {// 如果当前channel的缓冲区存在空闲位置,就会将待发送的数据存入缓冲区;goto bufsend}}}if !block { // 如果是非阻塞,即包含default分支,解锁所有channel并返回selunlock(scases, lockorder)casi = -1goto retc}// === pass 2 - 将当前goroutine根据需要挂在chan的sendq或recvq上gp = getg() // 获取当前的groutineif gp.waiting != nil {throw("gp.waiting != nil")}nextp = &gp.waiting // 正在等待的sudog结构;按锁定顺序for _, casei := range lockorder {casi = int(casei)cas = &scases[casi]c = cas.csg := acquireSudog()// 获取sudog,将当前goroutine绑定到sudog上sg.g = gpsg.isSelect = true// 在分配elem和在gp.waiting上排队sg之间没有堆栈分割,copystack可以找到它。sg.elem = cas.elemsg.releasetime = 0if t0 != 0 {sg.releasetime = -1}sg.c = c// 按锁定顺序构建waiting list 。*nextp = sgnextp = &sg.waitlink// 加入相应等待队列if casi < nsends {c.sendq.enqueue(sg)} else {c.recvq.enqueue(sg)}}// 被唤醒后会根据 param 来判断是否是由 close 操作唤醒的,所以先置为 nilgp.param = nil// Signal to anyone trying to shrink our stack that we're about// to park on a channel. The window between when this G's status// changes and when we set gp.activeStackChans is not safe for// stack shrinking.atomic.Store8(&gp.parkingOnChan, 1)// 挂起当前goroutinegopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)gp.activeStackChans = false// 加锁所有的channelsellock(scases, lockorder)gp.selectDone = 0sg = (*sudog)(gp.param)// param 存放唤醒 goroutine 的 sudog,如果是关闭操作唤醒的,那么就为 nilgp.param = nil// === pass 3 - 当前 Goroutine 被唤醒之后找到满足条件的 Channel 并进行处理//dequeue from unsuccessful chans// otherwise they stack up on quiet channels// record the successful case, if any.// We singly-linked up the SudoGs in lock order.// 从不成功的通道中退出队列,否则它们会堆积在安静的通道上,记录成功的案例(如果有的话)。我们单独将SudoG按锁定顺序连接起来。casi = -1cas = nilcaseSuccess = false// 当前goroutine 的 waiting 链表按照lockorder顺序存放着case的sudogsglist = gp.waiting// 在从 gp.waiting 取消case的sudog链接之前清除所有元素,便于GCfor sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {sg1.isSelect = falsesg1.elem = nilsg1.c = nil}// 清楚当前goroutine的waiting链表,因为被sg代表的协程唤醒了gp.waiting = nilfor _, casei := range lockorder {k = &scases[casei]// 如果相等说明,goroutine是被当前case的channel收发操作唤醒的if sg == sglist {// sg唤醒了当前goroutine, 则当前G已经从sg的队列中出队,这里不需要再次出队casi = int(casei)cas = kcaseSuccess = sglist.successif sglist.releasetime > 0 {caseReleaseTime = sglist.releasetime}} else {// 不是此case唤醒当前goroutine, 将goroutine从case对应的队列(发送或接收)出队c = k.cif int(casei) < nsends {c.sendq.dequeueSudoG(sglist)} else {c.recvq.dequeueSudoG(sglist)}}// 释放当前case的sudog,然后处理下一个case的sudogsgnext = sglist.waitlinksglist.waitlink = nilreleaseSudog(sglist)sglist = sgnext}if cas == nil {throw("selectgo: bad wakeup")}c = cas.cif debugSelect {print("wait-return: cas0=", cas0, " c=", c, " cas=", cas, " send=", casi < nsends, "\n")}if casi < nsends {if !caseSuccess {goto sclose}} else {recvOK = caseSuccess}if raceenabled {if casi < nsends {raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)} else if cas.elem != nil {raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc)}}if msanenabled {if casi < nsends {msanread(cas.elem, c.elemtype.size)} else if cas.elem != nil {msanwrite(cas.elem, c.elemtype.size)}}if asanenabled {if casi < nsends {asanread(cas.elem, c.elemtype.size)} else if cas.elem != nil {asanwrite(cas.elem, c.elemtype.size)}}selunlock(scases, lockorder)goto retcbufrecv:// 能从buffer获取数据if raceenabled {if cas.elem != nil {raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc)}racenotify(c, c.recvx, nil)}if msanenabled && cas.elem != nil {msanwrite(cas.elem, c.elemtype.size)}if asanenabled && cas.elem != nil {asanwrite(cas.elem, c.elemtype.size)}recvOK = trueqp = chanbuf(c, c.recvx)if cas.elem != nil {typedmemmove(c.elemtype, cas.elem, qp)}typedmemclr(c.elemtype, qp)c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}c.qcount--selunlock(scases, lockorder)goto retcbufsend:// 发送数据到缓存if raceenabled {racenotify(c, c.sendx, nil)raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)}if msanenabled {msanread(cas.elem, c.elemtype.size)}if asanenabled {asanread(cas.elem, c.elemtype.size)}typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)c.sendx++if c.sendx == c.dataqsiz {c.sendx = 0}c.qcount++selunlock(scases, lockorder)goto retcrecv:// 从休眠sender(sg)接收recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)if debugSelect {print("syncrecv: cas0=", cas0, " c=", c, "\n")}recvOK = truegoto retcrclose:// 读取结束的channelselunlock(scases, lockorder)recvOK = falseif cas.elem != nil {typedmemclr(c.elemtype, cas.elem)}if raceenabled {raceacquire(c.raceaddr())}goto retcsend:// 想休眠的接收房发送数据if raceenabled {raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)}if msanenabled {msanread(cas.elem, c.elemtype.size)}if asanenabled {asanread(cas.elem, c.elemtype.size)}send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)if debugSelect {print("syncsend: cas0=", cas0, " c=", c, "\n")}goto retcretc:if caseReleaseTime > 0 {blockevent(caseReleaseTime-t0, 1)}return casi, recvOKsclose:// 向关闭的channel发送数据selunlock(scases, lockorder)panic(plainError("send on closed channel"))
}

总结

简单总结下select对case处理逻辑:

1- 空的case 会被golang监听到无法唤醒的协程,会panic

2- 如果只有一个case, 根据操作类型转换成 <- ch 或 成ch <- () (会跳用channel 的 chansend , chanrecv)

3- 如果一个default 一个非default 的case,非default会走 selectnbsend 和 selectnbrecv 非阻塞的方法(最后转换成if-else 语句)

4- 多个case 的情况下, cmd/compile/internal/walk/select.go 优化程序中:

4.1 对 scase 数组, selv ,order数组初始化,将case放在带执行列表中

4.2 调用selectgo函数,根据返回的chosen 结果来生成if语句,执行对应的case

selectgo 函数:

1- 随机生成一个便利case 的 轮询 poollorder, 根据channel 地址生成一个枷锁顺序的lockorder。(随机顺序保证公平性,加锁顺序能够避免思索)

2- 根据pollorder顺序查找cases是否包含立即处理的chan, 如果有就处理。没有处理的话,创建 sudo 结构,将当前的G 加入各case的channel 对应的 接收发送队列,等待其他G唤醒

3- 当调度器 唤醒当前的G,会按照lockorder ,访问所有的case。从中找到需要处理的case进行读写处理,同时从所有的case 的发送姐搜队列中移除当前的

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/610601.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

前端monorepo大仓权限设计的思考与实现

一、背景 前端 monorepo 在试行大仓研发流程过程中&#xff0c;已经包含了多个业务域的应用、共享组件库、工具函数等多种静态资源&#xff0c;在实现包括代码共享、依赖管理的便捷性以及更好的团队协作的时候&#xff0c;也面临大仓代码文件权限的问题。如何让不同业务域的研…

12. SSM整合

1.新建一个maven项目,添加web支持 创建项目 设定项目名 右键添加框架支持: 添加web应用支持: 完成后目录结构: 2.添加jar包依赖 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0…

RealSense Depth Cameras with ROS1 安装和启动教程

首先进入下面的网址&#xff1a; https://dev.intelrealsense.com/docs/ros1-wrapper 进入该链接后&#xff0c;点击最右边的“忍者神龟” 继续点进去 继续点进去后&#xff0c;终于来到了下载安装教程页面&#xff1a; 下面开始命令行代码的搬运&#xff1a; 一、ROS安装&am…

JavaScript高级程序设计读书记录(九):继承

1. 继承 继承是面向对象编程中讨论最多的话题。很多面向对象语言都支持两种继承&#xff1a;接口继承和实现继承。前者只继承方法签名&#xff0c;后者继承实际的方法。接口继承在 ECMAScript 中是不可能的&#xff0c;因为函数没有签名。实现继承是 ECMAScript 唯一支持的继承…

基于ssm的一家运动鞋店的产品推广网站的设计论文

摘 要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本一家运动鞋店就是在这样的大环境下诞生&#xff0c;其可以帮助管理者在短时间内处理完毕庞大的数据信息&am…

MySQL 基于 GTID 主从复制

GTID 定义 GTID 是 MySQL 事务标识&#xff0c;为每一个提交的事务都生成一个标识&#xff0c;并且是全局唯一的&#xff0c;这个特性是从 MySQL5.6 引进的。 组成 GTID 是由 UUID TID&#xff0c;UUID 是MySQL的唯一标识&#xff0c;每个MySQL实例之间都是不同的。TID是代表…

Linux内存管理:(七)页面回收机制

文章说明&#xff1a; Linux内核版本&#xff1a;5.0 架构&#xff1a;ARM64 参考资料及图片来源&#xff1a;《奔跑吧Linux内核》 Linux 5.0内核源码注释仓库地址&#xff1a; zhangzihengya/LinuxSourceCode_v5.0_study (github.com) 1. 触发页面回收 Linux内核中触发页…

Vue3:vue-cli项目创建及vue.config.js配置

一、node.js检测或安装&#xff1a; node -v node.js官方 二、vue-cli安装&#xff1a; npm install -g vue/cli # OR yarn global add vue/cli/*如果安装的时候报错&#xff0c;可以尝试一下方法 删除C:\Users**\AppData\Roaming下的npm和npm-cache文件夹 删除项目下的node…

从传统部署到无服务器计算:AI应用在AWS平台上的革新与飞跃

文章目录 《快速构建AI应用–AWS无服务器AI应用实战》内容简介作者简介目录 随着人工智能技术的不断发展&#xff0c;越来越多的企业开始将人工智能应用于各个业务场景&#xff0c;以提高效率、降低成本并创造新的商业模式。然而&#xff0c;传统的人工智能解决方案往往需要大量…

从零开始C++精讲:第一篇——C++入门

文章目录 前言一、C关键字二、命名空间2.1引子2.2命名空间定义2.3命名空间的使用 三、C输入和输出3.1输出3.2输入 四、缺省参数4.1全缺省4.2半缺省 五、函数重载5.1重载概念 六、引用6.1定义6.2引用的使用示例6.2.1引用作参数6.2.1引用作返回值 6.3传值、传引用效率比较6.4常引…

超维空间M1无人机使用说明书——01、ROS机载电脑使用说明——远程连接

引言&#xff1a;远程连接通常采用两种方式&#xff0c;一种是通过可视化软件&#xff0c;如VNC、Nomachine等&#xff0c;另外一种是使用SSH。各有优缺点&#xff0c;两种远程登录方式的优缺点做一个简单的对比&#xff1a; 1、SSH优缺点 优点:1、消耗网络资源 2、运行稳定 …

前端面试题集合六(高频)

1、vue实现双向数据绑定原理是什么&#xff1f; <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta name"viewport" content"widthdevice-width, initial-scale1.0" /><title>…

java SSM问卷调查系统myeclipse开发mysql数据库springMVC模式java编程计算机网页设计

一、源码特点 java SSM问卷调查管理系统是一套完善的web设计系统&#xff08;系统采用SSM框架进行设计开发&#xff0c;springspringMVCmybatis&#xff09;&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代 码和数据库&#xff0c;系统主要采…

每天刷两道题——第十一天

1.1滑动窗口最大值 给你一个整数数组 nums&#xff0c;有一个大小为 k 的滑动窗口从数组的最左侧移动到数组的最右侧。你只可以看到在滑动窗口内的 k 个数字。滑动窗口每次只向右移动一位。返回滑动窗口中的最大值 。 输入&#xff1a;nums [1,3,-1,-3,5,3,6,7], k 3 输出&…

面试宝典之微服务框架面试题

S1、集群与分布式有啥区别&#xff1f; &#xff08;1&#xff09;相同点&#xff1a; 分布式和集群都是需要有很多节点服务器通过网络协同工作完成整体的任务目标。 &#xff08;2&#xff09;不同点&#xff1a; 分布式是指将业务系统进行拆分&#xff0c;即分布式的每一个…

Pycharm中如何配置python环境(conda)

首先在pycharm中点击 "File" > "Settings" 再次点击如下操作&#xff1a; 点击Python Interpreter的最右侧按钮&#xff0c;点击Show All... 找到python文件 最后点击OK

若依项目的table列表中对每一个字段增加排序按钮(单体版和前后端分离版)

一、目标&#xff1a;每一个字段都添加上下箭头用来排序 只需要更改前端代码&#xff0c;不需要更改后端代码&#xff0c;后面会讲解原理 二、单体版实现方式&#xff1a; 1.在options中添加sortable:true 2.在需要排序的字段中添加sortable:true 三、前后端分离版 1.el-tab…

Open CASCADE学习|非线性方程组

非线性方程组是一组包含非线性数学表达式的方程&#xff0c;即方程中含有未知数的非线性项。解这类方程组通常比解线性方程组更为复杂和困难。 非线性方程组在很多领域都有应用&#xff0c;例如物理学、工程学、经济学等。解决非线性方程组的方法有很多种&#xff0c;包括数值…

面试题-DAG 有向无环图

有向无环图用于解决前后依赖问题&#xff0c;在Apollo中用于各个组件的依赖管理。 在算法面试中&#xff0c;有很多相关题目 比如排课问题&#xff0c;有先修课比如启动问题&#xff0c;需要先启动1&#xff0c;才能启动2 概念 顶点&#xff1a; 图中的一个点&#xff0c;比…

09、Kafka ------ 通过修改保存时间来删除消息(retention.ms 配置)

目录 通过修改保存时间来删除消息★ 删除指定主题的消息演示1、修改kafka检查过期消息的时间间隔2、修改主题下消息的过期时间3、查看修改是否生效4、先查看下主题下有没有消息5、添加几条消息看效果6、查看消息是否被删除 ★ 恢复主题的retention.ms配置1、先查看没修改前的te…