Skip to content

Instantly share code, notes, and snippets.

@aybabtme
Forked from supershabam/conc_err_map.go
Last active August 29, 2015 14:16
Show Gist options
  • Save aybabtme/ab64512f43a4fea5a44a to your computer and use it in GitHub Desktop.
Save aybabtme/ab64512f43a4fea5a44a to your computer and use it in GitHub Desktop.
//go:generate pipeliner map(func(string) (string, error) concurrently as concErrMap into conc_err_map.go
func concErrMap(concurrency int, fn func(string) (string, error), in <-chan string) (<-chan string, <-chan error) {
out := make(chan string)
errc := make(chan error, concurrency)
done := make(chan struct{})
go func() {
defer close(out)
defer close(errc)
wg := sync.WaitGroup{}
wg.Add(concurrency)
for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()
for {
select {
case <-done:
return
case item, ok := <-in:
if !ok {
return // end of channel
}
t, err := fn(item)
if err != nil {
errc <- err
close(done)
return
}
out <- t
}
}
}()
}
wg.Wait()
}()
return out, errc
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment