Skip to content

Instantly share code, notes, and snippets.

@yanmhlv
Last active June 14, 2023 13:12
Show Gist options
  • Save yanmhlv/f603d846734af13fef2f7f2637545454 to your computer and use it in GitHub Desktop.
Save yanmhlv/f603d846734af13fef2f7f2637545454 to your computer and use it in GitHub Desktop.
concurrency helpers
package cond
type Cond struct {
lock sync.RWMutex
changed chan struct{}
}
func newCond() *Cond {
return &Cond{changed: make(chan struct{})}
}
func (c *Cond) broadcast() {
c.lock.Lock()
changed := c.changed
c.changed = make(chan struct{})
c.lock.Unlock()
close(changed)
}
func (c *Cond) wait() <-chan struct{} {
c.lock.RLock()
changed := c.changed
c.lock.RUnlock()
return changed
}
package group
import (
"context"
"sync"
"golang.org/x/sync/errgroup"
)
func RunGroup[T any](ctx context.Context, maxRuntimeGoroutines int, collection []T, process func(T) error) error {
eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(maxRuntimeGoroutines)
taskCh := make(chan T)
eg.Go(func() error {
defer close(taskCh)
for _, c := range collection {
select {
case <-ctx.Done():
return nil
case taskCh <- c:
}
}
return nil
})
for task := range taskCh {
task := task
eg.Go(func() error { return process(task) })
}
return eg.Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment