go协程工具WaitGroup

go协程工具WaitGroup

waitGroup 是go语言非常常用的一个协程的流程工具,可以在指定位置阻塞等待指定数量的协程执行完成,从字面意思理解即是阻塞等待指定组的协程执行完成,在需要控制或者等待一批协程执行完成时可以使用waitGroup 实现该效果,其实该包的原理也是使用了channel实现的协程同步机制,通过在指定位置添加waitGroup.Wait() 方法来阻塞等待指定数量的协程执行完成,在协程执行完成后调用waitGroup.Done() 方法来通知waitGroup 协程执行完成,waitGroup 内部维护了一个计数器,当计数器为0时,waitGroup.Wait() 方法才会返回,否则会阻塞等待。

type WaitGroup struct {
}

func (wg *WaitGroup) Add(delta int)

func (wg *WaitGroup) Done()

func (wg *WaitGroup) Wait()
  • Add(delta int) 每次激活想要被等待完成的协程之前,先调用Add(),用来设置或添加要等待完成的goroutine数量
  • Done() 表示该协程执行完成,执行Done()会将waitGroup 内部的计数器减1
  • Wait() 表示等待所有的协程执行完成,当waitGroup 内部的计数器为0时,Wait() 方法才会返回,否则会阻塞等待 一般在所有的协程执行完成后调用Wait() 方法,等待所有的协程执行完成
func (wg *WaitGroup) Add(delta int) {
    // 先从 state 当中把数据和信号量取出来
    statep, semap := wg.state()

    // 在 waiter 上加上 delta 值
    state := atomic.AddUint64(statep, uint64(delta)<<32)
    // 取出当前的 counter
    v := int32(state >> 32)
    // 取出当前的 waiter,正在等待 goroutine 数量
    w := uint32(state)

    // counter 不能为负数
    if v < 0 {
        panic("sync: negative WaitGroup counter")
    }

    // 这里属于防御性编程
    // w != 0 说明现在已经有 goroutine 在等待中,说明已经调用了 Wait() 方法
    // 这时候 delta > 0 && v == int32(delta) 说明在调用了 Wait() 方法之后又想加入新的等待者
    // 这种操作是不允许的
    if w != 0 && delta > 0 && v == int32(delta) {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    // 如果当前没有人在等待就直接返回,并且 counter > 0
    if v > 0 || w == 0 {
        return
    }

    // 这里也是防御 主要避免并发调用 add 和 wait
    if *statep != state {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }

    // 唤醒所有 waiter,看到这里就回答了上面的问题了
    *statep = 0
    for ; w != 0; w-- {
        runtime_Semrelease(semap, false, 0)
    }
}
func (wg *WaitGroup) Wait() {
    // 先从 state 当中把数据和信号量的地址取出来
    statep, semap := wg.state()

    for {
        // 这里去除 counter 和 waiter 的数据
        state := atomic.LoadUint64(statep)
        v := int32(state >> 32)
        w := uint32(state)

        // counter = 0 说明没有在等的,直接返回就行
        if v == 0 {
            // Counter is 0, no need to wait.
            return
        }

        // waiter + 1,调用一次就多一个等待者,然后休眠当前 goroutine 等待被唤醒
        if atomic.CompareAndSwapUint64(statep, state, state+1) {
            runtime_Semacquire(semap)
            if *statep != 0 {
                panic("sync: WaitGroup is reused before previous Wait has returned")
            }
            return
        }
    }
}
func (wg *WaitGroup) Done() {
    wg.Add(-1)
}

使用场景

使用场景,现在有一个接口 需要调用3个三方接口做数据整合和拼接处理,每个三方接口调用需要1s, 如果是没有协程的场景,需要顺序阻塞的请求3个接口,调用3次http请求,将数据获取到后拼接组合返回, 如果使用waitGroup 则可以启动3个协程同时获取三个接口数据,阻塞等待3个协程完成并结束,后拼接组合返回。理论时间效率比之前的顺序执行快N倍

代码实现

func main() {
    var wg sync.WaitGroup
    wg.Add(3)
    go func() {
        defer wg.Done()
        // 调用三方接口1
    }()
    go func() {
        defer wg.Done()
        // 调用三方接口2
    }()
    go func() {
        defer wg.Done()
        // 调用三方接口3
    }()
    wg.Wait()
    // 所有三方接口调用完成后,继续处理结果
}

注意

  • 注意wg.add的数量和wg.done数量要一致,否则会造成死锁(互相等待)
  • 注意waitGroup 不能交叉使用,
  • 注意waitGroup 不能在协程中使用,只能在主协程中使用

Leave a Comment

Your email address will not be published. Required fields are marked *

Scroll to Top