并发访问slice
线上出现一粒多协程并发append全局slice的情况,导致内存不断翻倍,因此对slice的使用需要重新考虑。
并发读写的情况下, 可以利用锁、channel等避免竞态
问题
func TestDemo32(t *testing.T) {var wg sync.WaitGroupvar n = 100s := make([]int, 0, 200)hdr := (*reflect.SliceHeader)(unsafe.Pointer(&s))fmt.Printf("Data addr: %d\n", hdr.Data)fmt.Printf("Data cap: %d\n", hdr.Cap)fmt.Printf("Data len: %d\n", hdr.Len)wg.Add(n)for i := 1; i <= n; i++ {go func(v int) {defer wg.Done()s = append(s, v)}(i)}wg.Wait()hdr = (*reflect.SliceHeader)(unsafe.Pointer(&s))fmt.Printf("Data addr: %d\n", hdr.Data)fmt.Printf("Data cap: %d\n", hdr.Cap)fmt.Printf("Data len: %d\n", hdr.Len)fmt.Println(jsonx.ToString(s))// Data addr: 824645965056// Data cap: 200// Data len: 0// Data addr: 824645965056// Data cap: 200// Data len: 96
}func TestDemo33(t *testing.T) {var wg sync.WaitGroupvar n = 500s := make([]int, 0, 10)hdr := (*reflect.SliceHeader)(unsafe.Pointer(&s))fmt.Printf("Data addr: %d\n", hdr.Data)fmt.Printf("Data cap: %d\n", hdr.Cap)fmt.Printf("Data len: %d\n", hdr.Len)wg.Add(n)for i := 1; i <= n; i++ {go func(v int) {defer wg.Done()s = append(s, v)}(i)}wg.Wait()hdr = (*reflect.SliceHeader)(unsafe.Pointer(&s))fmt.Printf("Data addr: %d\n", hdr.Data)fmt.Printf("Data cap: %d\n", hdr.Cap)fmt.Printf("Data len: %d\n", hdr.Len)fmt.Println(jsonx.ToString(s))// Data addr: 824635459136// Data cap: 10// Data len: 0// Data addr: 824665328128// Data cap: 672// Data len: 453
}
Go语言中的slice是一种引用类型,它本身不保存任何元素,只是对数组的引用。
append操作的主要功能是向slice添加元素。
如果slice的底层数组容量足够,就直接添加;如果不够,就会先分配新的底层数组,然后将原有的元素和新添加的元素一起拷贝到新数组,在这个过程中,原有的底层数组会被垃圾回收。
当底层数组的容量不足以容纳新的元素时,就会产生新的底层数组
此时原有的slice和返回的新的slice,其底层数组是不同的。
这也是为什么在使用append时,总是习惯性地将结果再次赋值给原slice。
方案一 channel
func TestDemo(t *testing.T) {// 无缓冲,发送侧有数据,接收侧才执行// 用于做同步c := make(chan struct{})// new 了该 job 后,该 job 就开始准备从 channel 接收数据s := NewScheduleJob(n, func() { c <- struct{}{} })// 并发发送数据到channelvar wg sync.WaitGroupvar n = 1000wg.Add(n)for i := 0; i < n; i++ {go func(v int) {defer wg.Done()s.AddData(v)}(i)}// 等待上述多个协程将数据存入slicewg.Wait()// 发送完之后关闭channels.Close()// 阻塞在这里是等待NewScheduleJob执行结束<-c// 最终实现读写一致fmt.Println(len(s.data))
}type ServiceData struct {ch chan int // 用来同步的channeldata []int // 存储数据的slice
}// Schedule 从 channel 接收数据串行存入slice,直到ch关闭
func (s *ServiceData) Schedule() {for i := range s.ch {s.data = append(s.data, i)}
}// Close 关闭channel
func (s *ServiceData) Close() {close(s.ch)
}// AddData 发送数据到 channel
func (s *ServiceData) AddData(v int) {s.ch <- v
}func NewScheduleJob(size int, done func()) *ServiceData {s := &ServiceData{ch: make(chan int, size),data: make([]int, 0),}go func() {// 并发地 append 数据到 slice// Schedule 从 channel 接收数据串行存入slice,直到ch关闭s.Schedule()// 通知主协程继续执行done()}()return s
}
优点:逻辑复杂,适用于高并发场景
方案二 - 锁
func main() {slc := make([]int, 0, 1000)var wg sync.WaitGroupvar lock sync.Mutexfor i := 0; i < 1000; i++ {wg.Add(1)go func(a int) {defer wg.Done()lock.Lock()defer lock.Unlock()slc = append(slc, a)}(i)}wg.Wait()fmt.Println(len(slc))
}
优点:锁的逻辑重,适用于对性能要求不高的场景
学习文档
https://juejin.cn/post/6844904134592692231