文章目录
- 3.7.1 用作计数信号量
- 3.7.2 使用缓存channel+sync.WaitGroup限制并发数(类似上小节)
要限制住goroutine的并发,
一定要阻塞住main的goroutine!
一定要阻塞住main的goroutine!
一定要阻塞住main的goroutine!
可以看最后一个例子。
由于带缓冲channel的运行时层实现带有缓冲区,因此对带缓冲channel的发送操作在缓冲区未满、接收操作在缓冲区非空的情况下是异步的(发送或接收无须阻塞等待)。
默认创建的都是非缓冲channel,读写都是即时阻塞。缓冲channel自带一块缓冲区,可以暂时存储数据,如果缓冲区满了,就会发生阻塞。
- 对一个带缓冲channel,在缓冲区无数据或有数据但未满的情况下,对其进行发送操作的goroutine不会阻塞;
- 在缓冲区已满的情况下,**对其进行发送操作的goroutine会阻塞;**在缓冲区为空的情况下,对其进行接收操作的goroutine亦会阻塞。
记住缓冲区channel 阻塞的是接收或者发送操作的goroutine
下面通过案例对比缓冲channel与非缓冲channel.
package mainimport ("fmt""time"
)func main() {//1.非缓冲通道ch1 := make(chan int)fmt.Println("非缓冲通道", len(ch1), cap(ch1)) //非缓冲通道 0 0go func() {data := <-ch1fmt.Println("获得数据", data) //获得数据 100}()ch1 <- 100time.Sleep(time.Second)fmt.Println("赋值ok", "main over...")//2.非缓冲通道ch2 := make(chan string)go sendData(ch2)for data := range ch2 {fmt.Println("\t 读取数据", data)}fmt.Println("main over...ch2")//3. 缓冲通道,缓冲区满了才会阻塞ch3 := make(chan string, 6)go sendData(ch3)for data := range ch3 {fmt.Println("ch3 \t读取数据", data)}fmt.Println("main over...ch3")
}func sendData(ch chan string) {for i := 1; i <= 3; i++ {ch <- fmt.Sprintf("data%d", i)fmt.Println("往通道放数据:", i)}defer close(ch)
}
输出:
$ go run .\main.go
非缓冲通道 0 0
获得数据 100
赋值ok main over...
往通道放数据: 1读取数据 data1读取数据 data2
往通道放数据: 2
往通道放数据: 3读取数据 data3
main over...ch2
往通道放数据: 1
往通道放数据: 2
往通道放数据: 3
ch3 读取数据 data1
ch3 读取数据 data2
ch3 读取数据 data3
main over...ch3
非缓冲channel部分的打印结果是输入数据和接收数据交替的,这说明读写都是即时阻塞。缓冲channel部分的输入数据打印完毕以后才打印接收数据,这意味着缓冲区没有满的情况下是非阻塞的。
可以使用缓冲channel模拟生产者和消费者。
无论是单收单发还是多收多发,带缓冲channel的收发性能都要好于无缓冲channel的;对于带缓冲channel而言,选择适当容量会在一定程度上提升收发性能。
3.7.1 用作计数信号量
Go并发设计的一个惯用法是将带缓冲channel用作计数信号量(counting semaphore)。
带缓冲channel中的当前数据个数代表的是当前同时处于活动状态(处理业务)的goroutine的数量,而带缓冲channel的容量(capacity)代表允许同时处于活动状态的goroutine的最大数量。一个发往带缓冲channel的发送操作表示获取一个信号量槽位,而一个来自带缓冲channel的接收操作则表示释放一个信号量槽位。
下面是一个将带缓冲channel用作计数信号量的例子:
// chapter6/sources/go-channel-case-7.go
var active = make(chan struct{}, 3)
var jobs = make(chan int, 10)func main() {go func() {for i := 0; i < 8; i++ {jobs <- (i + 1)}close(jobs)}()var wg sync.WaitGroupfor j := range jobs {wg.Add(1)go func(j int) {active <- struct{}{}log.Printf("handle job: %d\n", j)time.Sleep(2 * time.Second)<-activewg.Done()}(j)}wg.Wait()
}
上面的示例创建了一组goroutine来处理job,同一时间最多允许3个goroutine处于活动状态。为达成这一目标,示例使用了一个容量为3的带缓冲channel,active作为计数信号量,这意味着允许同时处于活动状态的最大goroutine数量为3。我们运行一下该示例:
$go run go-channel-case-7.go
2020/02/04 09:57:02 handle job: 8
2020/02/04 09:57:02 handle job: 4
2020/02/04 09:57:02 handle job: 1
2020/02/04 09:57:04 handle job: 2
2020/02/04 09:57:04 handle job: 3
2020/02/04 09:57:04 handle job: 7
2020/02/04 09:57:06 handle job: 6
2020/02/04 09:57:06 handle job: 5
由示例运行结果中的时间戳可以看到:虽然创建了很多goroutine,但由于计数信号量的存在,同一时间处理活动状态(正在处理job)的goroutine最多为3个。
3.7.2 使用缓存channel+sync.WaitGroup限制并发数(类似上小节)
package mainimport ("fmt""runtime""sync""time"
)func main() {run([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14}, 3)
}func run(dataList []int, limit int) {var wg sync.WaitGroupch := make(chan struct{}, limit)for index, ele := range dataList {wg.Add(1)fmt.Printf("%+v\n", &wg)go func(ele int, index int) {fmt.Println("for index:", index)fmt.Println("nub:", runtime.NumGoroutine())ch <- struct{}{}fmt.Println("start task:", ele)time.Sleep(10 * time.Second)fmt.Println("end task:", ele)<-chwg.Done()}(ele, index)}wg.Wait()fmt.Println("111111111111111111")
}
输出:
C:\Users\Administrator\Desktop\doc\shell\docker和k8s\项目\gotnet>go run tests/main.go
&{noCopy:{} state:{_:{} _:{} v:4294967296} sema:0}
&{noCopy:{} state:{_:{} _:{} v:8589934592} sema:0}
&{noCopy:{} state:{_:{} _:{} v:12884901888} sema:0}
&{noCopy:{} state:{_:{} _:{} v:17179869184} sema:0}
&{noCopy:{} state:{_:{} _:{} v:21474836480} sema:0}
for index: 0
nub: 6
start task: 1
for index: 3
nub: 6
start task: 4
for index: 2
nub: 6
start task: 3
for index: 4
nub: 6
&{noCopy:{} state:{_:{} _:{} v:25769803776} sema:0}
for index: 1
&{noCopy:{} state:{_:{} _:{} v:30064771072} sema:0}
&{noCopy:{} state:{_:{} _:{} v:34359738368} sema:0}
nub: 7
for index: 5
nub: 9
&{noCopy:{} state:{_:{} _:{} v:38654705664} sema:0}
for index: 7
nub: 10
&{noCopy:{} state:{_:{} _:{} v:42949672960} sema:0}
for index: 8
nub: 11
&{noCopy:{} state:{_:{} _:{} v:47244640256} sema:0}
for index: 9
nub: 12
&{noCopy:{} state:{_:{} _:{} v:51539607552} sema:0}
&{noCopy:{} state:{_:{} _:{} v:55834574848} sema:0}
&{noCopy:{} state:{_:{} _:{} v:60129542144} sema:0}
for index: 13
nub: 15
for index: 6
nub: 15
for index: 10
nub: 15
for index: 12
nub: 15
for index: 11
nub: 15
end task: 3
start task: 5
end task: 1
start task: 2
end task: 4
start task: 6
end task: 2
start task: 8
end task: 6
start task: 9
end task: 5
start task: 10
end task: 10
start task: 14
end task: 9
start task: 7
end task: 8
start task: 11
end task: 11
start task: 13
end task: 7
end task: 14
start task: 12
end task: 12
end task: 13
111111111111111111
3个一组,然后最后还剩2个一组。所有完成,就结束了主进程。
但是过程可能和一般人想的不太一样,庆看如下
-
0-10秒只出现如下信息: 可以看出来,for循环一开始就执行完了,创建了14个goroutine, 加一个本身main的goroutine. 14+1
C:\Users\Administrator\Desktop\doc\shell\docker和k8s\项目\gotnet>go run tests/main.go &{noCopy:{} state:{_:{} _:{} v:4294967296} sema:0} &{noCopy:{} state:{_:{} _:{} v:8589934592} sema:0} &{noCopy:{} state:{_:{} _:{} v:12884901888} sema:0} &{noCopy:{} state:{_:{} _:{} v:17179869184} sema:0} &{noCopy:{} state:{_:{} _:{} v:21474836480} sema:0} for index: 0 nub: 6 start task: 1 for index: 3 nub: 6 start task: 4 for index: 2 nub: 6 start task: 3 for index: 4 nub: 6 &{noCopy:{} state:{_:{} _:{} v:25769803776} sema:0} for index: 1 &{noCopy:{} state:{_:{} _:{} v:30064771072} sema:0} &{noCopy:{} state:{_:{} _:{} v:34359738368} sema:0} nub: 7 for index: 5 nub: 9 &{noCopy:{} state:{_:{} _:{} v:38654705664} sema:0} for index: 7 nub: 10 &{noCopy:{} state:{_:{} _:{} v:42949672960} sema:0} for index: 8 nub: 11 &{noCopy:{} state:{_:{} _:{} v:47244640256} sema:0} for index: 9 nub: 12 &{noCopy:{} state:{_:{} _:{} v:51539607552} sema:0} &{noCopy:{} state:{_:{} _:{} v:55834574848} sema:0} &{noCopy:{} state:{_:{} _:{} v:60129542144} sema:0} for index: 13 nub: 15 for index: 6 nub: 15 for index: 10 nub: 15 for index: 12 nub: 15 for index: 11 nub: 15
创建了这么多goroutine,但是因为channel的容量只有3,其他想要发送channel的goroutine阻塞了。
那要如何限制goroutine的数量呢?关键点,除了缓冲的channel以外,要使main的goroutine也阻塞,这时候,需要将channel的发送和接收分离。
package mainimport ("fmt""runtime""sync""time"
)func main() {run([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14}, 3)}func run(dataList []int, limit int) {var wg sync.WaitGroupch := make(chan struct{}, limit)for index, ele := range dataList {wg.Add(1)ch <- struct{}{} //关键代码,这行不要放在go func()里, 此时因为ch满了,main也阻塞了,不会继续for循环创建goroutine了。fmt.Printf("%+v\n", &wg)go func(ele int, index int) {fmt.Println("for index:", index)fmt.Println("nub:", runtime.NumGoroutine())fmt.Println("start task:", ele)time.Sleep(10 * time.Second)fmt.Println("end task:", ele)<-chwg.Done()}(ele, index)}wg.Wait()fmt.Println("111111111111111111")
}
关键点在于要阻塞住main,所以必须main参与了发送或者接受。利用缓冲channel的特性。
缓冲channel自带一块缓冲区,可以暂时存储数据,如果缓冲区满了,就会发生阻塞。
- 对一个带缓冲channel,在缓冲区无数据或有数据但未满的情况下,对其进行发送操作的goroutine不会阻塞;
- 在缓冲区已满的情况下,**对其进行发送操作的goroutine会阻塞;**在缓冲区为空的情况下,对其进行接收操作的goroutine亦会阻塞。