Go语言channel与select原理

本文会尝试解释 go runtime 中 channel 和 select 的具体实现,部分内容来自 gophercon2017。Go版本为1.8.3

channel

第一部分讲述一下 channel 的用法。channel 可以看做一个队列,用于多个goroutine之间的通信,例如下面的例子,一个goroutine发送msg,另一个msg接受消息。channel 分为带缓冲和不带缓冲,差别不是很大,具体请自行google。看一个简单的例子,了解一下channel的使用。

package mainimport "fmt"func main() {// Create a new channel with `make(chan val-type)`.// Channels are typed by the values they convey.messages := make(chan string)// Send a value into a channel using the `channel <-`// syntax. Here we send `"ping"`  to the `messages`// channel we made above, from a new goroutine.go func() { messages <- "ping" }()// The `<-channel` syntax receives a value from the// channel. Here we'll receive the `"ping"` message// we sent above and print it out.msg := <-messagesfmt.Println(msg)
}

channel的功能点:

  1. 队列
  2. 阻塞
  3. 当一端阻塞,可以被另一个端唤醒

我们围绕这3点功能展开,讲讲具体的实现。

channel结构

注释标注了几个重要的变量,从功能上大致可以分为两个功能单元,一个是 ring buffer,用于存数据; 一个是存放 goroutine 的队列。

type hchan struct {qcount   uint           // 当前队列中的元素个数dataqsiz uint           // 缓冲队列的固定大小buf      unsafe.Pointer // 缓冲数组elemsize uint16closed   uint32elemtype *_type // element typesendx    uint   // 下一次发送的 indexrecvx    uint   // 下一次接收的 indexrecvq    waitq  // 接受者队列sendq    waitq  // 发送者队列// lock protects all fields in hchan, as well as several// fields in sudogs blocked on this channel.//// Do not change another G's status while holding this lock// (in particular, do not ready a G), as this can deadlock// with stack shrinking.lock mutex
}

Ring Buffer

主要是以下变量组成的功能, 一个 buf 存储实际数据,两个指针分别代表发送,接收的索引位置,配合 size, count 在数组大小范围内来回滑动。

qcount   uint           // 当前队列中的元素个数
dataqsiz uint           // 缓冲队列的固定大小
buf      unsafe.Pointer // 缓冲数组
sendx    uint   // 下一次发送的 index
recvx    uint   // 下一次接收的 index

举个例子,假设我们初始化了一个带缓冲的channel, ch := make(chan int, 3), 那么它初始状态的值为:

qcount   = 0
dataqsiz = 3
buf      = [3]int{0, 0, 0} // 表示长度为3的数组
sendx    = 0
recvx    = 0

第一步,向 channel 里 send 一个值, ch <- 1, 因为现在缓冲还没满,所以操作后状态如下:

qcount   = 1
dataqsiz = 3
buf      = [3]int{1, 0, 0} // 表示长度为3的数组
sendx    = 1
recvx    = 0

快进两部,连续向 channel 里 send 两个值 (2, 3),状态如下:

qcount   = 3
dataqsiz = 3
buf      = [3]int{1, 2, 3} // 表示长度为3的数组
sendx    = 0 // 下一个发送的 index 回到了0
recvx    = 0

从 channel 中 receive 一个值, <- ch, 状态如下:

qcount   = 2
dataqsiz = 3
buf      = [3]int{1, 2, 3} // 表示长度为3的数组
sendx    = 0 // 下一个发送的 index 回到了0
recvx    = 1 // 下一个接收的 index

阻塞

我们看下,如果 receive channel 时,channel 的 buffer中没有数据是怎么处理的。逻辑在 chanrecv 这个方法中,它的大致流程如下,仅保留了阻塞操作的代码。

func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {// 检查 channdel 是否为 nil// 当不阻塞时,检查buffer大小,当前大小,检查chennel是否关闭,看看是否能直接返回// 检查发送端是否有等待的goroutine,下部分会提到// 当前buffer中有数据,则尝试取出。// 如果非阻塞,直接返回// 没有sender等待,buffer中没有数据,则阻塞等待。gp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.mysg.elem = epmysg.waitlink = nilgp.waiting = mysgmysg.g = gpmysg.selectdone = nilmysg.c = cgp.param = nilc.recvq.enqueue(mysg)//关键操作:设置 goroutine 状态为 waiting, 把 G 和 M 分离goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)// someone woke us up// 被唤醒,清理 sudogif mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}closed := gp.param == nilgp.param = nilmysg.c = nilreleaseSudog(mysg)return true, !closed
}

这里的操作就是 创建一个 当前 goroutine 的 sudog, 然后把这个 sudog 放入 channel 的接受者等待队列;设置当前 G 的状态,和 M分离,到这里当前G就阻塞了,代码不会执行下去。
当被唤醒后,执行sudog的清理操作。这里接受buffer中的值的指针是 ep 这个变量,被唤醒后好像没有向 ep 中赋值的操作。这个我们下部分会讲。

sudog

还剩最后一个疑问,当一个goroutine因为channel阻塞,另一个goroutine是如何唤醒它的。

channel 中有两个 waitq 类型的变量, 看下结构发现,就是sudog的链表,关键是 sudog。sudog中包含了goroutine的引用,注意一下 elem这个变量,注释说可能会指向stack。

type waitq struct {first *sudoglast  *sudog
}type sudog struct {// The following fields are protected by the hchan.lock of the// channel this sudog is blocking on. shrinkstack depends on// this.g          *gselectdone *uint32 // CAS to 1 to win select race (may point to stack)next       *sudogprev       *sudogelem       unsafe.Pointer // data element (may point to stack)// The following fields are never accessed concurrently.// waitlink is only accessed by g.acquiretime int64releasetime int64ticket      uint32waitlink    *sudog // g.waiting listc           *hchan // channel
}

讲阻塞部分的时候,我们看到goroutine被调度之前,有一个 enqueue操作,这时,当前G的sudog已经被存入recvq中,我们看下发送者这时的操作。

这里的操作是,sender发送的值 直接被拷贝到 sudog.elem 了。然后唤醒 sudog.g ,这样对面的receiver goroutine 就被唤醒了。具体请下面的注释。

func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {// 检查工作// 如果能从 chennel 的 recvq 弹出 sudog, 那么直接sendif sg := c.recvq.dequeue(); sg != nil {// Found a waiting receiver. We pass the value we want to send// directly to the receiver, bypassing the channel buffer (if any).send(c, sg, ep, func() { unlock(&c.lock) })return true}// buffer有空余空间,返回; 阻塞操作
}func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func()) {// 处理 index// 关键if sg.elem != nil {// 这里是根据 elemtype.size 复制内存sendDirect(c.elemtype, sg, ep)sg.elem = nil}// 一些处理// 重新设置 goroutine 的状态,唤醒它goready(gp, 4)
}func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {// src is on our stack, dst is a slot on another stack.// Once we read sg.elem out of sg, it will no longer// be updated if the destination's stack gets copied (shrunk).// So make sure that no preemption points can happen between read & use.dst := sg.elemtypeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)memmove(dst, src, t.size)
}// memmove copies n bytes from "from" to "to".
// in memmove_*.s
//go:noescape
func memmove(to, from unsafe.Pointer, n uintptr)

select

在看 chanrecv()方法 时,发现了一个 block 参数,代表操作是否阻塞。一般情况下,channel 都是阻塞的(不考虑buffer),那什么时候非阻塞呢?

第一个想到的就是 select, 在写了default case的时候,其他的channel是非阻塞的。

还有一个可能不常用,就是 channel 的反射 value, 可以是非阻塞的,这个方法是public的,我们先看下简单的。

func (v Value) TryRecv() (x Value, ok bool)
func (v Value) TrySend(x Value) bool

select 就复杂一点点,首先在源码中发现一段注释:

// compiler implements
//
//    select {
//    case c <- v:
//        ... foo
//    default:
//        ... bar
//    }
//
// as
//
//    if selectnbsend(c, v) {
//        ... foo
//    } else {
//        ... bar
//    }
//
func selectnbsend(t *chantype, c *hchan, elem unsafe.Pointer) (selected bool) {return chansend(t, c, elem, false, getcallerpc(unsafe.Pointer(&t)))
}// compiler implements
//
//    select {
//    case v = <-c:
//        ... foo
//    default:
//        ... bar
//    }
//
// as
//
//    if selectnbrecv(&v, c) {
//        ... foo
//    } else {
//        ... bar
//    }
//
func selectnbrecv(t *chantype, elem unsafe.Pointer, c *hchan) (selected bool) {selected, _ = chanrecv(t, c, elem, false)return
}

如果是一个 case + default 的模式,那么编译器就调用以上方法来实现。

如果是多个 case + default 的模式呢?select 在runtime到底是如何执行的?写个简单的select编译一下。

package mainfunc main() {var ch chan intselect {case <-ch:case ch <- 1:default:}
}

go tool compile -S -l -N test.go > test.s 结果中找一下关键字,例如:

0x008c 00140 (test.go:5)    CALL    runtime.newselect(SB)
0x00ad 00173 (test.go:6)    CALL    runtime.selectrecv(SB)
0x00ec 00236 (test.go:7)    CALL    runtime.selectsend(SB)
0x0107 00263 (test.go:8)    CALL    runtime.selectdefault(SB)
0x0122 00290 (test.go:5)    CALL    runtime.selectgo(SB)

这里 selectgo 是实际运行的方法,找一下,注意注释。先检查channel是否能操作,如果不能操作,就走 default 逻辑。

loop:// pass 1 - look for something already waitingvar dfl *scasevar cas *scasefor i := 0; i < int(sel.ncase); i++ {cas = &scases[pollorder[i]]c = cas.cswitch cas.kind {// 接受数据case caseRecv:sg = c.sendq.dequeue()// 如果有 sender 在等待if sg != nil {goto recv}// 当前buffer中有数据if c.qcount > 0 {goto bufrecv}// 关闭的channelif c.closed != 0 {goto rclose}case caseSend:if raceenabled {racereadpc(unsafe.Pointer(c), cas.pc, chansendpc)}// 关闭if c.closed != 0 {goto sclose}// 有 receiver 正在等待sg = c.recvq.dequeue()if sg != nil {goto send}// 有空间接受if c.qcount < c.dataqsiz {goto bufsend}// 走defaultcase caseDefault:dfl = cas}}if dfl != nil {selunlock(scases, lockorder)cas = dflgoto retc}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/455285.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

成本预算的四个步骤_全网推广步骤有哪些?

全网推广的步骤是什么&#xff1f;一般来说&#xff0c;搜索引擎优化是大多数中小企业常用的推广方法。主要是通过对一些搜索引擎的排名来提高网站的曝光率&#xff0c;从而更好的提高自己网站的流量&#xff0c;从而更好的实现互联网层面的销售。接下来&#xff0c;让我们学习…

python生成requirements.txt的两种方法

python项目如何在另一个环境上重新构建项目所需要的运行环境依赖包&#xff1f; 使用的时候边记载是个很麻烦的事情&#xff0c;总会出现遗漏的包的问题&#xff0c;这个时候手动安装也很麻烦&#xff0c;不能确定代码报错的需要安装的包是什么版本。这些问题&#xff0c;requi…

node.js 安装使用http-server

node.js npm全局安装了http-server后我该怎么使用它&#xff1f;我在它的安装目录下创建了inde.html&#xff0c;浏览器localhost:8080可以访问&#xff0c;那我的项目需要放在它的安装目录下&#xff1f;还是需要在我的项目下配置什么或者使用什么指令启动它&#xff1f;我在我…

您的apple id 暂时不符合使用此应用程序_Mac相机不工作时该怎么办

苹果公司的许多台式机和笔记本电脑都包含一个内置网络摄像头&#xff0c;该公司愉快地将其称为FaceTime相机。但是&#xff0c;如果您的Mac网络摄像头无法正常工作&#xff0c;并且在尝试访问它时显示为断开连接或不可用&#xff0c;则您可能不会感到高兴。您可以尝试以下操作来…

基于DirectShow的流媒体解码和回放

一、 前言  流媒体的定义很广泛&#xff0c;大多数时候指的是把连续的影像和声音信息经过压缩处理后放上网站服务器&#xff0c;让用户一边下载一边观看、收听&#xff0c;而不需要等整个压缩文件下载到自己机器就可以观看的视频/音频传输、压缩技术。流媒体也指代由这种技术…

汕头市队赛 SRM16 T2

描述 猫和老鼠&#xff0c;看过吧&#xff1f;猫来了&#xff0c;老鼠要躲进洞里。在一条数轴上&#xff0c;一共有n个洞&#xff0c;位置分别在xi&#xff0c;能容纳vi只老鼠。一共有m只老鼠位置分别在Xi&#xff0c;要躲进洞里&#xff0c;问所有老鼠跑进洞里的距离总和最小是…

C#调用WebService实例和开发(转)

http://www.cnblogs.com/peterpc/p/4628441.html 一、基本概念 Web Service也叫XML Web Service WebService是一种可以接收从Internet或者Intranet上的其它系统中传递过来的请求&#xff0c;轻量级的独立的通讯技术。是:通过SOAP在Web上提供的软件服务&#xff0c;使用WSDL文件…

智能情绪分析技术_简单分析人工智能的表现在计算机网络应用技术中的优势

简单分析人工智能的表现在计算机网络应用技术中的优势大数据时代背景下&#xff0c; 计算机网络技术迅猛发展&#xff0c; 而人工智能技术的发展也进一步推动了计算机网络技术的发展&#xff0c; 两者相互融合&#xff0c; 相互促进&#xff0c; 实现了双赢发展。从人工智能技术…

MV预测过程详解

第一步&#xff1a;确定相邻块 MV 预测以宏块分割&#xff08;或亚宏块分割&#xff0c;如果宏块存在亚分割&#xff09;为单位&#xff0c;同一个宏块分割&#xff08;或亚宏块分割&#xff09;内所有 4*4 块 MV 预测值相同。以每个宏块分割&#xff08;或亚宏块分割&…

Zabbix2.2.6邮件报警设置方法

http://www.jb51.net/article/56973.htm 这篇文章主要介绍了Zabbix邮件报警设置方法,在Zabbix服务端设置邮件报警&#xff0c;当被监控主机宕机或者达到触发器预设值时&#xff0c;会自动发送报警邮件到指定邮箱说明&#xff1a;Zabbix监控服务端、客户端都已经部署完…

matlab 矩阵拼接

E[a&#xff0c;b]%水平方向上的拼接 E[a &#xff1b;b] %垂直方向上的拼接 转载于:https://www.cnblogs.com/hsy1941/p/7124083.html

Machine Learning——octave矩阵操作(2)——DAY3

矩阵的数学操作&#xff1a; Assumed: a为一个矩阵&#xff0c;m是一个向量 Log(a)——求每一个元素的对数 Exp(a)——以e为底的指数 1./a——求每个元素的导师 [a,b]max(m)——m是一个向量&#xff0c;a为m当中最大的元素&#xff0c;b为a在m中的排列序号&#xff08;已按从小…

字符串中文判断

2019独角兽企业重金招聘Python工程师标准>>> 1、判断字符串是否全是中文或含有中文 <?php header(Content-type:text/html; charsetutf-8); $str 你好; if(preg_match(/^[\x{4e00}-\x{9fa5}]$/u, $str)>0){ echo 全是中文; …

分治2--取余运算

分治2--取余运算 一、心得 二、题目和分析 题目描述 输入b&#xff0c;p&#xff0c;k的值&#xff0c;求bp mod k的值。其中b&#xff0c;p&#xff0c;k*k为长整型数。输入 三个整数&#xff0c;分别为b&#xff0c;p&#xff0c;k的值输出 bp mod k样例输入 2 10 9样例输出 …

-mysql-锁机制分为表级锁和行级锁

2019独角兽企业重金招聘Python工程师标准>>> 声明&#xff1a;本栏目所使用的素材都是凯哥学堂VIP学员所写&#xff0c;学员有权匿名&#xff0c;对文章有最终解释权&#xff1b;凯哥学堂旨在促进VIP学员互相学习的基础上公开笔记。 mysql锁机制分为表级锁和行级锁 …

托福试卷真题_干货解答考生疑惑,自考真题考过了还会在出吗?

重视真题&#xff01;重视真题&#xff01;重视真题&#xff01;重要的话要说三遍。想自考的你们一定要注意&#xff0c;对于历年真题&#xff0c;从来都是“备考必做”的态度。做自考真题&#xff0c;除了可以让自己尽快熟悉考试题型和考点外&#xff0c;还有什么好处呢&#…

2016 ACM/ICPC Asia Regional Dalian Online

自己还是太菜&#xff0c;补题离不开题解。。。 但还是留个博客&#xff0c;万一以后忘了。。。 1001 Different Circle Permutation Polya定理&#xff0c;第一次遇见&#xff0c;学习了一下。不旋转的时候可以得到 f[i]f[i-1]f[i-2] 斐波那契数列&#xff0c;旋转后就可以通过…

天融信安全接入客户端_天融信提示您警惕物联网设备Ripple20漏洞风险

近日&#xff0c;天融信阿尔法实验室在JSOF实验室发布的由Treck公司开发的TCP/IP软件库中获取到一系列0day漏洞。JSOF实验室发布的这批漏洞共计19个&#xff0c;被JSOF研究人员称为"Ripple20"。受此软件库影响的产品数量估计超过数亿&#xff0c;其中包括智能家居设备…

GreenSock (TweenMax) 动画案例(二)

实现效果 动画分解 1.灯光闪烁2.文字出现3.水流4.心电图 知识点 1.AI(可尽情骚扰UI欧巴)2.SVG(了解基本的知识点)3.TweenMax(GreenSock)4.CSS animation 写在前面 写过第一篇文章后GreenSock (TweenMax) 动画案例(一)再回头看发现代码太多&#xff0c;根本没耐心去看完。所以每…

无限轮播图片的实现原理

无限轮播图相信是很多开发人员常用的一个功能&#xff0c;这里总结一下常用的两种方式的实现原理 一、使用UIScrollview实现无限轮播用UIScrollView实现&#xff0c;在scrollView上添加3个UIImageView&#xff0c;分别用来显示上一张图片&#xff0c;当前显示的图片&#xff0c…