简介
channel顾名思义就是channel的意思,主要用来在协程之间传递数据,所以是并发安全的。其实现原理,其实就是一个共享内存再加上锁,底层阻塞机制使用的是GMP模型。可见 GMP模型就是那个道,道生一,一生二,二生三,三生万物。
一个简单的例子 如下:
func main() {c := make(chan int32, 1)go func() {c <- 1}()go func() {fmt.Println(<-c)}()time.Sleep(2 * time.Second)close(c)
}
运行结果是:
1
其汇编部分代码如下:
CALL runtime.makechan(SB) // 对应 make(chan int32,1)...CALL runtime.chanrecv1(SB) // 对应 <-c ...CALL runtime.chansend1(SB) // 对应 c<-1 其中 <- 编译后就代表运行时 chanrecv1 函数 ...CALL runtime.closechan(SB) // 对应 close(c)
CALL runtime.closechan(SB)
协程部分汇编代码,因为不是重点,略过 感兴趣的自己可以编译看看。
chan的所有内容都存放在runtime.hchan这个结构体中,makechan,chanrecv1和chansend1函数都是操作hchan这个结构体来实现chan的功能的。下面我们来看下 hchan结构体。
两种重要结构体
hchan 结构体
hchan结构体 如下
type hchan struct {qcount uint // total data in the queue 环形队列里面的总的数据量 dataqsiz uint // size of the circular queue // 环形队列大小 就是 make chan时 申请的大小buf unsafe.Pointer // points to an array of dataqsiz elements // 指向环形队列的指针elemsize uint16 // 储存的元素类型占空间大小closed uint32 // chan 状态 1 关闭 0 未关闭elemtype *_type // element type // 元素类型 // make chan是 指定的类型 不过这个类型要进行运行时转换 但对应关系是这样的 sendx uint // send index // 发送索引recvx uint // receive index // 获取索引recvq waitq // list of recv waiters // 获取协程等待队列sendq waitq // list of send waiters // 发送携程等待队列lock mutex // 锁
}
waitq 结构体
waitq结构体用来存储阻塞在chan上的协程状态sudog结构体(内部包含了 协程信息)
其结构如下:
// 等待队列
type waitq struct {first *sudog // 双向链表 头last *sudog // 双向链表 尾
}
这个结构体有两个函数 enqueuq 和dequeue 插入链表和删除链表 就是双向链表的基础操作
hchan的结构丑图如下:
接下来我们按照 执行流程来梳理下部分源码 源码位置在 runtime/chan.go中,首先是 make(chan int32,1)函数,我们都知道 make函数可以初始化,map,slice和chan。编译时根据不同类型,会调用makechan,makeslcie和makemap函数。我们来看下 makechan函数。
makechan
其源码如下:
请结合比较丑流程图来看,比较好理解。
func makechan(t *chantype, size int) *hchan {elem := t.Elem// compiler checks this but be safe.if elem.Size_ >= 1<<16 {throw("makechan: invalid channel element type")}if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign {throw("makechan: bad alignment")}// 计算内存mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))if overflow || mem > maxAlloc-hchanSize || size < 0 {panic(plainError("makechan: size out of range"))}// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.// buf points into the same allocation, elemtype is persistent.// SudoG's are referenced from their owning thread so they can't be collected.// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.var c *hchanswitch {case mem == 0:// Queue or element size is zero.// 如果chan是空的 则需要申请hchanSize空间 以满足 内存对齐要求c = (*hchan)(mallocgc(hchanSize, nil, true))// Race detector uses this location for synchronization.c.buf = c.raceaddr()case elem.PtrBytes == 0:// Elements do not contain pointers.// Allocate hchan and buf in one call.c = (*hchan)(mallocgc(hchanSize+mem, nil, true))c.buf = add(unsafe.Pointer(c), hchanSize)default:// 分配内存// Elements contain pointers.c = new(hchan)c.buf = mallocgc(mem, elem, true)}// 初始化hchanc.elemsize = uint16(elem.Size_)c.elemtype = elemc.dataqsiz = uint(size)lockInit(&c.lock, lockRankHchan)if debugChan {print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n")}return c
}
其实 就是 根据 chan 中元素类型和 大小 来计算需要的存储空间。逻辑还是比较清晰的。初始化了空间后,接下来就应该向chan存数据了,上例中是c <- 1,在编译时会转译成 chansend1函数。其源码如下:
func chansend1(c *hchan, elem unsafe.Pointer) {chansend(c, elem, true, getcallerpc())
}
它只是调用了 chansend 函数 且 阻塞 状态是 true(阻塞状态true代表 如果 chan 的 buf满员,则写协程放入sendq,如果 chan 为空,则读协程会放入 recvq。 如果阻塞状态是 false 如果chan buf,满员 则返回错误。如果chan 为空,则读协程读取chan返回错误。阻塞状态为false 主要是select函数用)。接下来我们来看下其源码:
chansend
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {// 省略部分代码// chan没关闭 且 缓存buf已满 且 是非阻塞模式(select方法使用) 则不会写入 sendq里 直接返回错误if !block && c.closed == 0 && full(c) {return false}// todovar t0 int64if blockprofilerate > 0 {t0 = cputicks()}// 加锁lock(&c.lock)// 如果chan已经关闭 直接panicif c.closed != 0 {unlock(&c.lock)panic(plainError("send on closed channel"))}// 有阻塞的取协程 送的数据 直接交给取协程 并将取协程唤醒if sg := c.recvq.dequeue(); sg != nil {// Found a waiting receiver. We pass the value we want to send// directly to the receiver, bypassing the channel buffer (if any).send(c, sg, ep, func() { unlock(&c.lock) }, 3)return true}// 如果没有阻塞的获取协程 且 channel容量没满 则需要插入channel中if c.qcount < c.dataqsiz {// 获取要放的位置的指针qp := chanbuf(c, c.sendx)if raceenabled {racenotify(c, c.sendx, nil)}// 将元素 放入其中typedmemmove(c.elemtype, qp, ep)c.sendx++// ring buffer 循环数组 到尾 就 从头开始if c.sendx == c.dataqsiz {c.sendx = 0}c.qcount++// 解锁unlock(&c.lock)return true}// buf满了 但是没有 阻塞 则直接返回失败if !block {unlock(&c.lock)return false}// 如果channel满了 则所有send协程就需要加入 sendq 里排队,创建一个 等待状态的 sudog 包装当前 协程和数据 ep 放入 sendq中// Block on the channel. Some receiver will complete our operation for us.gp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.// 初始化 sudogmysg.elem = epmysg.waitlink = nilmysg.g = gpmysg.isSelect = falsemysg.c = cgp.waiting = mysggp.param = nil// 放入 发送等待队列c.sendq.enqueue(mysg)// Signal to anyone trying to shrink our stack that we're about// to park on a channel. The window between when this G's status// changes and when we set gp.activeStackChans is not safe for// stack shrinking.gp.parkingOnChan.Store(true)// 调用GMP模型 阻塞协程 并释放 chan的 lock 锁,释放后别的协程可以进来gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)// Ensure the value being sent is kept alive until the// receiver copies it out. The sudog has a pointer to the// stack object, but sudogs aren't considered as roots of the// stack tracer.// ep保持活着 不被垃圾回收KeepAlive(ep)// 从等待队列唤醒 // someone woke us up.if mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilgp.activeStackChans = falseclosed := !mysg.successgp.param = nilif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}mysg.c = nil// 释放sudogreleaseSudog(mysg)if closed {if c.closed == 0 {throw("chansend: spurious wakeup")}panic(plainError("send on closed channel"))}return true
}
其执行主流程如下:
首先加锁
- 如果recvq里有数据,则将recvq链表头节点中的sudog拿出来,将send的数据直接交给sudog的recv协程,然后唤醒这个协程,最后释放当前send协程拥有的锁,返回。
- 否则 如果 hchan buf没满,则将数据存入其中,更新 qcount的值,释放锁,返回。
- 否则 初始化一个sudog并将 当前 send协程放入其中,并阻塞(调用GMP模型),然后释放当前send协程拥有的锁(别的send协程可以执行 chansend)
- 当前阻塞的send协程被待被recv协程唤醒。
- 唤醒后 将 当前协程状态变为非等待,释放当前协程对应的sudog 返回
这里有注意的点,sendq里的协程只能被 recv协程唤醒,反之亦然。这里就带来一个有趣的问题,sendq和recvq能都有数据吗?大神们可以思考下。
send讲完了,接下来改recv了,要不不就阻塞了吗,
例子中的 < - c 是 编译时会转译成 chanrecv1,我们来看下源码:
func chanrecv1(c *hchan, elem unsafe.Pointer) {chanrecv(c, elem, true)
}
其block参数也是 true ,这阻塞跟 send一样。我们来看下 chanrecv方法吧
chanrecv
其源码如下:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {// 非主逻辑代码 跳过lock(&c.lock)// 如果chan已关闭 且qcount==0 则 证明chan中已没有数据 返回if c.closed != 0 {if c.qcount == 0 {if raceenabled {raceacquire(c.raceaddr())}unlock(&c.lock)if ep != nil {typedmemclr(c.elemtype, ep)}return true, false}// The channel has been closed, but the channel's buffer have data.} else {// 如果chan没关闭 先看 sendq里 有没有阻塞获取sudog 如果有 取出来 直接将 数据给其中的协程 并唤醒 阻塞的读操作 并解锁// Just found waiting sender with not closed.if sg := c.sendq.dequeue(); sg != nil {// Found a waiting sender. If buffer is size 0, receive value// directly from sender. Otherwise, receive from head of queue// and add sender's value to the tail of the queue (both map to// the same buffer slot because the queue is full).recv(c, sg, ep, func() { unlock(&c.lock) }, 3)return true, true}}// 走到这里 证明 chan没关闭 且 sendq没数据 则从缓存区取数据if c.qcount > 0 {// Receive directly from queueqp := chanbuf(c, c.recvx)if raceenabled {racenotify(c, c.recvx, nil)}if ep != nil {typedmemmove(c.elemtype, ep, qp)}typedmemclr(c.elemtype, qp)c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}c.qcount--unlock(&c.lock)return true, true}// 非阻塞(waitq不能有数据), 返回if !block {unlock(&c.lock)return false, false}// 将 协程 构造sudog 存放到 recvq 中 阻塞协程, 下面代码跟 sendq类似 // no sender available: block on this channel.gp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.mysg.elem = epmysg.waitlink = nilgp.waiting = mysgmysg.g = gpmysg.isSelect = falsemysg.c = cgp.param = nil// 插入 recvq 链表尾部c.recvq.enqueue(mysg)// Signal to anyone trying to shrink our stack that we're about// to park on a channel. The window between when this G's status// changes and when we set gp.activeStackChans is not safe for// stack shrinking.gp.parkingOnChan.Store(true)gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)// someone woke us upif mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilgp.activeStackChans = falseif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}success := mysg.successgp.param = nilmysg.c = nilreleaseSudog(mysg)return true, success
}
其执行主流程如下:
首先加锁
- 如果协程已关闭 且 buf中没有数据 则 返回
- 如果协程没关闭 则从 sendq里取sudog数据,如果取到了 就 将 数据 传递给 sudog的send协程 当前recv协程释放锁,唤醒send协程 返回。
- 如果 没有从sendq里取到数据,则从 buf 里取数据,更新qcount 然后 解锁,返回
- 如果 buf 为空 ,则初始化一个sudog 将 当前协程放入其中,阻塞当前recv协程,释放当前recv协程拥有的锁。
- 等待其他send协程唤醒 当前recv协程。
- 唤醒后 将 当前协程状态变为非等待,释放当前协程对应的sudog 返回
send和recv后,接下来就该 close了
close( c)编译后 函数 closechan函数 我们来看下
closechan
源码如下:
func closechan(c *hchan) {if c == nil {panic(plainError("close of nil channel"))}// 加锁lock(&c.lock)if c.closed != 0 {unlock(&c.lock)panic(plainError("close of closed channel"))}if raceenabled {callerpc := getcallerpc()racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))racerelease(c.raceaddr())}// 将锁状态变为1 c.closed = 1var glist gList// 释放所有 读协程 // release all readersfor {sg := c.recvq.dequeue()if sg == nil {break}if sg.elem != nil {typedmemclr(c.elemtype, sg.elem)sg.elem = nil}if sg.releasetime != 0 {sg.releasetime = cputicks()}gp := sg.ggp.param = unsafe.Pointer(sg)sg.success = falseif raceenabled {raceacquireg(gp, c.raceaddr())}glist.push(gp)}// 释放所有写协程 包括其数据 所以 我们从关闭的协程里读的数据 就是 buf 中的 不会有 sendq里的数据// release all writers (they will panic)for {sg := c.sendq.dequeue()if sg == nil {break}sg.elem = nilif sg.releasetime != 0 {sg.releasetime = cputicks()}gp := sg.ggp.param = unsafe.Pointer(sg)sg.success = falseif raceenabled {raceacquireg(gp, c.raceaddr())}glist.push(gp)}unlock(&c.lock)// 将所有协程唤醒 // Ready all Gs now that we've dropped the channel lock.for !glist.empty() {gp := glist.pop()gp.schedlink = 0goready(gp, 3)}
}
close 主要是将阻塞的 其读和写协程 携带的 元素 释放,(但是缓存buf里的数据并没释放)可以使得GC捕获,然后将阻塞的协程唤醒。这时 在 chansend函数 lock()处 阻塞的协程会panic,被goready 唤醒的协程会正常退出;在 chanrecv 函数 lock()处 阻塞的协程(或者继续执行<-c的协程)会继续从 buf 拿数据,当数据获取完后,会退出。
总结
channel 其实就是用了共享内存加锁这种机制来处理协程之间的共享数据的,这次阅读源码还是有些细节没整明白,整体理解的也不够透彻,还望大神指正。