Go-知识select

Go-知识select

  • 1. select 的特性
    • 1.1 chan读写
    • 1.2 返回值
    • 1.3 default
  • 2. select 经典使用
    • 2.1 永久阻塞
    • 2.2 快速检错
    • 2.3 限时等待
  • 3. 实现原理
    • 3.1 数据结构
    • 3.2 实现逻辑
    • 3.3 原理总结
  • 4. 总结
    • 4.1 大概原理
    • 4.2 参数
    • 4.3 返回值

一个小活动: https://developer.aliyun.com//topic/lingma/activities/202403?taskCode=14508&recordId=40dcecb786f9a65c2e83e95306822ce4#/?utm_content=m_fission_1 「通义灵码 · 体验 AI 编码,开 AI 盲盒」

githubio地址:https://a18792721831.github.io/

select 是Go在语言层面提供的多路I/O复用机制,用于检测多个chan是否就绪。
建议先查看chan的文章:https://jiayq.blog.csdn.net/article/details/135885482

1. select 的特性

1.1 chan读写

select 只能作用于chan,包括数据读取和写入:

func TestSelect(t *testing.T) {var c chan stringc = make(chan string)var msg stringh := "hello"select {case msg = <-c:fmt.Printf("msg = %s\n", msg)case c <- h:fmt.Printf("say hello\n")}
}

在上面的代码中,select有两个case语句,分别对应chan的读取和写入操作。
因为创建的chan没有缓存,在写入chan的同时需要有goroutine去读取,相应的,在读取chan的同时必须有goroutine写入。
上面只有一个goroutine,所以是select阻塞的。
在这里插入图片描述

如果对上面的代码做个修改,设置缓存大小为1:

func TestSelect(t *testing.T) {var c chan stringc = make(chan string, 1)var msg stringh := "hello"select {case msg = <-c:fmt.Printf("msg = %s\n", msg)case c <- h:fmt.Printf("say hello\n")}
}

在这里插入图片描述

因为缓存区为1,此时就能写入了,所以select的写入case是不阻塞的,执行chan写入case。
如果先给设置一个初值呢:

func TestSelect(t *testing.T) {var c chan stringc = make(chan string, 1)c <- "hi"var msg stringh := "hello"select {case msg = <-c:fmt.Printf("msg = %s\n", msg)case c <- h:fmt.Printf("say hello\n")}
}

在这里插入图片描述

因为缓存区只有1,已经满了,所以chan无法再写入了,只有chan读取是不阻塞的,所以每次都会执行chan读取。
如果chan既能写,又能读呢:

func TestSelect(t *testing.T) {var c chan stringc = make(chan string, 2)c <- "hi"var msg stringh := "hello"select {case msg = <-c:fmt.Printf("msg = %s\n", msg)case c <- h:fmt.Printf("say hello\n")}
}

当两个case都满足的时候,多次执行,发现select选择的case是不确定的:
在这里插入图片描述

在这里插入图片描述

可能要多执行几次才能发现不一样。
如果没有case满足,而且又不希望阻塞呢:

func TestSelect(t *testing.T) {var c chan stringc = make(chan string)var msg stringh := "hello"select {case msg = <-c:fmt.Printf("msg = %s\n", msg)case c <- h:fmt.Printf("say hello\n")default:fmt.Printf("default case \n")}
}

在这里插入图片描述

此时就不会阻塞了。
如果既有default,又有多个不阻塞的case呢:

func TestSelect(t *testing.T) {var c chan stringc = make(chan string, 2)c <- "hi"var msg stringh := "hello"select {case msg = <-c:fmt.Printf("msg = %s\n", msg)case c <- h:fmt.Printf("say hello\n")default:fmt.Printf("default case \n")}
}

此时只会随机执行case语句,不会执行default语句。
总结下,select的每个case语句只能操作一个chan,要么写入数据,要么读取数据。
如果没有chan可以进行读写操作,在没有default的情况下,select会阻塞,直到任意一个chan解除阻塞。
如果存在多个case不阻塞,那么select会随机挑选一个执行。
如果case阻塞,有default,那么select会执行default。
如果case不阻塞,有default,那么select永远不会执行default。

1.2 返回值

select 为Go语言的预留关键字,不是函数,可以在case语句中声明变量并为变量赋值。
case语句读取chan时,最多可以给两个变量赋值:

func TestSelect(t *testing.T) {var c chan stringc = make(chan string, 2)c <- "hi"var msg stringselect {case <-c: // 0个变量fmt.Printf("skip res\n")case msg = <-c: // 一个变量fmt.Printf("msg = %s\n", msg)case msg, ok := <-c: // 两个变量,注意这里的 msg 重新定义了,与select外面的没有关系了,只是同名fmt.Printf("msg=%s, ok=%v", msg, ok)}
}

在这里插入图片描述

case 语句中chan的读取有两个返回条件,第一个是成功读取到数据,第二个是没有数据,且管道已经被关闭。

chan 读取更多资料:https://jiayq.blog.csdn.net/article/details/135885482
msg 在case中重新声明了, 简短变量声明更多资料:https://jiayq.blog.csdn.net/article/details/136428399

使用两个变量接收chan读取的返回值,可以判断读取到的值是否可信。
如果在上面的代码中增加 close(c),这样case语句中的chan读取到了零值:

func TestSelect(t *testing.T) {var c chan stringc = make(chan string)var msg stringclose(c)select {case <-c:fmt.Printf("skip res\n")case msg = <-c:fmt.Printf("msg = %s\n", msg)case msg, ok := <-c:fmt.Printf("msg=%s, ok=%v", msg, ok)}
}

此时就拿到了零值,如果不判断是否可信,而是直接使用,就使用了预期之外的数据。
在这里插入图片描述

1.3 default

select 中的default语句不能处理chan的读写操作,当select的所有case语句都阻塞时,default语句将被执行。
在1.1中已经尝试了default的例子了。
需要注意的是,在一个select中,default只能出现一次,不限制位置,可以出现在任意顺序。

2. select 经典使用

2.1 永久阻塞

如果我们启动goroutine处理任务,并且不希望main函数退出,需要永久性阻塞main函数。
在Kubernetes项目的多个组件中均有使用select阻塞main函数的例子:

func main(){server := webhooktesting.NewTestServer(nil)server.StartTLS()fmt.Println("servering on", server.URL)select {}
}

如果一个select不包含case语句和default语句,那么goroutine就会陷入永久性阻塞中。

2.2 快速检错

如果使用chan在不同的goroutine中传递异常错误,使用select语句可以快速检查chan中是否有错误,避免等待:

    errCh := make(chan error, activate)jm.deleteJobPods(&job, activatePods, errCh) // 传入chan用于记录错误select{case manageJobErr = <- errCh: // 检查是否有错误发生if manageJobErr != nil {break}default: // 没有错误,快速结束检查}

如果没有异常错误,那么select会执行default语句,不会因为从chan中读取数据导致阻塞。

2.3 限时等待

如果想实现一个有效期的chan,当超过有效期,那么关闭chan,不能在写入了。

func waitForStopOrTimeout(stopCh <-chan struct{}, timeout time.Duration) <-chan struct{} {stopChWithTimeout := make(chan struct{})go func() {select {case <- stopCh: // 自然结束case <- time.After(timeout): // chan有效期}close(stopChWithTimeout)} ()return stopChWithTimeout
}

函数返回一个chan,可以在函数之间传递,但是chan会在指定时间后自动关闭。

3. 实现原理

请先思考:

  • 为什么每个case语句只能处理一个chan?
  • 为什么case语句的执行顺序是随机的(多个case都就绪)?
  • 为什么在case语句中向值为nil的chan中写数据不会触发panic?

3.1 数据结构

select中的case语句对应于runtime包中的scase(select-case)数据结构reflect/value.go:2480

// A runtimeSelect is a single case passed to rselect.
// This must match ../runtime/select.go:/runtimeSelect
type runtimeSelect struct {dir SelectDir      // SelectSend, SelectRecv or SelectDefaulttyp *rtype         // channel typech  unsafe.Pointer // channelval unsafe.Pointer // ptr to data (SendDir) or ptr to receive buffer (RecvDir)
}

可以看出每个select-case的结构都如上,也就是说,每个case中必须包含chan,而且只能有一个chan。
另外,编译器在处理case语句时,如果case语句中没有chan,则会给出编译错误:

    select case must be receive, send or assign recv

runtime/select.go中也有定义:

// A runtimeSelect is a single case passed to rselect.
// This must match ../reflect/value.go:/runtimeSelect
type runtimeSelect struct {dir selectDirtyp unsafe.Pointer // channel type (not used here)ch  *hchan         // channelval unsafe.Pointer // ptr to data (SendDir) or ptr to receive buffer (RecvDir)
}

其中的dir是标明了case的类型,总共有:

// These values must match ../reflect/value.go:/SelectDir.
type selectDir intconst (_             selectDir = iotaselectSend              // case Chan <- SendselectRecv              // case <-Chan:selectDefault           // default
)

总共有4中:chan 的值为nil, 写chan, 读chan, default。
当chan的值为nil,那么不管是读还是写,都会永久阻塞,也就是说,如果case中的chan的值为nil,这类case会永远阻塞,select永远不会选中执行。
这也是为什么在case语句中向值为nil的chan中写数据不会panic。
这也是为什么会忽略第一个值。
default为特殊类型的case语句,其不会操作chan。而且,每个select语句中只能存在一个default语句,并且default语句可以出现在任意位置。
reflect/value.go中,存在对外可见的SelectCase声明:

type SelectCase struct {Dir  SelectDir // direction of caseChan Value     // channel to use (for send or receive)Send Value     // value to send (for send)
}

这个相比较少了很多运行时的定义。

3.2 实现逻辑

reflect/value.go中,有对select的执行逻辑的定义:

// Select executes a select operation described by the list of cases.
// Like the Go select statement, it blocks until at least one of the cases
// can proceed, makes a uniform pseudo-random choice,
// and then executes that case. It returns the index of the chosen case
// and, if that case was a receive operation, the value received and a
// boolean indicating whether the value corresponds to a send on the channel
// (as opposed to a zero value received because the channel is closed).
// Select supports a maximum of 65536 cases.
func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool) {// select 的case 最多有65535 个// int = int32 = 2^16 - 1 ,最高位是符号位// 如果 case 的数量超过 65535,panicif len(cases) > 65536 {panic("reflect.Select: too many cases (max 65536)")}// 使用内部运行时的类型拷贝一份casevar runcases []runtimeSelect// 如果 select中的case大于4个if len(cases) > 4 {// 根据长度申请 slice 的长度,避免扩容runcases = make([]runtimeSelect, len(cases))} else {// 如果少于4个,那么申请 slice 的时候,长度按实际长度申请,容量为4runcases = make([]runtimeSelect, len(cases), 4)}// 初始默认没有 defaulthaveDefault := false// 遍历 select的case数组for i, c := range cases {// 访问遍历的 caserc := &runcases[i]// 重新读取,可能是防止幻读?rc.dir = c.Dir// 看看当前遍历的是哪类caseswitch c.Dir {// 不在定义中的case类型,panicdefault:panic("reflect.Select: invalid Dir")// default 类型的 case	case SelectDefault: // default// 如果已经有了default,那么会panic,表示一个select中有两个defaultif haveDefault {panic("reflect.Select: multiple default cases")}// 设置有defaulthaveDefault = true// 如果default中有chan,那么panic,default中不能有chanif c.Chan.IsValid() {panic("reflect.Select: default case has Chan value")}// 如果chan有数据,表示写chan,default中不能写chanif c.Send.IsValid() {panic("reflect.Select: default case has Send value")}// 写chan类型的case	case SelectSend:ch := c.Chan// chan如果为nil,忽略if !ch.IsValid() {break}// 必须是chan类型ch.mustBe(Chan)// 必须可见ch.mustBeExported()// 获取dir值tt := (*chanType)(unsafe.Pointer(ch.typ))// 再次判断chan必须是写chanif ChanDir(tt.dir)&SendDir == 0 {panic("reflect.Select: SendDir case using recv-only channel")}// 设置内部类型属性rc.ch = ch.pointer()rc.typ = &tt.rtype// 获取写chan的数据v := c.Send// 数据是否有效,如果写chan,但是没有数据写,也会panicif !v.IsValid() {panic("reflect.Select: SendDir case missing Send value")}v.mustBeExported()// 设置内部类型属性v = v.assignTo("reflect.Select", tt.elem, nil)if v.flag&flagIndir != 0 {rc.val = v.ptr} else {rc.val = unsafe.Pointer(&v.ptr)}// 读chan类型的case	case SelectRecv:// chan是否有效if c.Send.IsValid() {panic("reflect.Select: RecvDir case has Send value")}ch := c.Chan// chan如果为nil,忽略if !ch.IsValid() {break}ch.mustBe(Chan)ch.mustBeExported()tt := (*chanType)(unsafe.Pointer(ch.typ))if ChanDir(tt.dir)&RecvDir == 0 {panic("reflect.Select: RecvDir case using send-only channel")}rc.ch = ch.pointer()rc.typ = &tt.rtyperc.val = unsafe_New(tt.elem)}}// 将select-case数组转换为runtimeSelectCase类型,调用rselect,得到select命中的case和是否可信chosen, recvOK = rselect(runcases)// 如果是读chan,那么需要返回值if runcases[chosen].dir == SelectRecv {tt := (*chanType)(unsafe.Pointer(runcases[chosen].typ))t := tt.elemp := runcases[chosen].valfl := flag(t.Kind())if ifaceIndir(t) {recv = Value{t, p, fl | flagIndir}} else {recv = Value{t, *(*unsafe.Pointer)(p), fl}}}// 返回case的赋值return chosen, recv, recvOK
}

在这里插入图片描述

reflect/value.go中,select主要是将select-case数组进行规则校验,如果规则校验通过,那么转换为runtimeSelectCase数组,并调用select命中逻辑。
该函数返回select命中的是哪个case,值是什么,是否ok。
Go在运行时,runtime/select.go中的reflect_rselect()函数用于处理select语句
在这里插入图片描述

这里有个知识点://go:linkname
可以看到在reflect/Value.go中的Select函数调用了rselect函数,但是rselect函数没有方法体:
在这里插入图片描述

rselect函数的方法体在runtime/select.go中:
在这里插入图片描述

可以了解下go:linkname

//go:linkname reflect_rselect reflect.rselect
func reflect_rselect(cases []runtimeSelect) (int, bool) {// 如果select没有case,那么将陷入永久性阻塞if len(cases) == 0 {// 永久性阻塞block()}// 创建一个 slice ,长度是select-case数组的长度(包含default)// 用于分离读chan和写chan,将写chan放在前面,读chan放在后面sel := make([]scase, len(cases))// 创建一个 int 类型的 slice,存储 select-case数组的下标,长度和select-case数组的长度相同// 用于记录原始顺序orig := make([]int, len(cases))// 创建两个变量,统计select-case数组中读chan和写chan的数量nsends, nrecvs := 0, 0// 存储 default-case的位置dflt := -1// 对select-case数组进行遍历for i, rc := range cases {var j intswitch rc.dir {// 如果是default-case,那么记录default下标case selectDefault:dflt = icontinue// 如果是写chan,那么写chan的数量加1,同时记录当前下标	case selectSend:j = nsendsnsends++// 如果是读chan,那么读chan的数量加1,同时记录翻转的下标// 这里是为了将读chan和写chan进行分类,写chan移动到slice的前面,读chan移动到slice的后面	case selectRecv:nrecvs++j = len(cases) - nrecvs}// 将传入的select-case做类型转换后,写入新slicesel[j] = scase{c: rc.ch, elem: rc.val}// 记录原始位置orig[j] = i}// 如果 select-case数组中只有default,那么直接选中并返回if nsends+nrecvs == 0 {return dflt, false}// 如果slice中间存在空缺,空缺是default,上面的for-range中,没有将default-case放入sliceif nsends+nrecvs < len(cases) {// 做合并,将空缺移动到最后,也就是将default-case放到最后copy(sel[nsends:], sel[len(cases)-nrecvs:])copy(orig[nsends:], orig[len(cases)-nrecvs:])}// 创建一个2倍读写chan的case长度的slice,不计算default-caseorder := make([]uint16, 2*(nsends+nrecvs))var pc0 *uintptr// 是否编译时有 -race if raceenabled {pcs := make([]uintptr, nsends+nrecvs)for i := range pcs {selectsetpc(&pcs[i])}pc0 = &pcs[0]}// 调用 selectgo 函数选择 casechosen, recvOK := selectgo(&sel[0], &order[0], pc0, nsends, nrecvs, dflt == -1)// 如果没有选中读写chan,那么select选中defaultif chosen < 0 {chosen = dflt} else {// select命中的下标(原顺序的下标)chosen = orig[chosen]}// 返回 select命中的下标和可信标记return chosen, recvOK
}

可以发现runtime/select.go中的reflect_rselect函数提取出来读写chan的case,
并按照先写后读的顺序进行转换为内部类型的slice,最后调用selectgo函数进行select选择读写chan,
如果读写chan都没有选中,那么select选中default,如果没有default,那么会阻塞等待。
接下来看下runtime/select.go中的selectgo函数:

// cas0是select-chan-case的slice,order0是原顺序的slice,pc0是 -race 的slice
// nsends是写chan的case数量, nrecvs是读chan的case数量
// block是是否阻塞的标志,如果有default,那么没有选中返回-1,不阻塞
// 如果没有default,那么没有选中阻塞,直到有case就绪
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {// 是否是debug模式,debug模式会打印一些关键信息if debugSelect {print("select: cas0=", cas0, "\n")}// 将cas0指向一个最大长度为65535的数组cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))// 将cas0指向一个最大长度为65535*2的数组order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))// 计算select-chan-case的数量ncases := nsends + nrecvs// 从 cas1中切取select-chan-case切片// 这里填个坑:https://jiayq.blog.csdn.net/article/details/135716705// 切片扩展表达式3.3scases := cas1[:ncases:ncases]// 从 order1 中获取chan-case长度的切片,假设chan-case长度为n,这里从[0,675535*2)拿到了 [0,n) 长度的切片// 使用切片扩展表达式,设置容量为npollorder := order1[:ncases:ncases]// 从 order1 中获取 [n,2n) 长度的切片// 使用切片扩展表达式,设置容量为nlockorder := order1[ncases:][:ncases:ncases]// 获取cpu的纳秒时间戳var t0 int64if blockprofilerate > 0 {t0 = cputicks()}// 统计有效select-chan-case的数量norder := 0// 对传入的select-chan-case切片做遍历for i := range scases {cas := &scases[i]// 如果 chan 为空,重新赋值,或者压根没有赋值,值为nil的chan// 那么清空chan的数据变量,忽略这个chan-caseif cas.c == nil {cas.elem = nil // allow GCcontinue}// 获取一个[0, norder]的随机数j := fastrandn(uint32(norder + 1))// 将随机位置的数字放到递增的norder位置// 当前位置是零值,未做初始化,将当前位置设置为之前任意位置的值pollorder[norder] = pollorder[j]// 将被选定的随机位置放下标位置// 将被交换的位置设置为当前下标// 实际上可以这么立即,首先将当前下标设置为当前位置,然后选择之前的随机一个位置进行交换pollorder[j] = uint16(i)// select-chan-case加1norder++} // 到了这里就对 pollorder 进行了随机shuffle,并且统计出来有效的chan-case数量// 只要有效的chan-case数量的切片pollorder = pollorder[:norder]// 顺便对 lockorder也切一下lockorder = lockorder[:norder]// 下面两个 for 构成一个【堆排序】// 建立大顶堆,大顶堆是为了升序// 遍历 lockorder // 大顶堆,根节点最大// 这个遍历主要是实现 lockorder 的随机初始化,以及 lockorder 切片对应的chan-case的chan的地址是相等或递减的for i := range lockorder {// 临时获取 lockorder 的下标j := i// 获取 shuffle 切片 pollorder中第一个值// 然后获取该值所指示的 chan-case 的chanc := scases[pollorder[i]].c// 根节点小于左节点for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {// 将 k 设置为 lockorder对于当前位置 一半的位置,堆排序,用数组存储一棵树,怎么存储。k := (j - 1) / 2// 交换 当前位置和 当前位置一半的位置lockorder[j] = lockorder[k]// 继续循环,实现 在 lockorder 中对应的 chan-case的chan地址是相等或者递减顺序j = k}// 将 shuffle 切片 pollorder中第一个值设置到 lockorder中的第一个值lockorder[j] = pollorder[i]}// 堆排序for i := len(lockorder) - 1; i >= 0; i-- {// 当前索引的 lockorder 的值o := lockorder[i]// 获取 lockorder 对应的chan-case 的 chanc := scases[o].c// 将大顶堆的根节点换到数组末尾lockorder[i] = lockorder[0]// 重新构建大顶堆j := 0for {// 左节点k := j*2 + 1// 如果到了已经排序过的位置,不在构建堆(数组最后是最大的,如果到了i,表示到了已经排过序的数据,不需要在参与构建大顶堆了)if k >= i {break}// 左节点小于右节点if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {k++}// 根节点小于左节点,交换if c.sortkey() < scases[lockorder[k]].c.sortkey() {lockorder[j] = lockorder[k]j = kcontinue}break}lockorder[j] = o}// 是否是debug模式,如果是,输出堆排序后升序chan-case中chan地址的lockorder切片if debugSelect {for i := 0; i+1 < len(lockorder); i++ {if scases[lockorder[i]].c.sortkey() > scases[lockorder[i+1]].c.sortkey() {print("i=", i, " x=", lockorder[i], " y=", lockorder[i+1], "\n")throw("select: broken sort")}}}// 按照 lockorder 的顺序,给chan-case加锁(地址升序排列了,内存访问更快)sellock(scases, lockorder)var (gp     *gsg     *sudogc      *hchank      *scasesglist *sudogsgnext *sudogqp     unsafe.Pointernextp  **sudog)// pass 1 - look for something already waitingvar casi intvar cas *scasevar caseSuccess boolvar caseReleaseTime int64 = -1var recvOK bool// 对shuffle的切片进行遍历for _, casei := range pollorder {// chan-case 的下标casi = int(casei)// chan-casecas = &scases[casi]// chan-case 的 chanc = cas.c// 如果 下标大于 写chan-case的数量,那么表示当前的chan-case是读chan-case// 需要注意,如果写chan关闭,就不能在写了,读chan关闭,如果有缓存区,那么还是能够读的if casi >= nsends {// 获取读chan的goroutine等待队列sg = c.sendq.dequeue()// 如果有读chan的等待队列,那么跳转到读取// 如果有读chan的等待队列,那么对于读chan表示缓存区为空if sg != nil {goto recv}// 如果读chan是带有缓存的,而且缓存区有待读取的数据,那么跳转到数据读取if c.qcount > 0 {goto bufrecv}// 如果读chan已经关闭,那么挑战到读chan关闭if c.closed != 0 {goto rclose}} else { // 不是读chan-case,那么就是写chan-case// 是否 -race if raceenabled {racereadpc(c.raceaddr(), casePC(casi), chansendpc)}// 如果写chan-case的chan已经关闭了,那么跳转到写chan关闭if c.closed != 0 {goto sclose}// 获取写chan的等待队列sg = c.recvq.dequeue()// 如果写chan的等待队列不为空,那么跳转到写入// 如果存在等待队列,那么对于写chan表示缓存区已经满了if sg != nil {goto send}// 如果写chan缓存区未满,那么跳转到写入数据if c.qcount < c.dataqsiz {goto bufsend}}}// 如果有default-case ,那么block=false,如果没有default-case,那么block=trueif !block {// chan-case解锁selunlock(scases, lockorder)// 返回-1,表示select没有选中 chan-case,需要返回default-casecasi = -1// 跳转到返回goto retc}// pass 2 - enqueue on all chans// 能走到这里,表示chan-case没有就绪,而且没有default-case,需要阻塞等待chan-case就绪// 获取当前 goroutine 信息gp = getg()// 如果已经在等待了,异常if gp.waiting != nil {throw("gp.waiting != nil")}// 获取等待信息nextp = &gp.waiting// 对lockorder进行遍历for _, casei := range lockorder {// 获取chan-case的chan的升序地址的位置casi = int(casei)// 获取chan-casecas = &scases[casi]// 获取 cc = cas.c// 获取等待的goroutinesg := acquireSudog()sg.g = gp// 等待的 goroutine 是否在select 阻塞中sg.isSelect = true// 将当前chan-case的数据给等待的select的goroutinesg.elem = cas.elemsg.releasetime = 0if t0 != 0 {sg.releasetime = -1}// 将chan-case的chan赋值给goroutinesg.c = c// 继续下一个等待的goroutine*nextp = sgnextp = &sg.waitlink// 如果当前索引小于写chan-case的数量,那么说明当前是写chanif casi < nsends {// 将等待的goroutine加入chan的等待队列c.sendq.enqueue(sg)} else {// 否则就是读chan了c.recvq.enqueue(sg)}}// 等待goutine 被从chan的等待队列唤醒(其他的goroutine操作chan,会唤醒等待队列)// 释放 goroutine占用的P资源gp.param = nil// 使用原子操作设置goroutine处于等待chan唤醒状态atomic.Store8(&gp.parkingOnChan, 1)// 设置goroutine为等待状态,selparkcommit是goroutine唤醒的逻辑gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)// 设置goroutine不是激活状态gp.activeStackChans = false// 再次加锁sellock(scases, lockorder)// 设置goroutine未完成gp.selectDone = 0// 获取goroutinesg = (*sudog)(gp.param)// 释放goroutine的P资源gp.param = nil// pass 3 - dequeue from unsuccessful chans// otherwise they stack up on quiet channels// record the successful case, if any.// We singly-linked up the SudoGs in lock order.// 如果没有选中chan-case,那么返回 -1,必须有default-casecasi = -1cas = nil// case 执行是否成功caseSuccess = false// 获取goroutine等待列表sglist = gp.waiting// 链表遍历等待的goroutinefor sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {// 退出 select 等待状态sg1.isSelect = false// 数据清空sg1.elem = nil// chan释放sg1.c = nil}// 等待状态退出gp.waiting = nil// 遍历chan-case切片for _, casei := range lockorder {// 获取 chan-casek = &scases[casei]// 如果是goroutine的等待链表头if sg == sglist {// 获取chan-case的切片的下标casi = int(casei)// 设置chan-casecas = k// 获取是否成功的状态caseSuccess = sglist.success// 如果释放时间大于0,那么获取释放时间if sglist.releasetime > 0 {caseReleaseTime = sglist.releasetime}} else {// 获取chan-case的chanc = k.c// 如果当前索引小于写chan的数量,那么当前chan就是写chanif int(casei) < nsends {// 从写chan的等待队列中移除goroutinec.sendq.dequeueSudoG(sglist)} else {// 从读chan的等待队列中移除goroutinec.recvq.dequeueSudoG(sglist)}}// 移动下一个等待的goroutinesgnext = sglist.waitlink// 从链表移除sglist.waitlink = nil// 释放goroutine资源releaseSudog(sglist)// 移动sglist = sgnext}// 如果没有就绪,但是却被唤醒了if cas == nil {throw("selectgo: bad wakeup")}// 拿到chanc = cas.c// 打印调试日志if debugSelect {print("wait-return: cas0=", cas0, " c=", c, " cas=", cas, " send=", casi < nsends, "\n")}// 判断是读chan-case还是写chan-caseif casi < nsends { // 写chan// 如果写chan-case未成功,那么跳转到写chan关闭if !caseSuccess {goto sclose}} else {// 如果写chan成功了,那么设置返回可信标志recvOK = caseSuccess}// -raceif raceenabled {if casi < nsends {raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)} else if cas.elem != nil {raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc)}}// -msanif msanenabled {if casi < nsends {msanread(cas.elem, c.elemtype.size)} else if cas.elem != nil {msanwrite(cas.elem, c.elemtype.size)}}// chan-case解锁selunlock(scases, lockorder)// 跳转到返回goto retc
// 读chan读取数据
bufrecv:// -raceif raceenabled {if cas.elem != nil {raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc)}racenotify(c, c.recvx, nil)}// -msanif msanenabled && cas.elem != nil {msanwrite(cas.elem, c.elemtype.size)}// 设置可信标志recvOK = true// 从chan的缓存区读取数据qp = chanbuf(c, c.recvx)// 如果读chan-case的数据区不为空,那么清空并交换if cas.elem != nil {typedmemmove(c.elemtype, cas.elem, qp)}// 内存整理typedmemclr(c.elemtype, qp)// 读取索引(缓存区是环形列表)c.recvx++// 如果读取指针追上写入指针,表示缓冲区空(数组构成的环形队列,到了尾端了)if c.recvx == c.dataqsiz {// 重置指针c.recvx = 0}// 缓存区可读数据减1c.qcount--// 解锁chan-caseselunlock(scases, lockorder)// 跳转返回goto retc
// 写chan发送数据
bufsend:if raceenabled {racenotify(c, c.sendx, nil)raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)}if msanenabled {msanread(cas.elem, c.elemtype.size)}// 从 写chan-case中将数据写入chan的缓冲区typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)// chan写入指针以东南c.sendx++// 如果写入指针追上读取指针,表示缓冲区满(数组构成的环形队列,到了尾端了)if c.sendx == c.dataqsiz {// 重置指针c.sendx = 0}// 缓冲区数据加1c.qcount++// 解锁chan-caseselunlock(scases, lockorder)// 跳转返回goto retc
// 读chan读取
recv:// 阻塞读取recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)// 打印调试信息if debugSelect {print("syncrecv: cas0=", cas0, " c=", c, "\n")}// 设置可信标志recvOK = true// 跳转返回goto retc
// 读关闭的chan,有缓冲区的chan,关闭后可读
rclose:// 解锁chan-caseselunlock(scases, lockorder)// 设置可信标志(为什么是false,当数据未读取完的时候,应该是true)recvOK = falseif cas.elem != nil {typedmemclr(c.elemtype, cas.elem)}if raceenabled {raceacquire(c.raceaddr())}// 跳转返回goto retc
// 写chan发送
send:if raceenabled {raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)}if msanenabled {msanread(cas.elem, c.elemtype.size)}// 阻塞发送send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)// 打印调试信息if debugSelect {print("syncsend: cas0=", cas0, " c=", c, "\n")}// 跳转返回goto retc
// 返回
retc:// 如果case释放时间大于0,那么发送阻塞事件if caseReleaseTime > 0 {blockevent(caseReleaseTime-t0, 1)}// 返回选中的chan-case和可信标志return casi, recvOK
// 写关闭的chan,panic
sclose:// 解锁chan-caseselunlock(scases, lockorder)// panic,关闭的chan不允许写panic(plainError("send on closed channel"))
}

3.3 原理总结

首先在reflect/Value.go中,存在对select的定义,在reflect/Value.go中,调用Select函数将select-case转换为内部类型,
并且会进行select-case的规则校验,调用rselect函数进行select选中。
rselect函数通过//go:linknameruntime/select.go中实现,实现函数为reflect_rselect
runtime/select.go中的reflect_rselect函数中对传入的select-case切片做读chan-case和写chan-case的分类,并且剔除永远阻塞和default-case。
接着调用runtime/select.go中的selectgo函数进行chan-case的选中。
selectgo函数中,对传入的select-case首先进行shuffle,得到pollorder。
然后使用大顶堆,对select-case构建lockorder,lockoder中的chan地址升序。
如果读chan-case的chan的等待队列不为空,那么唤醒等待goroutine,进行数据读取。
如果读chan-case的chan的等待队列为空,但是缓存区存在数据,那么进行缓存区数据读取。
如果读chan-case的chan已经关闭,那么直接读缓存区数据,否则返回不可信标志。
如果写chan-case的chan已经关闭,那么panic。
如果写chan-case的chan的等待队列不为空,那么唤醒并写入。
如果写chan-case的chan的缓冲区不满,那么写入数据。
如果有default,那么没有选中直接返回。上层调用直接返回default-case。
如果没有default,那么阻塞读写。
这里填个坑:https://jiayq.blog.csdn.net/article/details/135716705 ,切片扩展表达式3.3。

4. 总结

  • select仅能操作chan.
  • 每个case语句仅能处理一个chan,要么是读chan,要么是写chan.
  • 多个case语句的执行顺序是随机的。
  • 存在default语句,select将不会阻塞。
  • 使用select读取chan时,应该尽可能检查读取是否成功,确定数据是否可信。

4.1 大概原理

Go在运行时包中提供了selectgo函数用于处理select语句:

func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) 

selectgo函数会从一组case语句中挑选一个case,并返回命中case的下标,对于读chan-case,还会返回是否成功地读取了数据(第二个返回值对于其他类型case无意义)。
在这里插入图片描述

4.2 参数

编译器会将select中的case语句存储在一个数组中,selectgo的第一个参数cas0就是这个数组的地址,参数nsends,nrecvs表示读chan-case和写chan-case的个数。
selectgo第二个参数order0是一个整型数组地址,长度是chan-case个数的2倍,order0数组是case执行随机性的关键。
order0数组被一分为二,前半部分存放chan-case的随机顺序(源代码中称为pollorder),selectgo函数会将原始的chan-case顺序打乱,这样在检查每个chan-case是否就绪时,就会表现出随机性。
后半部分存放chan加锁的顺序(源代码中称之为lockorder),selectgo会按照chan地址升序的顺序对chan进行加锁,从而避免因重复加锁引发的死锁问题。
在这里插入图片描述

4.3 返回值

当所有的chan-case都不可能就绪时,selectgo会陷入永久的阻塞,此时函数不会返回。一旦selectgo返回,就说明某个chan-case语句就绪了。
第一个返回值代表case的编号,这个编号与代码中出现的顺序一致,而非打乱后的顺序。
第二个返回值代表是否从chan中读取了数据,该值只针对读chan-case有意义。特别注意的是,第二个返回值为true时,仅代表从chan中读取了数据,对于已经关闭了的chan也是如此。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/748793.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

springboot基于SpringBoot的学生请假管理系统的设计与实现

摘 要 系统根据现有的管理模块进行开发和扩展&#xff0c;采用面向对象的开发的思想和结构化的开发方法对学生请假管理的现状进行系统调查。采用结构化的分析设计&#xff0c;该方法要求结合一定的图表&#xff0c;在模块化的基础上进行系统的开发工作。在设计中采用“自下而上…

江科大stm32学习笔记【6-2】——定时器定时中断定时器外部时钟

一.定时器定时中断 1.原理 2.硬件 3.程序 此时CK_PSC72M&#xff0c;定时1s&#xff0c;也就是定时频率为1Hz&#xff0c;所以可以PSC7200-1,ARR10000-1。 Timer.c: #include "stm32f10x.h" // Device headerextern uint16_t Num;//声明跨文件的…

力扣经典题:分割平衡字符串

大佬的代码非常简洁 int balancedStringSplit(char * s){short i0,count0,sign0;while(s[i]){signs[i]L?sign1:sign-1;if(sign0) count;}return count; }

Transformer代码从零解读【Pytorch官方版本】

文章目录 1、Transformer大致有3大应用2、Transformer的整体结构图3、如何处理batch-size句子长度不一致问题4、MultiHeadAttention&#xff08;多头注意力机制&#xff09;5、前馈神经网络6、Encoder中的输入masked7、完整代码补充知识&#xff1a; 1、Transformer大致有3大应…

第十四届蓝桥杯省赛真题 Java A 组【原卷】

文章目录 发现宝藏【考生须知】试题 A \mathrm{A} A : 特殊日期试题 B: 与或异或试题 C : \mathrm{C}: C: 平均试题 D: 棋盘试题 E : \mathrm{E}: E: 互质数的个数试题 F: 阶乘的和试题 G: 小蓝的旅行计划试题 H: 太阳试题 I: 高塔试题 J \mathrm{J} J : 反异或 01 串 发现…

ChatGPT编程—实现小工具软件(批量替换文本、批量处理图像文件)

ChatGPT编程—实现小工具软件(批量替换文本、批量处理图像文件) 今天借助[小蜜蜂AI][https://zglg.work]网站的ChatGPT编程实现一个功能&#xff1a;批量处理文件及其内容&#xff0c;例如批量替换文本、批量处理图像文件等。 环境&#xff1a;Pycharm 2021 系统&#xff1a…

NVENC 视频编码器 API 编程指南 ( 中文转译 )

基于 NVIDIA Kepler™ 和更高版本 GPU 架构的 NVIDIA GPU 包含基于硬件的 H.264/HEVC/AV1 视频编码器&#xff08;以下简称 NVENC&#xff09;。NVENC 硬件采用 YUV/RGB 作为输入&#xff0c;并生成符合H.264/HEVC/AV1 标准的视频比特流。可以使用 NVIDIA 视频编解码器 SDK 中提…

挑战杯 机器视觉人体跌倒检测系统 - opencv python

0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 机器视觉人体跌倒检测系统 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c;学长非常推荐&#xff01; &#x1f947;学长这里给一个题目综合评分(每项满分5分) 难度系数&…

蓝桥:硬币兑换(python)

问题描述&#xff1a; 小蓝手中有2023种不同面值的硬币,这些硬币全部是新版硬币,其中第i(1≤i≤2023)种硬币的面值为i,数量他为i个。硬币兑换机可以进行硬币兑换&#xff0c;兑换规则为:交给硬币兑换机两个新版硬币coin1和coin2,硬币兑换机会兑换成一个面值为coin1十coin2的旧…

使用C语言计算1/1-1/2+1/3-1/4+...+1/99-1/100

观察算式&#xff0c;发现分子都是1&#xff0c;分母从1~100&#xff0c;所以可以使用for循环产生1~100之间的数。 另一个问题是&#xff0c;如何产生正负交替的符号&#xff1f;很简单&#xff0c;这个符号本质上就是往每一项前面乘一个系数&#xff1a;一或者负一。所以只需…

008:安装Docker

安装Docker 如果不太熟悉Linux命令&#xff0c;不想学习Linux命令&#xff0c;可以直接看文末NAS面板章节&#xff0c;通过面板&#xff0c;像使用Window一样操作NAS。 一、安装 Docker 1.安装 Docker wget -qO- https://get.docker.com/ | sh2.启动 Docker 服务 sudo sys…

算法练习:二分查找

目录 1. 朴素二分查找2. 在排序数组中查找元素的第一个和最后一个位置3. 搜索插入位置4. x的平方根5. 山脉数组的峰值索引6. 寻找峰值7. 寻找旋转排序数组中的最小值8. 点名 1. 朴素二分查找 题目信息&#xff1a; 题目链接&#xff1a; 二分查找二分查找的使用前提为数据具有&…

掌握高级设计原则:Java中的过滤器模式解析与实战演练,构建灵活且可扩展的系统架构

过滤器模式是一种结构型设计模式&#xff0c;它允许开发者使用不同的标准来过滤一组对象&#xff0c;并通过逻辑运算以解耦的方式将它们联系起来。 过滤器模式的核心在于提供了一个处理对象的机制&#xff0c;这个机制可以根据一个或多个标准来决定哪些对象应该被接受、哪些应…

解析KafkaConsumer类的神奇之道

欢迎来到我的博客&#xff0c;代码的世界里&#xff0c;每一行都是一个故事 解析KafkaConsumer类的神奇之道 前言KafkaConsumer双线程设计主线程&#xff08;消费线程&#xff09;&#xff1a;心跳线程&#xff1a;示例代码&#xff1a; KafkaConsumer线程不安全线程安全的替代…

jetson nano——编译一些包的网址导航,pyside2,qt(持续更新)

目录 1.PySide2下载地址2.tesserocr下载地址3.Qt下载地址4.OpenSSL官网5.latex编译器下载地址5.1MikTex5.2TeX Live 1.PySide2下载地址 https://download.qt.io/official_releases/QtForPython/pyside2/ 如下图&#xff1a; 2.tesserocr下载地址 https://github.com/simonflue…

PTA冰岛人

作者 陈越 单位 浙江大学 2018年世界杯&#xff0c;冰岛队因1:1平了强大的阿根廷队而一战成名。好事者发现冰岛人的名字后面似乎都有个“松”&#xff08;son&#xff09;&#xff0c;于是有网友科普如下&#xff1a; 冰岛人沿用的是维京人古老的父系姓制&#xff0c;孩子的姓…

行业突破!四信实现低延时摄像头弱网状态100ms以内实时传输

随着人工智能、大数据、区块链等技术在城市中快速发展&#xff0c;人们日常生活中已经离不开网络的支撑&#xff0c;而实现“人与人”、“人与物”及“物与物”之间高速连接应用的“时延”&#xff0c;是网络支撑中最重要的存在。 以城市生活例子为例&#xff0c;当网络延时出现…

通过日志恢复sql server数据库

在SQL Server中&#xff0c;通过日志恢复数据库是一个精细的过程&#xff0c;主要用于在数据库出现错误、数据丢失或需要回滚到特定时间点时恢复数据。以下是一般步骤概述&#xff1a; 设置恢复模式&#xff1a; 首先&#xff0c;数据库必须配置为“完整恢复模式”或“大容量…

【Miniconda】Linux系统中 .condarc 配置文件的位置一般在哪里

【Miniconda】Linux系统中 .condarc 配置文件的位置一般在哪里 &#x1f308; 个人主页&#xff1a;高斯小哥 &#x1f525; 高质量专栏&#xff1a;Matplotlib之旅&#xff1a;零基础精通数据可视化、Python基础【高质量合集】、PyTorch零基础入门教程&#x1f448; 希望得到…

3. ElasticSearch搜索技术深入与聚合查询实战

1. ES分词器详解 1.1 基本概念 分词器官方称之为文本分析器&#xff0c;顾名思义&#xff0c;是对文本进行分析处理的一种手段&#xff0c;基本处理逻辑为按照预先制定的分词规则&#xff0c;把原始文档分割成若干更小粒度的词项&#xff0c;粒度大小取决于分词器规则。 1.2 …