Go源码--channel源码解读

简介

channel顾名思义就是channel的意思,主要用来在协程之间传递数据,所以是并发安全的。其实现原理,其实就是一个共享内存再加上锁,底层阻塞机制使用的是GMP模型。可见 GMP模型就是那个道,道生一,一生二,二生三,三生万物。
一个简单的例子 如下:

func main() {c := make(chan int32, 1)go func() {c <- 1}()go func() {fmt.Println(<-c)}()time.Sleep(2 * time.Second)close(c)
}

运行结果是:

1

其汇编部分代码如下:

 CALL    runtime.makechan(SB)  // 对应 make(chan int32,1)...CALL    runtime.chanrecv1(SB) // 对应 <-c ...CALL    runtime.chansend1(SB) // 对应 c<-1 其中 <- 编译后就代表运行时 chanrecv1 函数 ...CALL    runtime.closechan(SB) // 对应 close(c)

CALL runtime.closechan(SB)
协程部分汇编代码,因为不是重点,略过 感兴趣的自己可以编译看看。

chan的所有内容都存放在runtime.hchan这个结构体中,makechan,chanrecv1和chansend1函数都是操作hchan这个结构体来实现chan的功能的。下面我们来看下 hchan结构体。

两种重要结构体

hchan 结构体

hchan结构体 如下

type hchan struct {qcount   uint           // total data in the queue  环形队列里面的总的数据量 dataqsiz uint           // size of the circular queue // 环形队列大小 就是 make chan时 申请的大小buf      unsafe.Pointer // points to an array of dataqsiz elements // 指向环形队列的指针elemsize uint16         // 储存的元素类型占空间大小closed   uint32         // chan 状态 1 关闭 0 未关闭elemtype *_type         // element type // 元素类型   // make chan是 指定的类型 不过这个类型要进行运行时转换 但对应关系是这样的 sendx    uint           // send index // 发送索引recvx    uint           // receive index //  获取索引recvq    waitq          // list of recv waiters // 获取协程等待队列sendq    waitq          // list of send waiters // 发送携程等待队列lock mutex // 锁
}
waitq 结构体

waitq结构体用来存储阻塞在chan上的协程状态sudog结构体(内部包含了 协程信息)
其结构如下:

// 等待队列
type waitq struct {first *sudog // 双向链表 头last  *sudog // 双向链表 尾
}

这个结构体有两个函数 enqueuq 和dequeue 插入链表和删除链表 就是双向链表的基础操作

hchan的结构丑图如下:
在这里插入图片描述
接下来我们按照 执行流程来梳理下部分源码 源码位置在 runtime/chan.go中,首先是 make(chan int32,1)函数,我们都知道 make函数可以初始化,map,slice和chan。编译时根据不同类型,会调用makechan,makeslcie和makemap函数。我们来看下 makechan函数。

makechan

其源码如下:
请结合比较丑流程图来看,比较好理解。


func makechan(t *chantype, size int) *hchan {elem := t.Elem// compiler checks this but be safe.if elem.Size_ >= 1<<16 {throw("makechan: invalid channel element type")}if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign {throw("makechan: bad alignment")}// 计算内存mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))if overflow || mem > maxAlloc-hchanSize || size < 0 {panic(plainError("makechan: size out of range"))}// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.// buf points into the same allocation, elemtype is persistent.// SudoG's are referenced from their owning thread so they can't be collected.// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.var c *hchanswitch {case mem == 0:// Queue or element size is zero.// 如果chan是空的 则需要申请hchanSize空间 以满足 内存对齐要求c = (*hchan)(mallocgc(hchanSize, nil, true))// Race detector uses this location for synchronization.c.buf = c.raceaddr()case elem.PtrBytes == 0:// Elements do not contain pointers.// Allocate hchan and buf in one call.c = (*hchan)(mallocgc(hchanSize+mem, nil, true))c.buf = add(unsafe.Pointer(c), hchanSize)default:// 分配内存// Elements contain pointers.c = new(hchan)c.buf = mallocgc(mem, elem, true)}// 初始化hchanc.elemsize = uint16(elem.Size_)c.elemtype = elemc.dataqsiz = uint(size)lockInit(&c.lock, lockRankHchan)if debugChan {print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n")}return c
}

其实 就是 根据 chan 中元素类型和 大小 来计算需要的存储空间。逻辑还是比较清晰的。初始化了空间后,接下来就应该向chan存数据了,上例中是c <- 1,在编译时会转译成 chansend1函数。其源码如下:

func chansend1(c *hchan, elem unsafe.Pointer) {chansend(c, elem, true, getcallerpc())
}

它只是调用了 chansend 函数 且 阻塞 状态是 true(阻塞状态true代表 如果 chan 的 buf满员,则写协程放入sendq,如果 chan 为空,则读协程会放入 recvq。 如果阻塞状态是 false 如果chan buf,满员 则返回错误。如果chan 为空,则读协程读取chan返回错误。阻塞状态为false 主要是select函数用)。接下来我们来看下其源码:

chansend

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {// 省略部分代码// chan没关闭 且 缓存buf已满 且 是非阻塞模式(select方法使用)  则不会写入 sendq里 直接返回错误if !block && c.closed == 0 && full(c) {return false}// todovar t0 int64if blockprofilerate > 0 {t0 = cputicks()}// 加锁lock(&c.lock)// 如果chan已经关闭 直接panicif c.closed != 0 {unlock(&c.lock)panic(plainError("send on closed channel"))}// 有阻塞的取协程 送的数据 直接交给取协程 并将取协程唤醒if sg := c.recvq.dequeue(); sg != nil {// Found a waiting receiver. We pass the value we want to send// directly to the receiver, bypassing the channel buffer (if any).send(c, sg, ep, func() { unlock(&c.lock) }, 3)return true}// 如果没有阻塞的获取协程 且 channel容量没满 则需要插入channel中if c.qcount < c.dataqsiz {// 获取要放的位置的指针qp := chanbuf(c, c.sendx)if raceenabled {racenotify(c, c.sendx, nil)}// 将元素 放入其中typedmemmove(c.elemtype, qp, ep)c.sendx++// ring buffer 循环数组 到尾 就 从头开始if c.sendx == c.dataqsiz {c.sendx = 0}c.qcount++// 解锁unlock(&c.lock)return true}// buf满了 但是没有 阻塞 则直接返回失败if !block {unlock(&c.lock)return false}// 如果channel满了 则所有send协程就需要加入 sendq 里排队,创建一个 等待状态的 sudog 包装当前 协程和数据 ep 放入 sendq中// Block on the channel. Some receiver will complete our operation for us.gp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.// 初始化 sudogmysg.elem = epmysg.waitlink = nilmysg.g = gpmysg.isSelect = falsemysg.c = cgp.waiting = mysggp.param = nil// 放入 发送等待队列c.sendq.enqueue(mysg)// Signal to anyone trying to shrink our stack that we're about// to park on a channel. The window between when this G's status// changes and when we set gp.activeStackChans is not safe for// stack shrinking.gp.parkingOnChan.Store(true)// 调用GMP模型 阻塞协程 并释放 chan的 lock 锁,释放后别的协程可以进来gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)// Ensure the value being sent is kept alive until the// receiver copies it out. The sudog has a pointer to the// stack object, but sudogs aren't considered as roots of the// stack tracer.// ep保持活着 不被垃圾回收KeepAlive(ep)// 从等待队列唤醒 // someone woke us up.if mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilgp.activeStackChans = falseclosed := !mysg.successgp.param = nilif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}mysg.c = nil// 释放sudogreleaseSudog(mysg)if closed {if c.closed == 0 {throw("chansend: spurious wakeup")}panic(plainError("send on closed channel"))}return true
}

其执行主流程如下:
首先加锁

  1. 如果recvq里有数据,则将recvq链表头节点中的sudog拿出来,将send的数据直接交给sudog的recv协程,然后唤醒这个协程,最后释放当前send协程拥有的锁,返回。
  2. 否则 如果 hchan buf没满,则将数据存入其中,更新 qcount的值,释放锁,返回。
  3. 否则 初始化一个sudog并将 当前 send协程放入其中,并阻塞(调用GMP模型),然后释放当前send协程拥有的锁(别的send协程可以执行 chansend)
  4. 当前阻塞的send协程被待被recv协程唤醒。
  5. 唤醒后 将 当前协程状态变为非等待,释放当前协程对应的sudog 返回

这里有注意的点,sendq里的协程只能被 recv协程唤醒,反之亦然。这里就带来一个有趣的问题,sendq和recvq能都有数据吗?大神们可以思考下。

send讲完了,接下来改recv了,要不不就阻塞了吗,
例子中的 < - c 是 编译时会转译成 chanrecv1,我们来看下源码:

func chanrecv1(c *hchan, elem unsafe.Pointer) {chanrecv(c, elem, true)
}

其block参数也是 true ,这阻塞跟 send一样。我们来看下 chanrecv方法吧

chanrecv

其源码如下:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {// 非主逻辑代码 跳过lock(&c.lock)// 如果chan已关闭 且qcount==0 则 证明chan中已没有数据 返回if c.closed != 0 {if c.qcount == 0 {if raceenabled {raceacquire(c.raceaddr())}unlock(&c.lock)if ep != nil {typedmemclr(c.elemtype, ep)}return true, false}// The channel has been closed, but the channel's buffer have data.} else {// 如果chan没关闭 先看 sendq里 有没有阻塞获取sudog  如果有 取出来 直接将 数据给其中的协程 并唤醒 阻塞的读操作 并解锁// Just found waiting sender with not closed.if sg := c.sendq.dequeue(); sg != nil {// Found a waiting sender. If buffer is size 0, receive value// directly from sender. Otherwise, receive from head of queue// and add sender's value to the tail of the queue (both map to// the same buffer slot because the queue is full).recv(c, sg, ep, func() { unlock(&c.lock) }, 3)return true, true}}// 走到这里 证明 chan没关闭 且 sendq没数据 则从缓存区取数据if c.qcount > 0 {// Receive directly from queueqp := chanbuf(c, c.recvx)if raceenabled {racenotify(c, c.recvx, nil)}if ep != nil {typedmemmove(c.elemtype, ep, qp)}typedmemclr(c.elemtype, qp)c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}c.qcount--unlock(&c.lock)return true, true}// 非阻塞(waitq不能有数据), 返回if !block {unlock(&c.lock)return false, false}// 将 协程 构造sudog 存放到 recvq 中 阻塞协程, 下面代码跟 sendq类似 // no sender available: block on this channel.gp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.mysg.elem = epmysg.waitlink = nilgp.waiting = mysgmysg.g = gpmysg.isSelect = falsemysg.c = cgp.param = nil// 插入 recvq 链表尾部c.recvq.enqueue(mysg)// Signal to anyone trying to shrink our stack that we're about// to park on a channel. The window between when this G's status// changes and when we set gp.activeStackChans is not safe for// stack shrinking.gp.parkingOnChan.Store(true)gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)// someone woke us upif mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilgp.activeStackChans = falseif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}success := mysg.successgp.param = nilmysg.c = nilreleaseSudog(mysg)return true, success
}

其执行主流程如下:
首先加锁

  1. 如果协程已关闭 且 buf中没有数据 则 返回
  2. 如果协程没关闭 则从 sendq里取sudog数据,如果取到了 就 将 数据 传递给 sudog的send协程 当前recv协程释放锁,唤醒send协程 返回。
  3. 如果 没有从sendq里取到数据,则从 buf 里取数据,更新qcount 然后 解锁,返回
  4. 如果 buf 为空 ,则初始化一个sudog 将 当前协程放入其中,阻塞当前recv协程,释放当前recv协程拥有的锁。
  5. 等待其他send协程唤醒 当前recv协程。
  6. 唤醒后 将 当前协程状态变为非等待,释放当前协程对应的sudog 返回

send和recv后,接下来就该 close了
close( c)编译后 函数 closechan函数 我们来看下

closechan

源码如下:


func closechan(c *hchan) {if c == nil {panic(plainError("close of nil channel"))}// 加锁lock(&c.lock)if c.closed != 0 {unlock(&c.lock)panic(plainError("close of closed channel"))}if raceenabled {callerpc := getcallerpc()racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))racerelease(c.raceaddr())}// 将锁状态变为1 c.closed = 1var glist gList// 释放所有 读协程 // release all readersfor {sg := c.recvq.dequeue()if sg == nil {break}if sg.elem != nil {typedmemclr(c.elemtype, sg.elem)sg.elem = nil}if sg.releasetime != 0 {sg.releasetime = cputicks()}gp := sg.ggp.param = unsafe.Pointer(sg)sg.success = falseif raceenabled {raceacquireg(gp, c.raceaddr())}glist.push(gp)}// 释放所有写协程 包括其数据 所以 我们从关闭的协程里读的数据 就是 buf 中的 不会有 sendq里的数据// release all writers (they will panic)for {sg := c.sendq.dequeue()if sg == nil {break}sg.elem = nilif sg.releasetime != 0 {sg.releasetime = cputicks()}gp := sg.ggp.param = unsafe.Pointer(sg)sg.success = falseif raceenabled {raceacquireg(gp, c.raceaddr())}glist.push(gp)}unlock(&c.lock)// 将所有协程唤醒 // Ready all Gs now that we've dropped the channel lock.for !glist.empty() {gp := glist.pop()gp.schedlink = 0goready(gp, 3)}
}

close 主要是将阻塞的 其读和写协程 携带的 元素 释放,(但是缓存buf里的数据并没释放)可以使得GC捕获,然后将阻塞的协程唤醒。这时 在 chansend函数 lock()处 阻塞的协程会panic,被goready 唤醒的协程会正常退出;在 chanrecv 函数 lock()处 阻塞的协程(或者继续执行<-c的协程)会继续从 buf 拿数据,当数据获取完后,会退出。

总结

channel 其实就是用了共享内存加锁这种机制来处理协程之间的共享数据的,这次阅读源码还是有些细节没整明白,整体理解的也不够透彻,还望大神指正。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/41337.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Mathematica14.0】快速从下载安装到使用

目录 1.简介 2.下载安装 下载 安装 3.一小时掌握mathematica使用 单元模式 内置函数 符号表达式 迭代器 赋值 通配符及查找替换 函数定义 匿名函数&#xff08;拉姆达表达式&#xff09; 函数映射 函数式与运算符 函数自定义选项 图形可视化 交互式界面 数值…

Rocky Linux 9.4基于官方源码制作openssh 9.8p1二进制rpm包 —— 筑梦之路

2024年7月1日&#xff0c;openssh 9.8版本发布&#xff0c;主要修复了CVE-2024-6387安全漏洞。 由于centos 7的生命周期在6月30日终止&#xff0c;因此需要逐步替换到Rocky Linux&#xff0c;后续会有更多分享关于Rocky Linux的文章。 环境说明 1. 操作系统版本 cat /etc/o…

【论文阅读】-- Strscope:不规则测量的时间序列数据的多尺度可视化

Stroscope: Multi-Scale Visualization of Irregularly Measured Time-Series Data 摘要1 引言2相关工作2.1&#xff08;大型&#xff09;时间序列数据可视化2.2 事件序列数据可视化2.3 评价 3问题分析3.1 数据集3.2 场景——现状3.3 设计流程3.4 设计原理 4 涟漪图&#xff1a…

使用中国大陆镜像源安装最新版的 docker Deamon

在一个智算项目交付过程中&#xff0c;出现了新建集群中的全部 docker server V19 进程消失、仅剩 docker server 的 unix-socket 存活的现象。 为了验证是否是BD产品研发提供的产品deploy语句缺陷&#xff0c;需要在本地环境上部署一个简单的 docker Deamon 环境。尴尬的是&a…

针对某客户报表系统数据库跑批慢进行性能分析及优化

某客户报表系统数据库跑批时间过长&#xff0c;超出源主库较多&#xff0c;故对其进行了分析调优&#xff0c;目前状态如下&#xff1a; 1、业务连接的rac的scanip&#xff0c;因为负载均衡将跑批的连接连接到了多个计算节点导致节点间通讯成本较高&#xff0c;故速率缓慢&…

HexPlane: A Fast Representation for Dynamic Scenes一种动态场景的快速表示方法

Abstract 动态三维场景的建模与再现是三维视觉领域的一个具有挑战性的课题。先前的方法基于 NERF 并依赖于隐式表示这是缓慢的&#xff0c;因为它需要许多 MLP 评估&#xff0c;限制真实世界的应用程序。我们展示了动态三维场景可以明确地表示为六个平面的学习功能&#xff0c…

SQL语句(DCL)

DCL英文全称Data Control Language&#xff08;数据控制语言&#xff09;,用来管理数据库用户、控制数据库的访问权限 DCL-管理用户 create user itcastlocalhost identified by 123456 ;-- 修改用户heima的密码为1234 alter user heima% identified with mysql_native_passwo…

C++ windows下使用openvino部署yoloV8

目录 准备版本&#xff1a; 准备事项: 选择配置界面&#xff1a; 下载界面&#xff1a; ​编辑 添加VS配置&#xff1a; 准备代码&#xff1a; yolov8.h yolov8.cpp detect.cpp 如何找到并放置DLL&#xff1a; 准备版本&#xff1a; opencv 4.6.0 openvino 2024.0…

大模型成为软件和数据工程师

前言 想象一下这样一个世界&#xff1a;人工智能伙伴负责编码工作&#xff0c;让软件和数据工程师释放他们的创造天赋来应对未来的技术挑战&#xff01; 想象一下&#xff1a;你是一名软件工程师&#xff0c;埋头于堆积如山的代码中&#xff0c;淹没在无数的错误中&#xff0…

基于React和TypeScript的开源白板项目(Github项目分享)

在学习前端开发的过程中&#xff0c;有时候我们需要一些有趣的项目来提升我们的技能。今天我要给大家介绍的是一个非常酷的项目——NinjaSketch&#xff0c;这是一个用React和TypeScript构建的简易白板工具。这个项目使用了Rough.js来实现手绘风格的效果。尽管这个应用不是响应…

【UE5.3】笔记8 添加碰撞,检测碰撞

添加碰撞 打开BP_Food,添加Box Collision组件&#xff0c;与unity类似&#xff1a; 调整Box Collision的大小到刚好包裹物体&#xff0c;通过调整缩放和盒体范围来控制大小&#xff0c;一般先调整缩放找个大概大小&#xff0c;然后调整盒体范围进行微调。 碰撞检测 添加好碰撞…

基于jeecgboot-vue3的Flowable流程-集成仿钉钉流程(二)增加基本的发起人审批与多用户多实例

因为这个项目license问题无法开源&#xff0c;更多技术支持与服务请加入我的知识星球。 1、AssigneeNode 增加approvalText public abstract class AssigneeNode extends Node {// 审批对象private AssigneeTypeEnum assigneeType;// 表单内人员private String formUser;// 表…

Python从0到100(三十七):数据提取的概念和数据分类

1. 爬虫中数据的分类 在爬虫开发过程中,我们会遇到多种类型的数据。了解这些数据的类型对于有效地提取和解析信息至关重要。 结构化数据 结构化数据是指具有固定格式和模式的数据,常见的结构化数据格式包括JSON和XML。 处理方式:可以直接转换为Python的字典或列表等数据类…

SCI一区级 | Matlab实现BO-Transformer-LSTM多特征分类预测/故障诊断

SCI一区级 | Matlab实现BO-Transformer-LSTM多特征分类预测/故障诊断 目录 SCI一区级 | Matlab实现BO-Transformer-LSTM多特征分类预测/故障诊断效果一览基本介绍程序设计参考资料 效果一览 基本介绍 1.【SCI一区级】Matlab实现BO-Transformer-LSTM特征分类预测/故障诊断&…

C语言 | Leetcode C语言题解之第214题最短回文串

题目&#xff1a; 题解&#xff1a; char* shortestPalindrome(char* s) {int n strlen(s);int fail[n 1];memset(fail, -1, sizeof(fail));for (int i 1; i < n; i) {int j fail[i - 1];while (j ! -1 && s[j 1] ! s[i]) {j fail[j];}if (s[j 1] s[i]) {f…

HTML【详解】超链接 a 标签的四大功能(页面跳转、页内滚动【锚点】、页面刷新、文件下载)

超链接 a 标签主要有以下功能&#xff1a; 跳转到其他页面 <a href"https://www.baidu.com/" target"_blank" >百度</a>href&#xff1a;目标页面的 url 地址或同网站的其他页面地址&#xff0c;如 detail.htmltarget&#xff1a;打开目标页面…

PLL和CDR的内部结构及其区别

比较PLL和CDR的内部结构及其区别&#xff1a; 基本结构&#xff1a; PLL&#xff08;相位锁定环&#xff09;&#xff1a; 相位检测器环路滤波器压控振荡器&#xff08;VCO&#xff09;分频器&#xff08;可选&#xff0c;用于频率合成&#xff09; CDR&#xff08;时钟数据恢复…

windows电脑网络重置后wifi列表消失怎么办?

我们的电脑网络偶尔会出现异常&#xff0c;我们通常会下意识选择网络诊断&#xff0c;运行完诊断后一般会让我们选择重置网络&#xff0c;然而&#xff0c;重置后wifi列表突然消失&#xff0c;无法愉快地上网了&#xff0c;找了一圈&#xff0c;都说是更改适配器选项&#xff0…

4、SSD主控

简述 主控是个片上系统&#xff0c;由硬件和固件组成一个功能完整的系统&#xff1b;上文所述的FTL就属于主控的固件范畴。主控闪存构成了整个SSD&#xff0c;在闪存确定的情况下&#xff0c;主控就反映了各家SSD的差异。实时上各家SSD的差异也主要反应在主控上&#xff0c;毕…

小学英语语法

目录 a和an的用法名词的单复数be动词和人称代词&#xff08;主格&#xff09;指示代词形容词物主代词名词所有格双重所有格方位介词some&#xff0c;any和no的用法How many和How much的用法情态动词can的用法祈使句人称代词&#xff08;宾格&#xff09;常见实义动词的用法一般…