Created
October 9, 2018 03:31
-
-
Save ianfoo/434fbc5c5abe1405147931abc2d1a7f8 to your computer and use it in GitHub Desktop.
Run tasks in parallel with timeout
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package main | |
| import ( | |
| "context" | |
| "fmt" | |
| "log" | |
| "math/rand" | |
| "time" | |
| ) | |
| // TODO Review https://blog.golang.org/pipelines | |
| // Not context related but still a great article. | |
| func main() { | |
| rand.Seed(time.Now().UnixNano()) | |
| timeout := rand.Intn(20) | |
| timeoutDuration := time.Millisecond * time.Duration(timeout) | |
| log.Printf("Timeout is set to %s", timeoutDuration) | |
| ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration) | |
| defer cancel() | |
| results, err := runParallel(ctx, 10, timeout, func() result { | |
| return result(rand.Intn(1000)) | |
| }) | |
| fmt.Printf("results: %v; err: %v\n", results, err) | |
| } | |
// result is the value produced by one parallel task.
type result int

// runParallel starts n goroutines that each call f once and gathers the values
// they produce. Results are stored in arrival order, not goroutine order.
//
// timeout is a bound, in milliseconds, for the demonstration sleep injected
// before each call to f: every goroutine sleeps for a random duration in
// [timeout/3, timeout+timeout/3) ms. A non-positive timeout disables the sleep.
//
// It returns all n results with a nil error, or (nil, ctx.Err()) as soon as
// ctx is done; results collected up to that point are discarded. A
// non-positive n yields an empty slice and a nil error.
//
// No goroutine outlives the call by more than one invocation of f: the sleep
// is abandoned when ctx is done, and the result channel is buffered so a
// late sender never blocks.
//
// TODO Generalize result type?
// TODO Ability for parallelized tasks to return errors
func runParallel(ctx context.Context, n, timeout int, f func() result) ([]result, error) {
	if n <= 0 {
		return []result{}, nil
	}
	// Do not spawn any work for a context that is already finished.
	if err := ctx.Err(); err != nil {
		log.Print("Exiting because of context completion")
		return nil, err
	}

	// Buffered to n so every worker can deliver its value even when the
	// collector has already returned. The channel is deliberately never
	// closed: closing it on return would make a late worker panic with
	// "send on closed channel". It is garbage collected once unreferenced.
	collector := make(chan result, n)

	// Dispatch work in parallel.
	for i := 0; i < n; i++ {
		go func(i int) {
			// NOTE Just for demonstration, injecting random sleep.
			// It's inside runParallel because from here we can log with
			// the goroutine number.
			sleep := 0
			if timeout > 0 {
				// rand.Intn panics on a non-positive bound.
				sleep = rand.Intn(timeout) + timeout/3
			}
			sleepDuration := time.Millisecond * time.Duration(sleep)
			log.Printf("[%02d] Sleeping for %s", i, sleepDuration)

			// Sleep, but give up immediately if the context finishes.
			timer := time.NewTimer(sleepDuration)
			defer timer.Stop()
			select {
			case <-timer.C:
			case <-ctx.Done():
				log.Printf("[%02d] Execution interrupted by context", i)
				return
			}

			// Regular operation; never blocks thanks to the buffer.
			collector <- f()
			log.Printf("[%02d] Sent value to channel", i)
		}(i)
	}

	// Collect results, possibly exiting early.
	results := make([]result, n)
	for i := 0; i < n; i++ {
		select {
		case results[i] = <-collector:
			log.Printf("Received value %d on collector channel", results[i])
		case <-ctx.Done():
			log.Print("Exiting because of context completion")
			return nil, ctx.Err()
		}
	}
	return results, nil
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment