并发编程
我们主流的并发编程思路一般有:多进程、多线程
但这两种方式都需要操作系统介入,进入内核态,是十分大的时间开销
由此而来,一个解决该需求的技术出现了:用户级线程,也叫做 绿程、轻量级线程、协程
python - asyncio、java - netty22111111111111115
由于 go 语言是 web2.0 时代发展起来的语言,go语言没有多线程和多进程的写法,其只有协程的写法 golang - goroutine
func Print() {fmt.Println("打印印")
}func main() {go Print()
}
我们可以使用这种方式来进行并发编程,但这个程序里要注意,我们主程序在确定完异步之后结束,会立即让程序退出,这就导致我们并发的子线程没来得及执行就退出了。
我们可以增加一个Sleep来让主线程让出资源,等待子线程执行完毕再进行操作
func Print() {for {time.Sleep(time.Second)fmt.Println("打印印")}}func main() {go Print()for {time.Sleep(time.Second)fmt.Println("主线程")}
}
另外的,Go 语言协程的一个巨大优势是 可以打开成百上千个协程,协助程序效率的提升
要注意一个问题:
多进程的切换十分浪费时间,且及其浪费系统资源
多线程的切换也很浪费时间,但其解决了浪费系统资源的问题
协程既解决了切换浪费时间的问题,也解决了浪费系统资源的问题
Go语言仅支持协程
Go语言中,协程的调度(gmp机制):
用户新建的协程(Goroutine)会被加入到同一调度器中等待运行,若调度器满,则会将调度器中一半的 G 加入到全局队列中,其他 P 若没有 G 则会从其他 P 中偷取一半的 G ,若所有的都满,则会新建 M 进行处理
P 的数量是固定的
注意 M 和 P 不是永远绑定的,当一个 P 现在绑定的 M 进入了阻塞等情况,P 会自动去寻找空闲的 M 或创建新的 M 来绑定
子 goroutine 如何通知到主 goroutine 其运行状态?也就是我们主协程要知道子协程运行完毕之后再进行进一步操作,也就是 (wait)
func main() {// 定义 sync.Group 类型的变量用于控制goroutine的状态var wg sync.WaitGroupfor i := 0; i < 100; i++ {go func(i int) {wg.Add(1) // 每次启动一个协程都要开启一个计数器defer wg.Done() // 每次结束之前都要让计数器 -1fmt.Println("这次打印了:" + strconv.Itoa(i))}(i)}wg.Wait()fmt.Println("结束..................................")
}
锁
Go语言中的锁
互斥锁
我们看下面这个程序:
var total int
var wg sync.WaitGroupfunc add() {defer wg.Done()for i := 0; i <= 100000; i++ {total += 1}
}func sub() {defer wg.Done()for i := 0; i <= 100000; i++ {total -= 1}
}func main() {wg.Add(2)go add()go sub()wg.Wait()fmt.Println(total)/*-10000110000168595...*/
}
我们发现这个程序的结果不可预知,这是因为 a 的操作分为三步:
取得 a 的值、执行 a 的计算操作、写入 a 的值,这三步不是原子性的,如果发生了交叉,则一个数准备写入时发生协程切换,这时后面再做再多的操作,也会被这次切换屏蔽掉,最终写入这一个数的结果,由此可知,这种非原子性操作共享数据的模式是不可预知结果的。
那么,我们就需要加锁,像下面这样
var total int
var wg sync.WaitGroup
var lock sync.Mutex
var lockc = &lockfunc add() {defer wg.Done()for i := 0; i <= 100000; i++ {lock.Lock() // 加锁,直至见到自己这把锁的 Unlock() 方法之前,令这中间的方法都为原子性的total += 1lock.Unlock() // 解锁,配合 Lock() 方法使用}
}func sub() {defer wg.Done()for i := 0; i <= 100000; i++ {lockc.Lock()total -= 1lockc.Unlock()}
}func main() {wg.Add(2)go add()go sub()wg.Wait()fmt.Println(total)fmt.Printf("%p\n", &lock)fmt.Printf("%p\n", &(*lockc))
}
注意,上面这个程序不仅演示了加锁,还演示了,浅拷贝不影响加锁的情况
另外,我们也可以使用 automic 对简单的数值计算进行加锁
var total int32
var wg sync.WaitGroup
var lock sync.Mutex
var lockc = &lockfunc add() {defer wg.Done()for i := 0; i <= 100000; i++ {atomic.AddInt32(&total, 1)}
}func sub() {defer wg.Done()for i := 0; i <= 100000; i++ {atomic.AddInt32(&total, -1)}
}func main() {wg.Add(2)go add()go sub()wg.Wait()fmt.Println(total)
}
读写锁
读写锁就是:允许同时读,不允许同时写,不允许同时读写
func main() {var num intvar rwlock sync.RWMutex // 定义一个读写锁var wg sync.WaitGroup // 定义等待处理器wg.Add(2)go func() {defer wg.Done()rwlock.Lock() // 写锁defer rwlock.Unlock()num = 12}()// 同步处理器,这里是简便处理time.Sleep(1)go func() {defer wg.Done()rwlock.RLock()defer rwlock.RUnlock()fmt.Println(num)}()wg.Wait()}
一个简单的测试
func main() {var rwlock sync.RWMutex // 定义一个读写锁var wg sync.WaitGroup // 定义等待处理器wg.Add(6)go func() {time.Sleep(time.Second)defer fmt.Println("释放写锁,可以进行读操作")defer wg.Done()rwlock.Lock() // 写锁defer rwlock.Unlock()fmt.Println("得到写锁,停止读操作")time.Sleep(time.Second * 5)}()for i := 0; i < 5; i++ {go func() {defer wg.Done()for {rwlock.RLock()fmt.Println("得到读锁,进行读操作")time.Sleep(time.Millisecond * 500)rwlock.RUnlock()}}()}wg.Wait()/**得到读锁,进行读操作得到写锁,停止读操作释放写锁,可以进行读操作得到读锁,进行读操作得到读锁,进行读操作得到读锁,进行读操作*/}
通信
Go 语言中对于并发场景下的通信,秉持以下理念:
不要通过共享内存来通信,要通过通信实现共享内存
其他语言都是用一个共享的变量来实现通信,或者消息队列,Go语言就希望实现队列的机制
var msg chan string //定义一个用于传递 string 的 channel// 创建一个缓冲区大小为 1 的 channel// 只有 有缓冲区的 channel 才可以暂存数据msg = make(chan string, 1)msg <- "data"data := <-msgfmt.Println(data)
只有 goroutine 中才可以使用缓冲区大小为 0 的channel
func main() {var msg chan string //定义一个用于传递 string 的 channel// 创建一个缓冲区大小为 1 的 channel// 只有 有缓冲区的 channel 才可以暂存数据msg = make(chan string, 0)go func(msg chan string) {data := <-msgfmt.Println(data)}(msg)msg <- "data"time.Sleep(time.Second * 3)
}
这时由于 go 语言 channel 中的 happen-before 机制,该机制保证了 就算先 receiver 也会被 goroutine 挂起,等待 sender 完成之后再进行 receiver 的具体执行
go 语言中,channel 的应用场景十分广泛,包括:
- 信息传递、消息过滤
- 信号广播
- 事件订阅与广播
- 任务分发
- 结果汇总
- 并发控制
- 同步异步
Go 语言的消息接收问题
func main() {var msg chan string //定义一个用于传递 string 的 channel// 创建一个缓冲区大小为 1 的 channel// 只有 有缓冲区的 channel 才可以暂存数据msg = make(chan string, 0)go func(msg chan string) {// 注意这里,每一个接收消息的变量只能接收到一个消息,若有多条消息同时发送,则无法接收data := <-msgfmt.Println(data)math := <-msgfmt.Println(math)}(msg)msg <- "data"msg <- "math"time.Sleep(time.Second * 3)}
如果我们不知道消息会发送来多少,可以使用 for-range 进行监听:
func main() {var msg chan string //定义一个用于传递 string 的 channel// 创建一个缓冲区大小为 1 的 channel// 只有 有缓冲区的 channel 才可以暂存数据msg = make(chan string, 2)go func(msg chan string) {// 若我们不确定有多少消息会过来,我们可以使用 for-range 进行循环验证for data := range msg {fmt.Println(data)}}(msg)msg <- "data"msg <- "math"time.Sleep(time.Second * 3)}
close(msg) // 关闭队列,监听队列的 goroutine 会立刻退出
关闭了的 channel 不能再存储数据,但可以进行数据的取出操作
上面我们所接触的 channel 都是双向的 channel 即这个channel 对应的goroutine 既可以从里面读数据,也可以向里面写数据,这种不符合我们程序,一个程序只做它对应的一个功能,这一程序设计思路
创建单向 channel:
var ch1 chan int // 这是一个双向 channelvar ch2 chan<- float64 // 这是一个只能写入 float64 类型数据的单向 channelvar ch3 <-chan int // 这是一个只能从 存储int型 channel中读取数据的单向channel
c := make(chan int, 3) // 创建一个双向 channelvar send chan<- int = c // 将 c channel 的写入能力赋予给 send,使其成为一个单向发送的 channel (生产者)var receive <-chan int = c // 将c channel 的读取能力赋予给receive,使其成为一个单向接收的 channel (消费者)
经典例子:
/**
经典:
使用两个 goroutine 交替打印:12AB34CD56EF78GH910IJ1112.....YZ2728
*/var number, letter = make(chan bool), make(chan bool)func printNum() {// 这里是等待接收消息,若消息接收不到,则该协程会始终阻塞在这个位置i := 1for {<-numberfmt.Printf("%d%d", i, i+1)i += 2letter <- true}
}func printLetter() {i := 0str := "ABCDEFGHIJKLMNOPQRSTUVWXYZ"for {<-letterfmt.Print(str[i : i+2])number <- trueif i <= 23 {i += 2} else {return}}}func main() {go printNum()go printLetter()number <- truetime.Sleep(time.Second * 20)
}
使用 select 对 goroutine 进行监控
// 使用 struct{} 作为传入的信息,由于这个 struct{} 占用内存空间较小,这是一种常见的传递方式func g1(ch chan struct{}) {time.Sleep(time.Second * 3)ch <- struct{}{}
}func g2(ch chan struct{}) {time.Sleep(time.Second * 3)ch <- struct{}{}
}func main() {g1Channel := make(chan struct{})g2Channel := make(chan struct{})go g1(g1Channel)go g2(g2Channel)// 注意这里只要有一个能取到值则 select 结果则结束select {// 若 g1Channel 中能取到值case <-g1Channel:fmt.Println("g1 done")// 若 g1Channel 中能取到值case <-g2Channel:fmt.Println("g2 done")}
}
这里若所有的 goroutine 都就绪了,则 select 执行哪个是随机的,为的是防止某个 goroutine 一直被优先执行导致的另一个 goroutine 饥饿
超时机制:
// 使用 struct{} 作为传入的信息,由于这个 struct{} 占用内存空间较小,这是一种常见的传递方式func g1(ch chan struct{}) {time.Sleep(time.Second * 3)ch <- struct{}{}
}func g2(ch chan struct{}) {time.Sleep(time.Second * 3)ch <- struct{}{}
}func main() {g1Channel := make(chan struct{})g2Channel := make(chan struct{})go g1(g1Channel)go g2(g2Channel)timeChannel := time.NewTimer(5 * time.Second)for {// 注意这里只要有一个能取到值则 select 结果则结束select {// 若 g1Channel 中能取到值case <-g1Channel:fmt.Println("g1 done")// 若 g1Channel 中能取到值case <-g2Channel:fmt.Println("g2 done")case <-timeChannel.C: // timeChannel.C 是获取我们创建的 channel 的方法fmt.Println("time out")return}}
}
context
使用 WithCancel() 引入手动终止进程的功能
func cpuIInfo(ctx context.Context) {defer wg.Done()for {select {case <-ctx.Done(): // 本质还是一个 channelfmt.Println("程序退出执行...........")returndefault:time.Sleep(1 * time.Second)fmt.Println("CPUUUUUUUUUUUUUU")}}
}func main() {/**这里有一个问题,我们可以将以 context.Background() 为参数的 context 是作为最上层的父 context所有以其他 context 为参数的 context 都是他的子 context只要父 context 调用了 cancel() 则其所有的子 context 都会停止*/ctxParent, cancel := context.WithCancel(context.Background())ctxChild, _ := context.WithCancel(ctxParent)wg.Add(1)go cpuIInfo(ctxChild)time.Sleep(5 * time.Second)cancel() // 这个方法会直接向context channel 中传入一个对象,令channel停止wg.Wait()
}
使用 WthTimeout() 来自动引入超时退出机制
var wg sync.WaitGroupfunc cpuIInfo(ctx context.Context) {defer wg.Done()for {select {case <-ctx.Done(): // 本质还是一个 channelfmt.Println("程序退出执行...........")returndefault:time.Sleep(1 * time.Second)fmt.Println("CPUUUUUUUUUUUUUU")}}
}func main() {/**这里有一个问题,我们可以将以 context.Background() 为参数的 context 是作为最上层的父 context所有以其他 context 为参数的 context 都是他的子 context只要父 context 调用了 cancel() 则其所有的子 context 都会停止*/ctxParent, _ := context.WithTimeout(context.Background(), 6*time.Second)ctxChild, _ := context.WithCancel(ctxParent)wg.Add(1)go cpuIInfo(ctxChild)wg.Wait()
}
WithDeadline() 是指定某个时间点,在某个时间点的时候进行执行
WithValue() 则会向 context 中传递一个数据,我们可以在子 goroutine 中调用这个数据
var wg sync.WaitGroupfunc cpuIInfo(ctx context.Context) {defer wg.Done()for {select {case <-ctx.Done(): // 本质还是一个 channelfmt.Println("程序退出执行...........")returndefault:time.Sleep(1 * time.Second)fmt.Printf("%s", ctx.Value("LeaderID"))fmt.Println("CPUUUUUUUUUUUUUU")}}
}func main() {/**这里有一个问题,我们可以将以 context.Background() 为参数的 context 是作为最上层的父 context所有以其他 context 为参数的 context 都是他的子 context只要父 context 调用了 cancel() 则其所有的子 context 都会停止*/ctxParent, _ := context.WithTimeout(context.Background(), 6*time.Second)ctxChild := context.WithValue(ctxParent, "LeaderID", "00001") // 注意 WithValue 方法只有一个返回值wg.Add(1)go cpuIInfo(ctxChild)wg.Wait()
}