sizedwaitgroup


SizedWaitGroup has the same role and close to the same API as the Golang sync.WaitGroup,
but it adds a limit on the number of goroutines started concurrently.
//Based upon sync.WaitGroup, SizedWaitGroup allows starting multiple
//routines and waiting for them to finish using a simple API.
//SizedWaitGroup adds the feature of limiting the maximum number of
//concurrently started routines. It could, for example, be used to start
//multiple routines querying a database without sending too many
//queries at once, so as not to overload the given database.
//Rémy Mathieu © 2016
package sizedwaitgroup
import (
    "context"
    "math"
    "sync"
)
// SizedWaitGroup has the same role and close to the same API as
// sync.WaitGroup, but it also limits the number of goroutines that can be
// started concurrently.
//
// Create one with New. Because it contains a sync.WaitGroup, a
// SizedWaitGroup must not be copied after first use.
type SizedWaitGroup struct {
    // Size is the maximum number of goroutines allowed to run concurrently.
    Size int

    // current is used as a counting semaphore: every element sitting in the
    // buffer is one slot held between a successful Add and the matching Done.
    current chan struct{}
    wg      sync.WaitGroup
}

// New creates a SizedWaitGroup.
// The limit parameter is the maximum number of goroutines which can be
// started concurrently. A limit of zero or less selects math.MaxInt32,
// which is effectively unlimited.
func New(limit int) SizedWaitGroup {
    size := math.MaxInt32 // 2^31 - 1
    if limit > 0 {
        size = limit
    }
    return SizedWaitGroup{
        Size:    size,
        current: make(chan struct{}, size),
    }
}

// Add increments the internal WaitGroup counter.
// It blocks while the limit of spawned goroutines has been reached and
// resumes once Done has been called.
// See sync.WaitGroup documentation for more information.
func (s *SizedWaitGroup) Add() {
    // context.Background is never canceled, so AddWithContext cannot
    // return an error here.
    _ = s.AddWithContext(context.Background())
}

// AddWithContext increments the internal WaitGroup counter.
// It blocks while the limit of spawned goroutines has been reached and
// resumes once Done has been called or the context is canceled.
// It returns nil on success, or the context's error if the context is
// canceled before a slot is acquired; in that case the counter is left
// unchanged and Done must not be called.
// See sync.WaitGroup documentation for more information.
func (s *SizedWaitGroup) AddWithContext(ctx context.Context) error {
    // Check for cancellation up front: when several select cases are ready
    // Go picks one at random, so without this check an already-canceled
    // context could still acquire a free slot.
    if err := ctx.Err(); err != nil {
        return err
    }
    select {
    case <-ctx.Done():
        return ctx.Err()
    case s.current <- struct{}{}:
    }
    s.wg.Add(1)
    return nil
}

// Done releases the slot taken by a successful Add or AddWithContext and
// decrements the SizedWaitGroup counter.
// See sync.WaitGroup documentation for more information.
func (s *SizedWaitGroup) Done() {
    <-s.current
    s.wg.Done()
}

// Wait blocks until the SizedWaitGroup counter is zero.
// See sync.WaitGroup documentation for more information.
func (s *SizedWaitGroup) Wait() {
    s.wg.Wait()
}

package main
import (
    "fmt"
    "math/rand"
    "time"
    "github.com/remeh/sizedwaitgroup"
)
func main() {
    rand.Seed(time.Now().UnixNano())
    // Typical use-case:
    // 50 queries must be executed as quick as possible
    // but without overloading the database, so only
    // 8 routines should be started concurrently.
    swg := sizedwaitgroup.New(8)
    for i := 0; i < 50; i++ {
        swg.Add()
        go func(i int) {
            defer swg.Done()
            query(i)
        }(i)
    }
    swg.Wait()
}
// query stands in for a database query: it prints i and then sleeps for
// i + 500 milliseconds plus a random extra delay in [0, 500) milliseconds.
func query(i int) {
    fmt.Println(i)
    pause := time.Duration(i+500+rand.Intn(500)) * time.Millisecond
    time.Sleep(pause)
}