go协程工具WaitGroup
waitGroup 是go语言非常常用的一个协程的流程工具,可以在指定位置阻塞等待指定数量的协程执行完成,从字面意思理解即是阻塞等待指定组的协程执行完成,在需要控制或者等待一批协程执行完成时可以使用waitGroup 实现该效果,其实该包的原理也是使用了channel实现的协程同步机制,通过在指定位置添加waitGroup.Wait() 方法来阻塞等待指定数量的协程执行完成,在协程执行完成后调用waitGroup.Done() 方法来通知waitGroup 协程执行完成,waitGroup 内部维护了一个计数器,当计数器为0时,waitGroup.Wait() 方法才会返回,否则会阻塞等待。
type WaitGroup struct {
}
func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()
- Add(delta int) 每次激活想要被等待完成的协程之前,先调用Add(),用来设置或添加要等待完成的goroutine数量
- Done() 表示该协程执行完成,执行Done()会将waitGroup 内部的计数器减1
- Wait() 表示等待所有的协程执行完成,当waitGroup 内部的计数器为0时,Wait() 方法才会返回,否则会阻塞等待 一般在所有的协程执行完成后调用Wait() 方法,等待所有的协程执行完成
func (wg *WaitGroup) Add(delta int) {
// 先从 state 当中把数据和信号量取出来
statep, semap := wg.state()
// 在 waiter 上加上 delta 值
state := atomic.AddUint64(statep, uint64(delta)<<32)
// 取出当前的 counter
v := int32(state >> 32)
// 取出当前的 waiter,正在等待 goroutine 数量
w := uint32(state)
// counter 不能为负数
if v < 0 {
panic("sync: negative WaitGroup counter")
}
// 这里属于防御性编程
// w != 0 说明现在已经有 goroutine 在等待中,说明已经调用了 Wait() 方法
// 这时候 delta > 0 && v == int32(delta) 说明在调用了 Wait() 方法之后又想加入新的等待者
// 这种操作是不允许的
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 如果当前没有人在等待就直接返回,并且 counter > 0
if v > 0 || w == 0 {
return
}
// 这里也是防御 主要避免并发调用 add 和 wait
if *statep != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 唤醒所有 waiter,看到这里就回答了上面的问题了
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}
func (wg *WaitGroup) Wait() {
// 先从 state 当中把数据和信号量的地址取出来
statep, semap := wg.state()
for {
// 这里去除 counter 和 waiter 的数据
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
w := uint32(state)
// counter = 0 说明没有在等的,直接返回就行
if v == 0 {
// Counter is 0, no need to wait.
return
}
// waiter + 1,调用一次就多一个等待者,然后休眠当前 goroutine 等待被唤醒
if atomic.CompareAndSwapUint64(statep, state, state+1) {
runtime_Semacquire(semap)
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
使用场景
使用场景,现在有一个接口 需要调用3个三方接口做数据整合和拼接处理,每个三方接口调用需要1s, 如果是没有协程的场景,需要顺序阻塞的请求3个接口,调用3次http请求,将数据获取到后拼接组合返回, 如果使用waitGroup 则可以启动3个协程同时获取三个接口数据,阻塞等待3个协程完成并结束,后拼接组合返回。理论时间效率比之前的顺序执行快N倍
代码实现
func main() {
var wg sync.WaitGroup
wg.Add(3)
go func() {
defer wg.Done()
// 调用三方接口1
}()
go func() {
defer wg.Done()
// 调用三方接口2
}()
go func() {
defer wg.Done()
// 调用三方接口3
}()
wg.Wait()
// 所有三方接口调用完成后,继续处理结果
}
注意
- 注意wg.add的数量和wg.done数量要一致,否则会造成死锁(互相等待)
- 注意waitGroup 不能交叉使用,
- 注意waitGroup 不能在协程中使用,只能在主协程中使用