协程并发等待技术——WaitGroup 类型和 errgroup 包
waitgroup
阻塞等待多个并发任务执行完成。WaitGroup 类型主要包含下面几个方法。
func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()
第一个是 Add 方法,在任务运行之前,需要调用 Add 方法,用于设置需要等待完成的任务数,Add 方法传进去的数值之和,需要和任务数相等。
第二个是 Done 方法,每个任务完成时,需要调用 Done 方法,用于告知 WaitGroup 对象已经有一个任务运行完成。
第三个是 Wait 方法,当需要等待所有并发任务完成时,调用 Wait 方法,用于阻塞主协程。
import ("sync"
)var urls = []string{"http://www.golang.org/","http://www.google.com/","http://www.somestupidname.com/",
}func TestWaitGroup(t *testing.T) {// 创建WaitGroupwg := sync.WaitGroup{}results := make([]string, len(urls))for index, url := range urls {url := urlindex := index// 在创建协程执行任务之前,调用Add方法wg.Add(1)go func() {// 任务完成后,调用Done方法defer wg.Done()// Fetch the URL.resp, err := http.Get(url)if err != nil {return}defer resp.Body.Close()body, err := io.ReadAll(resp.Body)if err != nil {return}results[index] = string(body)}()}// 主协程阻塞,等待所有的任务执行完成wg.Wait()
}
errgroup 包
可以在主协程中获取并发任务错误信息
import ("golang.org/x/sync/errgroup"
)func TestErrHandle(t *testing.T) {results := make([]string, len(urls))// 创建Group类型g := new(errgroup.Group)for index, url := range urls {// Launch a goroutine to fetch the URL.url := urlindex := index// 调用Go方法g.Go(func() error {// Fetch the URL.resp, err := http.Get(url)if err != nil {return err // 返回错误}defer resp.Body.Close()body, err := io.ReadAll(resp.Body)if err != nil {return err // 返回错误}results[index] = string(body)return nil})}// Wait for all HTTP fetches to complete.// 等待所有任务执行完成,并对错误进行处理if err := g.Wait(); err != nil {fmt.Println("Failured fetched all URLs.")}
}
第一步,我们要创建 Group 类型的对象。
第二步,在 Group 的 Go 方法中传入那些需要并发运行的函数。特别需要注意的是,这些传入的函数必须将错误返回。
第三步,也是最后一步,在主协程中,我们需要调用 Group 对象的 Wait 方法。通过这一调用,主协程将会阻塞等待,直至所有通过 Go 方法传入的任务都执行完毕。并且,在任务完成后,我们还能够对 Wait 方法所返回的错误进行处理。
func TestLimitGNum(t *testing.T) {results := make([]string, len(urls))// 用WithContext函数创建Group对象eg, ctx := errgroup.WithContext(context.Background())// 调用SetLimit方法,设置可同时运行的最大协程数eg.SetLimit(2)for index, url := range urls {url := urlindex := index// 调用Go方法eg.Go(func() error {select {case <-ctx.Done(): // select-done模式取消运行return errors.New("task is cancelled")default:// 并发获取urlresp, err := http.Get(url)if err != nil {return err // 返回错误}defer resp.Body.Close()body, err := io.ReadAll(resp.Body)if err != nil {return err // 返回错误}results[index] = string(body)return nil}})}// 等待所有任务执行完成,并对错误进行处理if err := eg.Wait(); err != nil {fmt.Println("Failured fetched all URLs.")}
}
errorGroup 包中的结构体
type token struct{}type Group struct {cancel func(error) // 这个作用是為了前面說的 WithContext 而來的wg sync.WaitGroup // errGroup底层的阻塞等待功能,就是通过WaitGroup实现的sem chan token // 用于控制最大运行的协程数err error // 最后在Wait方法中返回的errorerrOnce sync.Once // 用于安全的设置err
}
总结:
1. WaitGroup类型是Golang的基础并发类型,用于阻塞等待多个并发任务执行完成,包含Add、Done和Wait方法。
2. errgroup包是Golang提供的并发扩展库,对WaitGroup进行了封装,在并发等待的基础功能上提供了错误处理和任务取消功能。
3. Group类型的Go方法用于传入具有错误返回值的函数类型,Wait方法会阻塞等待所有传入Go方法的函数全部运行完毕,并且在任务完成后能够对错误进行处理。
4. 任务取消功能通过WithContext函数创建Group对象,传入Go方法的函数需要实现select-done模式,利用context来停止所有相关任务。
5. errgroup包还可以限制同时并发运行的最大协程数,通过SetLimit方法设置可同时运行的最大协程数,达到最大协程数时会阻塞创建新协程运行任务。