Last active
October 29, 2020 21:11
-
-
Save gammazero/bb543631f25a73d1a3f6c8e1a6970eb8 to your computer and use it in GitHub Desktop.
Track workerpool job completion
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"errors" | |
"fmt" | |
"time" | |
"github.com/gammazero/workerpool" | |
) | |
// myFunction stands in for the real work a job would do for one input.
//
// It returns "<name> is good" for most names. Two names get special
// treatment so the example can show a failure and a slow job:
//   - "Fedor" is rejected with a "not authorized" error.
//   - "Lyudmilla" sleeps for 25 seconds before succeeding.
func myFunction(name string) (string, error) {
	switch name {
	case "Fedor":
		// Fedor is never allowed through.
		return "", errors.New("not authorized")
	case "Lyudmilla":
		// Lyudmilla is deliberately slow, simulating a job that times out.
		time.Sleep(25 * time.Second)
	}
	return name + " is good", nil
}
func main() { | |
runner := NewRunner(20) | |
idNameMap := map[int]string{ | |
101: "Anna", | |
212: "Artyom", | |
323: "Fedor", | |
434: "Katya", | |
545: "Lyudmilla", | |
} | |
// Run myFunction using the runner. | |
for id, name := range idNameMap { | |
name := name | |
runner.Run(id, func() (interface{}, error) { | |
return myFunction(name) | |
}) | |
} | |
ctx, cancel := context.WithTimeout(context.Background(), time.Second) | |
defer cancel() | |
// Wait until there are results from all jobs, or until timeout. | |
err := runner.Wait(ctx) | |
if err != nil { | |
fmt.Println("did not get all results:", err) | |
} | |
// Get results by ID. | |
for id, _ := range idNameMap { | |
data, err := runner.GetResult(id) | |
if err != nil { | |
fmt.Println(id, "error:", err) | |
} else { | |
fmt.Println(id, "data:", data.(string)) | |
} | |
} | |
// Get a result that does not exist | |
id := 999 | |
if _, err = runner.GetResult(id); err != nil { | |
fmt.Println(id, "error:", err) | |
} | |
} | |
// ----- Everything below should be in a separate "runner" package -----
// Sentinel errors returned by Runner methods.
var (
	// ErrNoSuchJob is returned by GetResult when the id matches neither a
	// collected result nor a job that is still being waited for.
	ErrNoSuchJob = errors.New("no such job")

	// ErrAlreadyRunning is returned by Run when a job with the same id has
	// been submitted and its result has not been collected yet.
	ErrAlreadyRunning = errors.New("job with same id is already running")
)
type Runner struct { | |
resultChan chan result | |
results map[int]result | |
waitErr error | |
waitSet map[int]struct{} | |
wp *workerpool.WorkerPool | |
} | |
// result is the outcome of a single job as reported by a worker.
// Field order matters: values are built positionally as result{id, data, err}.
type result struct {
	id   int         // id the job was submitted under
	data interface{} // value returned by the job function
	err  error       // error returned by the job function
}
// NewRunner creates a new job runner. | |
func NewRunner(workers int) *Runner { | |
return &Runner{ | |
resultChan: make(chan result), | |
waitSet: map[int]struct{}{}, | |
wp: workerpool.New(workers), | |
} | |
} | |
// Run executes a function that returns data and error | |
func (r *Runner) Run(id int, f func() (interface{}, error)) error { | |
if _, found := r.waitSet[id]; found { | |
return ErrAlreadyRunning | |
} | |
r.waitSet[id] = struct{}{} | |
r.wp.Submit(func() { | |
data, err := f() | |
r.resultChan <- result{id, data, err} | |
}) | |
return nil | |
} | |
// Wait collects job result until the context is canceled or times out. | |
func (r *Runner) Wait(ctx context.Context) error { | |
r.results = make(map[int]result, len(r.waitSet)) | |
for len(r.waitSet) > 0 { | |
select { | |
case res := <-r.resultChan: | |
if _, ok := r.waitSet[res.id]; !ok { | |
fmt.Println("Unexpected response ID:", res.id) | |
continue | |
} | |
delete(r.waitSet, res.id) | |
r.results[res.id] = res | |
case <-ctx.Done(): | |
// Context canceled or timed out | |
r.waitErr = ctx.Err() | |
return fmt.Errorf("%s while waiting for %d results", ctx.Err(), | |
len(r.waitSet)) | |
} | |
} | |
return nil | |
} | |
// GetResult returns the result of running the job that had the specified id | |
func (r *Runner) GetResult(id int) (interface{}, error) { | |
result, ok := r.results[id] | |
if !ok { | |
if _, ok = r.waitSet[id]; !ok { | |
return nil, ErrNoSuchJob | |
} | |
return nil, r.waitErr | |
} | |
return result.data, result.err | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment