Goroutine:
goroutine是Go并行设计的核心。goroutine说到底其实就是协程,它比线程更小,十几个goroutine可能体现在底层就是五六个线程,
Go语言内部帮你实现了这些goroutine之间的内存共享
。执行goroutine只需极少的栈内存(大概是4~5KB),当然会根据相应的数据伸缩。也正因为如此,可同时运行成千上万个并发任务。goroutine比thread更易用、更高效、更轻便。一般情况下,一个普通计算机跑几十个线程就有点负载过大了,但是同样的机器却可以轻松地让成百上千个goroutine进行资源竞争。
特点:
- 有独立的栈空间
- 共享程序堆内存
- 调度由用于控制
- 协程是轻量级的线程
Goroutine的创建:
只需在函数调⽤语句前添加 go 关键字
,就可创建并发执⾏单元。开发⼈员无需了解任何执⾏细节,调度器会自动将其安排到合适的系统线程上执行。在并发编程中,我们通常想将一个过程切分成几块,然后让每个goroutine各自负责一块工作,当一个程序启动时,主函数在一个单独的goroutine中运行,我们叫它main goroutine。新的goroutine会用go语句来创建。而go语言的并发设计,让我们很轻松就可以达成这一目的。
Goroutine格式:
go 函数名( 参数列表 )
演示:
主goroutine退出后,其它的工作goroutine也会自动退出:
func main() {// 如果不加go执行顺序是:先执行test1再执行test2,是有顺序的,但是如果有go关键字就是同时在执行了go goroutineTest01()go goroutineTest02()for {}
}func goroutineTest01() {for i := 0; i < 10; i++ {fmt.Println("goroutineTest01执行")time.Sleep(1000 * time.Millisecond)}
}func goroutineTest02() {for i := 0; i < 10; i++ {fmt.Println("goroutineTest02执行")time.Sleep(1000 * time.Millisecond)}
}
runtime包:
runtime.Gosched()
用于让出CPU时间片,让出当前goroutine的执行权限,调度器安排其他等待的任务运行,并在下次再获得cpu时间轮片的时候,从该出让cpu的位置恢复执行。有点像跑接力赛,A跑了一会碰到代码runtime.Gosched() 就把接力棒交给B了,A歇着了,B继续跑。
func main() {go func() {for {fmt.Println("我不让出时间片")}}()for {runtime.Gosched() // 让出当前时间片fmt.Println("我让出时间片")}
}
runtime.Goexit()
将立即终止当前 goroutine 执⾏,调度器确保所有已注册 defer延迟调用被执行。Goexit之前注册的defer会生效,之后不会。
func main() {go func() { // Goexit直接退出funcfmt.Println("走我吗——1")goexit()fmt.Println("走我吗——2")}()for {}
}func goexit() {//returnfmt.Println("走我吗——3")runtime.Goexit() // 退出当前go程defer fmt.Println("走我吗——4")
}
runtime.GOMAXPROCS()
用来设置可以并行计算的CPU核数的最大值,并返回上一次的核心数,如果是第一次调用就返回默认值。
func main() {// func GOMAXPROCS(n int) int {} 参数是要设置的核心数,返回值是上一次设置的核心数 num := runtime.GOMAXPROCS(1)fmt.Println("上一次设置核心数为:", num)for {// 0和1会一直交替打印,如果用GOMAXPROCS限制1个核心,那么谁抢到谁就一直跑go fmt.Println(0)fmt.Println(1)}
}
channel:
channel可以建立goroutine之间的通信连接,channel的特点是:先进先出、线程安全不需要加锁
,channel是Go语言中的一个核心类型,可以把它看成管道。并发核心单元通过它就可以发送或者接收数据通讯,这在一定程度上又进一步降低了编程的难度。channel是一个数据类型,主要用来解决协程的同步问题以及协程之间数据共享(数据传递)的问题。- goroutine运行在相同的地址空间,因此访问共享内存必须做好同步。
goroutine 奉行通过通信来共享内存,而不是共享内存来通信。
引⽤类型 channel可用于多个 goroutine 通讯。其内部实现了同步,确保并发安全。- channel分为有缓冲和无缓冲
channel分为两个端:
传入端负责写的操作,输出端负责读的操作
读和写必须同时满足条件,才在会进行数据流动,否则会阻塞。
例:channel就是一个外卖小哥,传入端是卖家,输出端是买家,必须保证卖家把商品给外卖小哥以后,买家正在准备拿,否则外卖小哥就会懵逼了。
无缓冲的channel:
无缓冲的通道(unbuffered channel)指在接收前不会保存任何数据的一个通道。通道容量为0,可以实现同步的操作,操作前提是读和写必须同时操作,否则会阻塞。
- 缓冲:中间加了个存放数据的区域,然后缓存区慢了才会写入,就像Java的缓冲流一样
- 这种类型的通道要求发送goroutine和接收goroutine同时准备好,才能完成发送和接收操作。否则,通道会导致先执行发送或接收操作的 goroutine 阻塞等待。
- 这种对通道进行发送和接收的交互行为本身就是同步的。其中任意一个操作都无法离开另一个操作单独存在。
阻塞:由于某种原因数据没有到达,当前协程(线程)持续处于等待状态,直到条件满足,才解除阻塞。
同步:在两个或多个协程(线程)间,保持数据内容一致性的机制。
无缓冲创建:
chan
是创建channel所需使用的关键字。Type
代表指定channel收发数据的类型。
make(chan Type) //等价于make(chan Type, 0)make(chan Type, capacity)
当参数capacity= 0 时,channel 是无缓冲阻塞读写的
当capacity > 0 时,channel 有缓冲、是非阻塞的,直到写满 capacity个元素才阻塞写入。
channel通过操作符<-
来发送和接收数据
发送和接收数据语法:
默认情况下,channel接收和发送数据都是阻塞的,除非另一端已经准备好,这样就使得goroutine同步变的更加的简单,而不需要显式的lock。
channel <- value //发送value到channel<-channel //接收并将其丢弃x := <-channel //从channel中接收数据,并赋值给xx, ok := <-channel //功能同上,同时检查通道是否已关闭或者是否为空
演示:
func main() {go channelTest01()go channelTest02()for {}
}// 定义channel
var channel = make(chan int)// 定义一个公共操作类
func print(s string) {for _, ch := range s {fmt.Printf("%c", ch)time.Sleep(300 * time.Millisecond)}
}// 定义两个人使用打印机
func channelTest01() {print("person01")channel <- 1 // person01负责写的操作,随便写的数字都行,相当于规定了两个方法的执行顺序
}
func channelTest02() {<-channel // person02负责读channel中的数据,也就是先把person01的数据读出来才会继续执行person02的任务//num := <-channel // person02负责读channel中的数据,也就是先把person01的数据读出来才会继续执行person02的任务// 如果一个写,一个没读,或者是一个读一个没写就会阻塞print("person02")//fmt.Println(num) //也可以定义一个变量存起来
}
演示:
func main() {// 创建无缓冲通道,长度默认为0ch := make(chan string)// 验证长度和容量 len(ch):channel中数据剩余未读取的个数 cap(ch):channel的容量fmt.Println("channel中未读取的个数:", len(ch), "channel的容量:", cap(ch))// 定义匿名go程go func() {for i := 0; i < 3; i++ {fmt.Println("匿名go程循环:", i, "channel中未读取的个数:", len(ch), "channel的容量:", cap(ch))}ch <- "匿名go程执行完毕"}()// 主函数读取channel中的数据result := <-chfmt.Println("result", result)
}
演示:
func main() {ch := make(chan int)go func() {for i := 0; i < 3; i++ {fmt.Println("匿名channel在写:", i)ch <- i}}()// time.Sleep(300 * time.Millisecond)for i := 0; i < 3; i++ {result := <-chfmt.Println("main函数channel在读:", result)}
}
打印结果:
可以看到结果并不是一个写一个读的情况,原因是这样的
- Main函数先执行,然后通过make创建了channel
- 下面是两个方法,因为没有指定先后顺序,所以具体执行哪一个是靠抢占线程的
- 然后匿名channel抢到执行权,写了个i=0,然后写到channel里面
- 然后又要抢下一次执行权,还是匿名的channel抢到,又写了个i=1
- 然后main的抢到了,写了两次0、1
- 然后下面一人一次
总结就是不管是有缓冲还是无缓冲,都要调用硬件写到屏幕,所以到底谁先谁后,是看谁能抢到CPU的执行权,只要涉及到IO的操作都是有延迟的,打印的数据出现顺序问题都是正常的,
匿名channel在写: 0匿名channel在写: 1main函数channel在读: 0main函数channel在读: 1匿名channel在写: 2main函数channel在读: 2
有缓冲的channel:
有缓冲的通道(buffered channel)是一种在被接收前能存储一个或者多个数据值的通道。
- 这种类型的通道并不强制要求 goroutine 之间必须同时完成发送和接收。通道会阻塞发送和接收动作的条件也不同。
无缓冲的是同步操作,有缓冲是异步操作
- 只有通道中没有要接收的值时,接收动作才会阻塞。
- 只有通道没有可用缓冲区容纳被发送的值时,发送动作才会阻塞。
- 这导致有缓冲的通道和无缓冲的通道之间的一个很大的不同:无缓冲的通道保证进行发送和接收的 goroutine 会在同一时间进行数据交换;有缓冲的通道没有这种保证。
有缓冲的channel创建格式:
如果给定了一个缓冲区容量,通道就是异步的。只要缓冲区有未使用空间用于发送数据,或还包含可以接收的数据,那么其通信就会无阻塞地进行
make(chan Type, capacity)
演示:
有可能会出现
func main() {ch := make(chan int, 3)fmt.Println("初始数据:", "channel中未读取的个数:", len(ch), "channel的容量:", cap(ch))go func() {for i := 0; i < 3; i++ {fmt.Println("匿名go程循环:", i, "channel中未读取的个数:", len(ch), "channel的容量:", cap(ch))ch <- i}}()time.Sleep(300 * time.Millisecond)for i := 0; i < 3; i++ {result := <-chfmt.Println("main函数channel在读:", result, "channel中未读取的个数:", len(ch), "channel的容量:", cap(ch))}
}
关闭channel:
如果发送者知道,没有更多的值需要发送到channel的话,那么让接收者也能及时知道没有多余的值可接收将是有用的,因为接收者可以停止不必要的接收等待。这可以通过内置的close函数来关闭channel实现。
- channel不像文件一样需要经常去关闭,只有当你确实没有任何发送数据了,或者你想显式的结束range循环之类的,才去关闭channel
- 关闭channel后,无法向channel 再发送数据(引发 panic 错误后导致接收立即返回零值)
- 闭channel后,可以继续从channel接收数据
- 对于nil channel,无论收发都会被阻塞。
- 如果不知道发送端要发多少次可以使用 range 来迭代channel
判断channel是否关闭的两种方式
if num , ok := <- ch; ok == true{}for num := range ch {}
- 如果已经关闭,ok为false,num无数据
- 如果没有关闭,ok为true,num保存读取的数据
- golang中还是引用了管道的特性,关闭后就会返回0,但是不需要需判断是否为0,而是用ok判断
func main() {ch := make(chan int, 3)go func() {for i := 0; i < 3; i++ {ch <- i}// 写入端关闭channelclose(ch)}()/*for {if num, ok := <-ch; ok == true {fmt.Println("读到的数据:", num)} else {fmt.Println("关闭后:", num)break}}*/for num := range ch {fmt.Println("读到的数据:", num)}
}
单向channel:
默认情况下,通道channel是双向的,也就是,既可以往里面发送数据也可以往里面接收数据。但是,我们经常见一个通道作为参数进行传递而值希望对方是单向使用的,要么只让它发送数据,要么只让它接收数据,这时候我们可以指定通道的方向。
单向channel变量声明:
var ch1 chan int // ch1是一个正常的channel,是双向的
var ch2 chan<- float64 // ch2是单向channel,只用于写float64数据
var ch3 <-chan int // ch3是单向channel,只用于读int数据
- chan<- 表示数据进入管道,要把数据写进管道,对于调用者就是输出。
- <-chan 表示数据从管道出来,对于调用者就是得到管道的数据,当然就是输入。
- 双向channel可以隐式转换为任意一种单向channel,单向channel不可以转换为双向channel
channel作为函数参数:
channel传参是引用,好处就是多个Goroutine通信的时候会共用一个channel
演示:
func main() {ch := make(chan int, 3)go func() {send(ch) // 相当于双向转为单向写的操作}()read(ch)
}// 读
func read(in <-chan int) {n := <-infmt.Println("读到:", n)
}// 写
func send(out chan<- int) {out <- 24close(out)
}
生产者消费者模型:
- 单向channel最典型的应用是
生产者消费者模型
生产者消费者模型
: 某个模块(函数等)负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、协程、线程、进程等)。产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。- 单单抽象出生产者和消费者,还够不上是生产者/消费者模型。该模式还需要有一个缓冲区处于生产者和消费者之间,作为一个中介。生产者把数据放入缓冲区,而消费者从缓冲区取出数据。
例:小明负责蒸包子,小红负责吃包子,桌子用来放包子
- 小明蒸好包子——相当于生产者制造数据
- 把包子放在桌子上——相当于生产者把数据放入缓冲区
- 小红从桌子上拿走包子——相当于消费者把数据取出缓冲区
- 小红吃包子——相当于消费者处理数据
这个缓冲区有什么用呢?为什么不让生产者直接调用消费者的某个函数,直接把数据传递过去,而画蛇添足般的设置一个缓冲区呢?
1、解耦
假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化,可能会直接影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合度也就相应降低了。
2、处理并发
生产者直接调用消费者的某个方法,还有另一个弊端。由于函数调用是同步的(或者叫阻塞的),在消费者的方法没有返回之前,生产者只好一直等在那边。万一消费者处理数据很慢,生产者只能无端浪费时间。使用了生产者/消费者模式之后,生产者和消费者可以是两个独立的并发主体。生产者把制造出来的数据往缓冲区一丢,就可以再去生产下一个数据。基本上不用依赖消费者的处理速度。其实最当初这个生产者消费者模式,主要就是用来处理并发问题的。
3、缓存
如果生产者制造数据的速度时快时慢,缓冲区的好处就体现出来了。当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。等生产者的制造速度慢下来,消费者再慢慢处理掉。
演示:
type OrderInfo struct {orderId int // 订单id
}func product(out chan<- OrderInfo) {for i := 0; i < 10; i++ {order := OrderInfo{orderId: i + 1}out <- order}close(out)
}func consumer(in <-chan OrderInfo) {for order := range in {fmt.Println("订单id为:", order.orderId)}
}func main() {ch := make(chan OrderInfo)go product(ch)consumer(ch)
}
- 在上面的代码中,加了一个消费者,同时在consumer方法中,将数据取出来后,又进行了一组运算。这时可能会出现一个协程从管道中取出数据,参与加法运算,但是还没有算完另外一个协程又从管道中取出一个数据赋值给了num变量。所以这样累加计算,很有可能出现问题。当然,按照前面的知识,解决这个问题的方法很简单,就是通过加锁的方式来解决。增加生产者也是一样的道理。
- 另外一个问题,如果消费者比生产者多,仓库中就会出现没有数据的情况。我们需要不断的通过循环来判断仓库队列中是否有数据,这样会造成cpu的浪费。反之,如果生产者比较多,仓库很容易满,满了就不能继续添加数据,也需要循环判断仓库满这一事件,同样也会造成CPU的浪费。
- 我们希望当仓库满时,生产者停止生产,等待消费者消费;同理,如果仓库空了,我们希望消费者停下来等待生产者生产。为了达到这个目的,这里引入条件变量。(需要注意:如果仓库队列用channel,是不存在以上情况的,因为channel被填满后就阻塞了,或者channel中没有数据也会阻塞)。
条件变量:
- 条件变量的作用并不保证在同一时刻仅有一个协程(线程)访问某个共享的数据资源,而是在对应的共享数据的状态发生变化时,通知阻塞在某个条件上的协程(线程)。条件变量不是锁,在并发中不能达到同步的目的,因此条件变量总是与锁一块使用。
- 例如,上面说的,如果仓库队列满了,我们可以使用条件变量让生产者对应的goroutine暂停(阻塞),但是当消费者消费了某个产品后,仓库就不再满了,应该唤醒(发送通知给)阻塞的生产者goroutine继续生产产品。
GO标准库中的sys.Cond类型代表了条件变量。条件变量要与锁(互斥锁,或者读写锁)一起使用。成员变量L代表与条件变量搭配使用的锁。
对应的有3个常用方法,Wait、Signal、Broadcast
func (c *Cond) Wait()
该函数的作用可归纳为如下三点:
- 阻塞等待条件变量满足
- 释放已掌握的互斥锁相当于cond.L.Unlock()。 注意:两步为一个原子操作。
- 当被唤醒,Wait()函数返回时,解除阻塞并重新获取互斥锁。相当于cond.L.Lock()
func (c *Cond) Signal()
- 单发通知,给一个正等待(阻塞)在该条件变量上的goroutine(线程)发送通知。
func (c *Cond) Broadcast()
- 广播通知,给正在等待(阻塞)在该条件变量上的所有goroutine(线程)发送通知。
演示:
var cond sync.Cond // 创建全局条件变量// 生产者
func producer(out chan<- int, idx int) {for {cond.L.Lock() // 条件变量对应互斥锁加锁for len(out) == 3 { // 产品区满 等待消费者消费cond.Wait() // 挂起当前协程, 等待条件变量满足,被消费者唤醒}num := rand.Intn(1000) // 产生一个随机数out <- num // 写入到 channel 中 (生产)fmt.Printf("%dth 生产者,产生数据 %3d, 公共区剩余%d个数据\n", idx, num, len(out))cond.L.Unlock() // 生产结束,解锁互斥锁cond.Signal() // 唤醒 阻塞的 消费者time.Sleep(time.Second) // 生产完休息一会,给其他协程执行机会}
}//消费者
func consumer(in <-chan int, idx int) {for {cond.L.Lock() // 条件变量对应互斥锁加锁(与生产者是同一个)for len(in) == 0 { // 产品区为空 等待生产者生产cond.Wait() // 挂起当前协程, 等待条件变量满足,被生产者唤醒}num := <-in // 将 channel 中的数据读走 (消费)fmt.Printf("---- %dth 消费者, 消费数据 %3d,公共区剩余%d个数据\n", idx, num, len(in))cond.L.Unlock() // 消费结束,解锁互斥锁cond.Signal() // 唤醒 阻塞的 生产者time.Sleep(time.Millisecond * 500) //消费完 休息一会,给其他协程执行机会}
}
func main() {rand.Seed(time.Now().UnixNano()) // 设置随机数种子quit := make(chan bool) // 创建用于结束通信的 channelproduct := make(chan int, 3) // 产品区(公共区)使用channel 模拟cond.L = new(sync.Mutex) // 创建互斥锁和条件变量for i := 0; i < 5; i++ { // 5个消费者go producer(product, i+1)}for i := 0; i < 3; i++ { // 3个生产者go consumer(product, i+1)}<-quit // 主协程阻塞 不结束
}/*
1. main函数中定义quit,其作用是让主协程阻塞。
2. 定义product作为队列,生产者产生数据保存至队列中,最多存储3个数据,消费者从中取出数据模拟消费
3. 条件变量要与锁一起使用,这里定义全局条件变量cond,它有一个属性:L Locker。是一个互斥锁。
4. 开启5个消费者协程,开启3个生产者协程。
5. producer生产者,在该方法中开启互斥锁,保证数据完整性。并且判断队列是否满,如果已满,调用wait()让该goroutine阻塞。当消费者取出数后执行cond.Signal(),会唤醒该goroutine,继续生产数据。
6. consumer消费者,同样开启互斥锁,保证数据完整性。判断队列是否为空,如果为空,调用wait()使得当前goroutine阻塞。当生产者产生数据并添加到队列,执行cond.Signal() 唤醒该goroutine。
*/
定时器:
ime.Timer是一个定时器。代表未来的一个单一事件,你可以告诉timer你要等待多长时间。它提供一个channel,在定时时间到达之前,没有数据写入timer.C会一直阻塞。直到定时时间到,向channel写入值,阻塞解除,可以从中读取数据。
创建:
func main() {// 三种方法完成定时、NewTimer、After// 当前时间fmt.Printf("当前时间:%v\n", time.Now())// 创建定时器,2秒后,定时器向定时器的C发送time.Timer类型的元素值timer := time.NewTimer(time.Second * 2)nowTimer := <-timer.Cfmt.Println("nowTimer", nowTimer)fmt.Printf("当前时间:%v\n", time.Now())after := <-time.After(time.Second * 2)fmt.Println("after", after)timeNow02 := <-timer.Cfmt.Printf("timeNow02:%v\n", timeNow02) // 当前时间// time.Sleeptimer2 := time.NewTimer(time.Second * 2)<-timer2.Cfmt.Println("可以实现单纯的等待2秒")time.Sleep(time.Second * 2)fmt.Println("再一次2s后")
}
重置和关闭:
func main() {// 定时停止和重置timer3 := time.NewTimer(time.Second * 3)go func() {<-timer3.Cfmt.Println("timer3运行完毕")}()stop := timer3.Stop() // 设置定时器停止if stop {fmt.Println("已经停止")}timer4 := time.NewTimer(time.Second * 3) // 原设置时间timer4.Reset(time.Second * 1) // 重新设置时间<-timer4.Cfmt.Println("after")
}
定时器周期定时:
func main() {quit := make(chan bool)i := 0fmt.Println("当前时间:", time.Now())// NewTicker:周期定时器ticker := time.NewTicker(time.Second)go func() {for {i++nowTime := <-ticker.Cfmt.Println("nowTime", nowTime)if i == 5 {quit <- true}}}()<-quit
}
select:
通过select可以监听channel上的数据流动
select的用法与switch语言非常类似,由select开始一个新的选择块,每个选择条件由case语句来描述。与switch语句相比, select有比较多的限制,其中最大的一条限制就是每个case语句里必须是一个IO操作(读写),大致的结构如下:
select {case <-chan1:// 如果chan1成功读到数据,则进行该case处理语句case chan2 <- 1:// 如果成功向chan2写入数据,则进行该case处理语句default:// 如果上面都没有成功,则进入default处理流程}
在一个select语句中,Go语言会按顺序从头至尾评估每一个发送和接收的语句。如果其中的任意一语句可以继续执行(即没有被阻塞),那么就从那些可以执行的语句中任意选择一条来使用。如果没有任意一条语句可以执行(即所有的通道都被阻塞),那么有两种可能的情况:
- 如果给出了default语句,那么就会执行default语句,同时程序的执行会从select语句后的语句中恢复。
- 如果没有default语句,那么select语句将被阻塞,直到至少有一个通信可以进行下去,但是会产生忙轮询
演示:
func main() {ch := make(chan int) // 用来进行数据通信的channelquit := make(chan bool) // 用来判断是否退出的channelgo func() {for i := 0; i < 5; i++ {ch <- itime.Sleep(time.Second)}close(ch)quit <- true // 通知主go程退出runtime.Goexit()}()// 监听channel,读取数据for {select {case num := <-ch:fmt.Println("读到的数据为:", num)case <-quit:return}}
}
斐波那契数列:
func main() {ch := make(chan int) // 用来进行数据通信的channelquit := make(chan bool) // 用来判断是否退出的channelgo f(ch, quit)x, y := 1, 1for i := 0; i < 20; i++ {ch <- xx, y = y, x+y}quit <- true
}func f(ch <-chan int, quit <-chan bool) {for {select {case num := <-ch:fmt.Print(num, " ")case <-quit://returnruntime.Goexit()}}
}
超时:
有时候会出现goroutine阻塞的情况,使用select我们如何避免整个程序进入阻塞的情况
演示:
func main() {ch := make(chan int)quit := make(chan bool)go func() {for {select {case v := <-ch:fmt.Println(v)// 设置5秒读取不到数据就退出,避免阻塞case <-time.After(5 * time.Second):fmt.Println("timeout")quit <- truebreak}}}()ch <- 666 // 写完5秒后退出<-ch
}