errGroup

我们使用 WaitGroup 来解决当前协程分组同步的问题,它很好地实现了任务同步,但是却无法返回错误。当一组协程中的某一个协程出错时,我们是无法感知到的。为了解决这个问题,官方提供了 errgroup(golang.org/x/sync/errgroup)包,它是对WaitGroup 进行了封装,支持返回协程遇到的第一个错误。

在errgroup包下定义了一个Group 结构体,它就是我们要介绍的ErrGroup并发原语,定义如下:

// Group 等待组结构体定义
type Group struct {
    cancel func() //用于控制遇到err后不再执行后续函数
    wg sync.WaitGroup //等待所有g执行完毕
    errOnce sync.Once //保证只记录第一个err
    err     error //记录遇到的第一个err
}

在使用ErrGroup时,要用到三个方法,分别是WithContext、Go和Wait。

// WithContext 根据传入的ctx返回Group和新的ctx
//可以简单理解成是一个 New 方法,返回 Group 结构体指针,用于调用 Go 和 Wait 方法,
//这里需要注意,虽然第二个参数返回 ctx,但这里已经不是我们传进去的 ctx 了,是通过 WithCancel,方法生成了全新的 ctx,所以大多数场景我们并不需要接受了第二个返回参数。
//不过有些场景可以通过返回的 ctx.Done() 来监听每一个 goroutine 结束。
func WithContext(ctx context.Context) (*Group, context.Context) {
    ctx, cancel := context.WithCancel(ctx)
    return &Group{cancel: cancel}, ctx
}


// Go 启动一个新g执行给定方法
//执行我们自定义的方法,但必须返回 err,这里我们发现通过 sync.Once.Do 方法,用来保证只将第一个遇到的 err 记录并返回。
func (g *Group) Go(f func() error) {
    g.wg.Add(1)

    go func() {
        defer g.wg.Done()

        if err := f(); err != nil {
            g.errOnce.Do(func() {
                g.err = err
                if g.cancel != nil {
                    g.cancel()
                }
            })
        }
    }()
}

// Wait 等待所有g执行完毕,或在某个g执行失败返回第一个err
//通过 WaitGroup 等待所有 goroutine 结束,并返回遇到的第一个 err。
func (g *Group) Wait() error {
    g.wg.Wait()
    if g.cancel != nil {
        g.cancel()
    }
    return g.err
}

使用示例:

func main() {
   eg, ctx := errgroup.WithContext(context.Background())
   for i := 0; i < 100; i++ {
      i := i
      eg.Go(func() error {
         time.Sleep(2 * time.Second)
         select {
         case <-ctx.Done():
            fmt.Println("Canceled:", i)
            return nil
         default:
            fmt.Println("End:", i)
            return nil
         }})}
   if err := eg.Wait(); err != nil {
      log.Fatal(err)
   }
}

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注