Go并发通信——Channel
Go语言的并发模型是CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信。(DO NOT COMMUNICATE BY SHARING MEMORY; INSTEAD, SHARE MEMORY BY COMMUNICATING.)
如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接。channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。
Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。
通道Channel
声明 | var 变量 chan 元素类型 | make(chan 元素类型, [缓冲大小]) —为引用类型声明内存地址 |
---|
1. channel—注意点
- 向缓冲区已满的通道写入数据会导致goroutine阻塞
- 通道中没有数据,读取该通道会导致阻塞(阻塞即goroutine等待,等到缓冲区不满/有数据继续运行,如果主线程main进入等待,则会直接报死锁错误)
- 读取已关闭的通道不会引发阻塞,而是返回通道元素类型的零值
- 向已关闭的通道写数据会导致panic,重复关闭通道也会引发panic
2. channel—基础操作
ch <- 10 //10存入通道
x := <- ch //通道中取出第一个值赋给x
<- ch //取出值,忽略结果
close(ch) //关闭通道( 通道会被垃圾回收机制回收,不一定非要关闭 )
//**************************************************************************************
select{ //类似于switch,哪个条件满足就会执行哪个,多个满足则随机执行一个,没有满足则阻塞等待case <-ch1:...case data := <-ch2:...case ch3<-data:...default:默认操作
}//select实现通知退出机制
func dosth(done chan struct{}) chan int {ch := make(chan int)go func() {Label:for {select {case ch <- rand.Int():case <-done: 接收到done信号停止运行break Label}}close(ch)}()return ch
}
func main() {done := make(chan struct{})ch := dosth(done)fmt.Println(<-ch)fmt.Println(<-ch)done <- struct{}{}fmt.Println(<-ch)
}//**************************************************************************************
for v := range in { //for range 进行通道读取,只有当in关闭时,才会终止循环out <- v + 1
}
close(out)
3. channel—无缓冲通道 VS 有缓冲通道
(1)无缓冲通道
无缓冲通道 | ch := make(chan int) |
---|
//对于无缓冲通道,必须有进程在等待接收参数,才可以向通道中传入参数。比如下面的方法,如果把a()的执行放在ch<-10后面,就会引发死锁错误。
func a(ch chan int) {fmt.Printf("我是a,接收到了数字:%d\n", <-ch)
}func main() {ch := make(chan int)go a(ch)ch <- 10fmt.Print("over")}//可以用来实现goroutine同步
func main() {ch := make(chan struct{})go func(){time.Sleep(3*time.second)ch<-struct{}}()<-chfmt.Print("over")
}
(2)有缓冲通道
有缓冲通道 | make(chan int, 1) //创建一个容量为1的有缓冲区通道 |
---|---|
获取通道元素数量 | len(channel) |
获取通道容量 | cap(channel) |
(3)单向通道(在赋值或者传参时,我们将其设为单向通道,使得该函数只能对其进行读/写一个操作)
func squarer(out chan<- int, in <-chan int) {for i := range in {out <- i * i}close(out)
}
4. channel—用例
(1)实现管道
//一个函数的输入参数和输出参数是相同的chan类型,则函数可以调用自己,形成一个调用链
func chain(in chan int) chan int { //作用是将输入的通道值加一out := make(chan int)go func() {for v := range in { //由于in,out声明的都是无缓冲,所以每读一个都会阻塞,等下一个参数out <- v + 1}close(out)}()return out
}func Guandao() {in := make(chan int)go func() {for i := 0; i < 10; i++ { //初始化输入参数in <- i}close(in)}()out := chain(chain(chain(in))) //叠加形成管道for v := range out { //从最终的输出通道读出结果fmt.Println(v)}
}
(2)实现每个事务一个goroutine
//通过设置任务通道,每当有新任务来临时,为其分配相应的goroutine进行执行
type task struct {num int
}func (t *task) job(wait sync.WaitGroup) {fmt.Println("i am", t.num, "WORKING")time.Sleep(3 * time.Second) //模拟工作流程fmt.Println("i am", t.num, "DONE")wait.Done() //工作完成,wait-1
}func AllocateJob() {wait := sync.WaitGroup{} //wait防止程序未执行完成就退出taskchan := make(chan *task, 10) //接收任务通道go func() {for i := 1; i <= 10; i++ { //模拟有新工作加入,1秒加入一个task := &task{num: i}wait.Add(1) //wait+1time.Sleep(1 * time.Second)taskchan <- task //任务加入任务通道}}()go func() {for task := range taskchan { //从通道中取出任务,并分配goroutine执行go task.job(wait)}}()wait.Wait()
}
(3)实现future模式
适用于一个流程中需要调用多个子调用的情况,并且这些子调用之间没有依赖。future模式的优势就是将其同步调用转化为异步调用。
//通过通道,实现在执行一个较为耗时的任务时,先通过goroutine调用该任务,由chan传入参数后,并行做其他的事,最后再通过通道获取其运行的结果。
type query struct {in chan stringout chan string
}func doquery(q query) {in := <-q.intime.Sleep(5 * time.Second) //模拟访问数据库进行查询q.out <- in + "的结果"}func Futuretest() {temp := query{in: make(chan string), out: make(chan string)}go doquery(temp)temp.in <- "select * from user"time.Sleep(3 * time.Second) //模拟在进行数据库查询时,我们可以做一些其他的事情fmt.Println(<-temp.out) //做完其他的事情,并且数据库查询也完成了,我们从通道中获取查询结果
}