WaitGroup
sync.WaitGroup waits for a group of goroutines to finish. A common use case is issuing a batch of RPC or HTTP requests:
requests := []*Request{...}
wg := &sync.WaitGroup{}
wg.Add(len(requests))
for _, request := range requests {
	go func(r *Request) {
		defer wg.Done()
		// res, err := service.call(r)
	}(request)
}
wg.Wait()
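The snippet above leaves out the request type and the call itself. A minimal runnable sketch of the same pattern follows; `Request`, `call`, and `fetchAll` are hypothetical stand-ins invented for illustration, not part of any real client library.

```go
package main

import (
	"fmt"
	"sync"
)

// Request and call are hypothetical stand-ins for a real RPC or HTTP client.
type Request struct{ ID int }

func call(r *Request) string {
	return fmt.Sprintf("response-%d", r.ID)
}

// fetchAll issues every request in its own goroutine and waits for all of them.
func fetchAll(requests []*Request) []string {
	results := make([]string, len(requests))
	var wg sync.WaitGroup
	wg.Add(len(requests)) // register all tasks before starting any goroutine
	for i, request := range requests {
		go func(i int, r *Request) {
			defer wg.Done()
			results[i] = call(r) // each goroutine writes only its own slot
		}(i, request)
	}
	wg.Wait() // every Done has happened once Wait returns, so results is safe to read
	return results
}

func main() {
	requests := []*Request{{ID: 1}, {ID: 2}, {ID: 3}}
	fmt.Println(fetchAll(requests))
}
```

Writing each result into its own slice slot avoids the need for a mutex; a shared map or an appended slice would need extra synchronization.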
// In the terminology of the Go memory model, a call to Done
// “synchronizes before” the return of any Wait call that it unblocks.
type WaitGroup struct {
	noCopy noCopy

	// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
	// 64-bit atomic operations require 64-bit alignment, but 32-bit
	// compilers only guarantee that 64-bit fields are 32-bit aligned.
	// For this reason on 32 bit architectures we need to check in state()
	// if state1 is aligned or not, and dynamically "swap" the field order if
	// needed.
	state1 uint64
	state2 uint32
}
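- noCopy: marks the struct as one that must not be copied.
- state: the WaitGroup's state data, an unsigned 64-bit value that holds both the counter and the waiter count.
  - counter is the number of tasks that have not finished yet. WaitGroup.Add(n) does counter += n, and WaitGroup.Done() does counter--.
  - waiter is the number of goroutines that have called WaitGroup.Wait and are still waiting.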
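To make the packed layout concrete, here is a small sketch of the bit arithmetic. The helpers `pack`, `unpack`, and `addDelta` are hypothetical names used only for this illustration; the standard library performs the same shifts inline.

```go
package main

import "fmt"

// pack is a hypothetical helper: counter goes in the high 32 bits,
// the waiter count in the low 32 bits.
func pack(counter int32, waiters uint32) uint64 {
	return uint64(uint32(counter))<<32 | uint64(waiters)
}

// unpack splits a state word back into its two halves.
func unpack(state uint64) (counter int32, waiters uint32) {
	return int32(state >> 32), uint32(state)
}

// addDelta mirrors how Add adjusts only the counter half. A negative delta
// wraps around as an unsigned value, which still yields the right counter.
func addDelta(state uint64, delta int) uint64 {
	return state + uint64(delta)<<32
}

func main() {
	s := pack(0, 0)
	s = addDelta(s, 3)  // Add(3): counter 0 -> 3
	s++                 // a Wait call registering one waiter
	s = addDelta(s, -1) // Done(): counter 3 -> 2
	c, w := unpack(s)
	fmt.Println(c, w) // prints "2 1"
}
```

Keeping both values in one word is what lets a single atomic operation update the counter and observe the waiter count at the same time.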
Add
// Add adds delta, which may be negative, to the WaitGroup counter.
// If the counter becomes zero, all goroutines blocked on Wait are released.
// If the counter goes negative, Add panics.
//
// Note that calls with a positive delta that occur when the counter is zero
// must happen before a Wait. Calls with a negative delta, or calls with a
// positive delta that start when the counter is greater than zero, may happen
// at any time.
// Typically this means the calls to Add should execute before the statement
// creating the goroutine or other event to be waited for.
// If a WaitGroup is reused to wait for several independent sets of events,
// new Add calls must happen after all previous Wait calls have returned.
// See the WaitGroup example.
func (wg *WaitGroup) Add(delta int) {
	statep, semap := wg.state()
	if race.Enabled {
		if delta < 0 {
			// Synchronize decrements with Wait.
			race.ReleaseMerge(unsafe.Pointer(wg))
		}
		race.Disable()
		defer race.Enable()
	}
	// Add delta to the counter (the high 32 bits).
	state := atomic.AddUint64(statep, uint64(delta)<<32)
	// Extract the counter.
	v := int32(state >> 32)
	// Extract the waiter count.
	w := uint32(state)
	if race.Enabled && delta > 0 && v == int32(delta) {
		// The first increment must be synchronized with Wait.
		// Need to model this as a read, because there can be
		// several concurrent wg.counter transitions from 0.
		race.Read(unsafe.Pointer(semap))
	}
	// counter < 0: panic.
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}
	// delta > 0 && v == int32(delta): the counter is being raised from 0.
	// w != 0: there are already waiters.
	// Together they mean new work is being added after Wait() was called,
	// which is not allowed.
	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	if v > 0 || w == 0 {
		return
	}
	// This goroutine has set counter to 0 when waiters > 0.
	// Now there can't be concurrent mutations of state:
	// - Adds must not happen concurrently with Wait,
	// - Wait does not increment waiters if it sees counter == 0.
	// Still do a cheap sanity check to detect WaitGroup misuse.
	// Guard against concurrent calls to Add() and Wait().
	if *statep != state {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// Reset waiters count to 0.
	// Wake all waiters.
	*statep = 0
	for ; w != 0; w-- {
		runtime_Semrelease(semap, false, 0)
	}
}
Wait
// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
	statep, semap := wg.state()
	if race.Enabled {
		_ = *statep // trigger nil deref early
		race.Disable()
	}
	for {
		// Load state:
		// high 32 bits ==> counter
		// low 32 bits ==> waiter count
		state := atomic.LoadUint64(statep)
		v := int32(state >> 32)
		w := uint32(state)
		// The counter has reached 0, so return.
		if v == 0 {
			// Counter is 0, no need to wait.
			if race.Enabled {
				race.Enable()
				race.Acquire(unsafe.Pointer(wg))
			}
			return
		}
		// Increment waiters count.
		// The counter is nonzero: waiters++.
		if atomic.CompareAndSwapUint64(statep, state, state+1) {
			if race.Enabled && w == 0 {
				// Wait must be synchronized with the first Add.
				// Need to model this as a write to race with the read in Add.
				// As a consequence, can do the write only for the first waiter,
				// otherwise concurrent Waits will race with each other.
				race.Write(unsafe.Pointer(semap))
			}
			// Block until woken.
			runtime_Semacquire(semap)
			// Woken up: check that state is 0.
			// If it is not, the WaitGroup was reused before the counter
			// returned to 0 and the previous Wait returned, so panic.
			if *statep != 0 {
				panic("sync: WaitGroup is reused before previous Wait has returned")
			}
			if race.Enabled {
				race.Enable()
				race.Acquire(unsafe.Pointer(wg))
			}
			return
		}
	}
}
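The interplay between Add and Wait may be easier to follow in a stripped-down model. The sketch below is a toy, not the real implementation: `miniWG` and `runWorkers` are hypothetical names, an unbuffered channel stands in for the runtime semaphore, and the race-detector hooks and misuse checks are omitted.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// miniWG is a toy model of the packed-state design, for illustration only.
type miniWG struct {
	state uint64        // high 32 bits: counter, low 32 bits: waiters
	sema  chan struct{} // assumption: a channel replaces the runtime semaphore
}

func newMiniWG() *miniWG { return &miniWG{sema: make(chan struct{})} }

func (wg *miniWG) Add(delta int) {
	state := atomic.AddUint64(&wg.state, uint64(delta)<<32)
	v := int32(state >> 32)
	w := uint32(state)
	if v < 0 {
		panic("miniWG: negative counter")
	}
	if v > 0 || w == 0 {
		return // work is still outstanding, or nobody is waiting
	}
	// Counter hit zero with waiters present: reset state and wake each one.
	atomic.StoreUint64(&wg.state, 0)
	for ; w != 0; w-- {
		wg.sema <- struct{}{}
	}
}

func (wg *miniWG) Done() { wg.Add(-1) }

func (wg *miniWG) Wait() {
	for {
		state := atomic.LoadUint64(&wg.state)
		if int32(state>>32) == 0 {
			return // counter is already zero
		}
		// Register as a waiter; retry if another goroutine changed state first.
		if atomic.CompareAndSwapUint64(&wg.state, state, state+1) {
			<-wg.sema // block until Add wakes this waiter
			return
		}
	}
}

// runWorkers is a hypothetical driver: it starts n goroutines and returns
// how many had finished by the time Wait returned.
func runWorkers(n int) int64 {
	wg := newMiniWG()
	var finished int64
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			atomic.AddInt64(&finished, 1)
			wg.Done()
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&finished)
}

func main() {
	fmt.Println(runWorkers(4))
}
```

The compare-and-swap loop is the key step: a waiter only blocks after it has successfully recorded itself in the same word that Add inspects, so the goroutine that brings the counter to zero knows how many wake-ups to send.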
Done
// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
	wg.Add(-1)
}
Done simply calls Add(-1), which decrements the counter of outstanding goroutines by 1.
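Summary
- A sync.WaitGroup can only be reused after sync.WaitGroup.Wait has returned.
- sync.WaitGroup.Done is just a thin wrapper around sync.WaitGroup.Add. We can pass any negative number to sync.WaitGroup.Add (as long as the counter stays non-negative) to bring the counter to zero quickly and wake the waiting goroutines.
- Several goroutines can wait on the same sync.WaitGroup for its counter to reach zero, and they are all woken together.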
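The last two points can be checked with a short sketch. `releaseAll` is a hypothetical demo function: it parks several goroutines on one WaitGroup and then zeroes the counter with a single negative Add.

```go
package main

import (
	"fmt"
	"sync"
)

// releaseAll starts `waiters` goroutines that all wait on the same WaitGroup,
// then clears a counter of `pending` with one Add call. It returns how many
// waiters were released.
func releaseAll(waiters, pending int) int {
	var wg sync.WaitGroup    // the group being waited on
	var woken sync.WaitGroup // tracks the waiting goroutines themselves
	var mu sync.Mutex
	released := 0

	wg.Add(pending)
	woken.Add(waiters)
	for i := 0; i < waiters; i++ {
		go func() {
			defer woken.Done()
			wg.Wait() // returns once the counter reaches zero
			mu.Lock()
			released++
			mu.Unlock()
		}()
	}
	wg.Add(-pending) // same effect as calling Done `pending` times
	woken.Wait()
	return released
}

func main() {
	fmt.Println(releaseAll(3, 5))
}
```

A waiter that reaches Wait after the counter is already zero returns immediately, so every waiter is released regardless of scheduling order.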