我们使用 WaitGroup 来解决当前协程分组同步的问题,它很好地实现了任务同步,但是却无法返回错误。当一组协程中的某一个协程出错时,我们是无法感知到的。为了解决这个问题,官方提供了 errgroup(golang.org/x/sync/errgroup)包,它是对WaitGroup 进行了封装,支持返回协程遇到的第一个错误。
在errgroup包下定义了一个Group 结构体,它就是我们要介绍的ErrGroup并发原语,定义如下:
// Group 等待组结构体定义
type Group struct {
cancel func() //用于控制遇到err后不再执行后续函数
wg sync.WaitGroup //等待所有g执行完毕
errOnce sync.Once //保证只记录第一个err
err error //记录遇到的第一个err
}
在使用ErrGroup时,要用到三个方法,分别是WithContext、Go和Wait。
// WithContext 根据传入的ctx返回Group和新的ctx
//可以简单理解成是一个 New 方法,返回 Group 结构体指针,用于调用 Go 和 Wait 方法,
//这里需要注意,虽然第二个参数返回 ctx,但这里已经不是我们传进去的 ctx 了,是通过 WithCancel,方法生成了全新的 ctx,所以大多数场景我们并不需要接受了第二个返回参数。
//不过有些场景可以通过返回的 ctx.Done() 来监听每一个 goroutine 结束。
func WithContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := context.WithCancel(ctx)
return &Group{cancel: cancel}, ctx
}
// Go 启动一个新g执行给定方法
//执行我们自定义的方法,但必须返回 err,这里我们发现通过 sync.Once.Do 方法,用来保证只将第一个遇到的 err 记录并返回。
func (g *Group) Go(f func() error) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel()
}
})
}
}()
}
// Wait 等待所有g执行完毕,或在某个g执行失败返回第一个err
//通过 WaitGroup 等待所有 goroutine 结束,并返回遇到的第一个 err。
func (g *Group) Wait() error {
g.wg.Wait()
if g.cancel != nil {
g.cancel()
}
return g.err
}
使用示例:
func main() {
eg, ctx := errgroup.WithContext(context.Background())
for i := 0; i < 100; i++ {
i := i
eg.Go(func() error {
time.Sleep(2 * time.Second)
select {
case <-ctx.Done():
fmt.Println("Canceled:", i)
return nil
default:
fmt.Println("End:", i)
return nil
}})}
if err := eg.Wait(); err != nil {
log.Fatal(err)
}
}