Skip to content

Instantly share code, notes, and snippets.

@ianfoo
Created October 9, 2018 03:31
Show Gist options
  • Select an option

  • Save ianfoo/434fbc5c5abe1405147931abc2d1a7f8 to your computer and use it in GitHub Desktop.

Select an option

Save ianfoo/434fbc5c5abe1405147931abc2d1a7f8 to your computer and use it in GitHub Desktop.
Run tasks in parallel with timeout
package main
import (
"context"
"fmt"
"log"
"math/rand"
"time"
)
// TODO Review https://blog.golang.org/pipelines
// Not context related but still a great article.
// main demonstrates runParallel: it picks a random timeout, derives a context
// with that deadline, runs ten random-number tasks in parallel and prints
// whatever came back (either the full result set or the context error).
func main() {
	rand.Seed(time.Now().UnixNano())

	// Draw from [1, 20] ms. A zero timeout would expire the context before
	// any task could run, so the lower bound is kept strictly positive.
	timeout := rand.Intn(20) + 1
	timeoutDuration := time.Millisecond * time.Duration(timeout)
	log.Printf("Timeout is set to %s", timeoutDuration)

	ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration)
	defer cancel() // release the timer even when all tasks finish early

	results, err := runParallel(ctx, 10, timeout, func() result {
		return result(rand.Intn(1000))
	})
	fmt.Printf("results: %v; err: %v\n", results, err)
}
// result is the value produced by one task dispatched by runParallel.
type result int

// runParallel runs f in n goroutines and gathers the n return values.
//
// Before calling f, each goroutine waits a random delay (demonstration only)
// of rand.Intn(timeout) + timeout/3 milliseconds; a timeout <= 0 means no
// delay. The delay is abandoned as soon as ctx is done, so a worker never
// outlives a cancelled call by more than one invocation of f.
//
// Results are returned in the order they arrive, not in dispatch order. If
// ctx finishes before all n results have been received, runParallel returns
// nil and ctx.Err(). A non-positive n yields an empty slice and a nil error.
//
// TODO Generalize result type?
// TODO Ability for parallelized tasks to return errors
func runParallel(ctx context.Context, n, timeout int, f func() result) ([]result, error) {
	if n <= 0 {
		return []result{}, nil
	}

	// Buffered to n so every worker can deliver its value and exit even when
	// the collector has already returned. The channel is deliberately never
	// closed: closing it from the receiving side would turn a late send into
	// a panic, and an unreferenced channel is simply garbage collected.
	collector := make(chan result, n)

	// Dispatch work in parallel.
	for i := 0; i < n; i++ {
		go func(i int) {
			// NOTE Just for demonstration, injecting random sleep.
			// It's inside runParallel because from here we can log with
			// the goroutine number.
			var sleep int
			if timeout > 0 { // rand.Intn panics on a non-positive argument
				sleep = rand.Intn(timeout) + timeout/3
			}
			sleepDuration := time.Millisecond * time.Duration(sleep)
			log.Printf("[%02d] Sleeping for %s", i, sleepDuration)

			// Wait on a timer instead of time.Sleep so that cancellation
			// ends the goroutine promptly rather than after the full delay.
			timer := time.NewTimer(sleepDuration)
			defer timer.Stop()
			select {
			case <-timer.C:
				// Regular operation
			case <-ctx.Done():
				log.Printf("[%02d] Execution interrupted by context", i)
				return
			}

			// Never blocks: capacity is n and at most n values are sent.
			collector <- f()
			log.Printf("[%02d] Sent value to channel", i)
		}(i)
	}

	// Collect results, possibly exiting early.
	results := make([]result, 0, n)
	for len(results) < n {
		select {
		case r := <-collector:
			log.Printf("Received value %d on collector channel", r)
			results = append(results, r)
		case <-ctx.Done():
			log.Print("Exiting because of context completion")
			return nil, ctx.Err()
		}
	}
	return results, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment