WaitGroup并发控制原理及底层源码实现
1.1实现原理
1.2底层源码
type WaitGroup struct {noCopy noCopy// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.// 64-bit atomic operations require 64-bit alignment, but 32-bit// compilers only guarantee that 64-bit fields are 32-bit aligned.// For this reason on 32 bit architectures we need to check in state()// if state1 is aligned or not, and dynamically "swap" the field order if// needed.state1 uint64state2 uint32
}
func (wg *WaitGroup) Add(delta int) {statep, semap := wg.state()if race.Enabled {_ = *statep // trigger nil deref earlyif delta < 0 {// Synchronize decrements with Wait.race.ReleaseMerge(unsafe.Pointer(wg))}race.Disable()defer race.Enable()}//把delta左移32位累加到state,即累加到counter中state := atomic.AddUint64(statep, uint64(delta)<<32) v := int32(state >> 32)//右移32位,获取counterw := uint32(state) //获取waiterif race.Enabled && delta > 0 && v == int32(delta) {// The first increment must be synchronized with Wait.// Need to model this as a read, because there can be// several concurrent wg.counter transitions from 0.race.Read(unsafe.Pointer(semap))}//经过累加后counter值变为负,panicif v < 0 {panic("sync: negative WaitGroup counter")}if w != 0 && delta > 0 && v == int32(delta) {panic("sync: WaitGroup misuse: Add called concurrently with Wait")}//思考counter累加后为正,或者counter为零为何退出//counter为正,说明不用去释放信号量,直接退出//waiter为零,说明没有等待者,也不需要释放信号量,直接退出if v > 0 || w == 0 {return}// This goroutine has set counter to 0 when waiters > 0.// Now there can't be concurrent mutations of state:// - Adds must not happen concurrently with Wait,// - Wait does not increment waiters if it sees counter == 0.// Still do a cheap sanity check to detect WaitGroup misuse.if *statep != state {panic("sync: WaitGroup misuse: Add called concurrently with Wait")}// Reset waiters count to 0.*statep = 0for ; w != 0; w-- {//释放信号量,执行一次释放一个,唤醒一个等待者runtime_Semrelease(semap, false, 0)}
}
func (wg *WaitGroup) Done() {//counter减1wg.Add(-1)
}func (wg *WaitGroup) Wait() {//获取statep和semaphone指针地址statep, semap := wg.state()if race.Enabled {_ = *statep // trigger nil deref earlyrace.Disable()}for {state := atomic.LoadUint64(statep)//获取statev := int32(state >> 32)//获取counter值值w := uint32(state)//获取wainter值//如果counter值为0,说明所有goroutine都退面了,不需要等待,直接返回if v == 0 {// Counter is 0, no need to wait.if race.Enabled {race.Enable()race.Acquire(unsafe.Pointer(wg))}return}// 使用CAS(比较交换算法)累加waiter,累加可能会失败,失败后通过for loop 下次重试if atomic.CompareAndSwapUint64(statep, state, state+1) {if race.Enabled && w == 0 {// Wait must be synchronized with the first Add.// Need to model this is as a write to race with the read in Add.// As a consequence, can do the write only for the first waiter,// otherwise concurrent Waits will race with each other.race.Write(unsafe.Pointer(semap))}//累加成功后,等待信号量唤醒自已runtime_Semacquire(semap)if *statep != 0 {panic("sync: WaitGroup is reused before previous Wait has returned")}if race.Enabled {race.Enable()race.Acquire(unsafe.Pointer(wg))}return}}
}
CAS算法确保,同时多个goroutine执行累加waiter不会出现问题
package mainimport ("fmt""sync/atomic"
)func main() {testCompareAndSwap1()testCompareAndSwap2()
}
func testCompareAndSwap1() {//为uint64赋值var (i uint64 = 1)//调用CompareAndSwapInt64 methodSwap := atomic.CompareAndSwapUint64(&i, 1, 2)//如果发生交换为false,显示falsefmt.Println(Swap)fmt.Println("The new value os i is:", i)
}func testCompareAndSwap2() {var (i uint64)//交换操作,这里的值变为2var oldvalue = atomic.SwapUint64(&i, 2)//打印新旧值fmt.Println("Swapped_value:", i, ",old_value", oldvalue)//调用CompareAndSwapInt64 methodSwap := atomic.CompareAndSwapUint64(&i, 1, 3)fmt.Println(Swap)fmt.Println("The new value os i is:", i)
}