Created
October 12, 2018 03:16
-
-
Save ianfoo/a7778c2346f92d742d192e60ef81f01c to your computer and use it in GitHub Desktop.
Bounded parallel execution example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package main | |
| import ( | |
| "bufio" | |
| "context" | |
| "encoding/json" | |
| "flag" | |
| "io" | |
| "log" | |
| "os" | |
| "runtime" | |
| "time" | |
| ) | |
| // General Strategy | |
| // | |
| // Setup: | |
| // * Set up a pool of workers in a buffered channel | |
| // | |
| // Work dispatcher: | |
| // * Pull entry off stack | |
| // * Claim a goroutine from the "worker queue" | |
| // * Send work to worker | |
| // | |
| // Perform work: | |
| // * check context for cancellation first | |
| // * execute job | |
| // * send results on channel | |
| // | |
| // * explicit cancellation? | |
| // * send error on channel and have that call the cancellation func | |
| func main() { | |
| var ( | |
| workerPoolSize = flag.Int("pool", runtime.NumCPU(), "Number of workers to run") | |
| timeout = flag.Duration("timeout", 10*time.Second, "Maximum number of time to run") | |
| debug = os.Getenv("DEBUG") != "" | |
| logger = logger{debug: debug, Logger: log.New(os.Stderr, "", 0)} | |
| ) | |
| flag.Parse() | |
| if *workerPoolSize < 1 { | |
| logger.Fatal("error: worker pool size must be greater than zero") | |
| } | |
| logger.SetFlags(0) | |
| logger.Printf("Reading input with %d workers", *workerPoolSize) | |
| // Top-level context. | |
| ctx, cancel := context.WithTimeout(context.Background(), *timeout) | |
| defer cancel() | |
| results, err := readInput(ctx, logger, os.Stdin, *workerPoolSize) | |
| if err != nil { | |
| logger.Fatalf("error: reading input: %v", err) | |
| } | |
| enc := json.NewEncoder(os.Stdout) | |
| err = enc.Encode(results) | |
| if err != nil { | |
| logger.Fatalf("error: writing output: %v", err) | |
| } | |
| } | |
// Job is one unit of work handed to a worker: a single input line, its line
// number (readInput numbers lines from 1), and the context under which the
// work runs. The context travels inside the Job because the Job is the
// message passed over the worker's channel.
type Job struct {
	ctx     context.Context
	lineNum int
	line    string
}

// LineDetails is the result a worker produces for one Job. A non-nil Err
// makes readInput stop collecting and return that error.
type LineDetails struct {
	LineNumber int
	LineLength int
	Err        error
}

// worker processes jobs one at a time on its own goroutine.
type worker struct {
	id     int
	logger logger
	// pool is the availability pool; the worker puts itself back into it
	// after finishing each job.
	pool chan<- worker
	// jobs delivers work to the worker.
	jobs chan Job
	// output receives the result of every job.
	output chan<- LineDetails
	// shutdown receives a signal once the worker has stopped.
	shutdown chan<- struct{}
}

// logger wraps *log.Logger so that Print, Printf and Println write only when
// debug is true. Every other method of the embedded Logger (Fatal, Fatalf,
// SetFlags, ...) is promoted unchanged and is not gated.
type logger struct {
	debug bool
	*log.Logger
}

// Printf formats like log.Logger.Printf, but only in debug mode.
func (l logger) Printf(format string, args ...interface{}) {
	if !l.debug {
		return
	}
	l.Logger.Printf(format, args...)
}

// Println formats like log.Logger.Println, but only in debug mode.
func (l logger) Println(args ...interface{}) {
	if !l.debug {
		return
	}
	l.Logger.Println(args...)
}

// Print formats like log.Logger.Print, but only in debug mode.
func (l logger) Print(args ...interface{}) {
	if !l.debug {
		return
	}
	l.Logger.Print(args...)
}
| // Start the worker. | |
| func (w worker) Start() { | |
| go func() { | |
| defer func() { | |
| w.log("exiting") | |
| // Signal that worker has exited. | |
| w.shutdown <- struct{}{} | |
| }() | |
| w.log("starting") | |
| for job := range w.jobs { | |
| w.performWork(job) | |
| // Put worker back into the availability pool. | |
| w.pool <- w | |
| } | |
| }() | |
| // Worker is ready, so put it into the availability pool. | |
| w.pool <- w | |
| } | |
| // Process sends work to the worker. | |
| func (w worker) Process(ctx context.Context, lineNum int, line string) { | |
| w.jobs <- Job{ | |
| ctx: ctx, | |
| lineNum: lineNum, | |
| line: line, | |
| } | |
| } | |
| // Stop accepting work. | |
| func (w worker) Stop() { | |
| close(w.jobs) | |
| } | |
| // performWork takes a context so it can be canceled. This allows the context | |
| // to be passed along to whatever's being called inside performWork, like, for | |
| // instance, http.Request.WithContext. | |
| func (w worker) performWork(j Job) { | |
| // Check for cancellation before doing any work. | |
| select { | |
| default: | |
| case <-j.ctx.Done(): | |
| w.log("context canceled: exiting work") | |
| } | |
| // More realistic example to allow for work to be canceled in-flight: | |
| // req := http.NewRequest(...).WithContext(ctx) | |
| // resp, err := http.Do(req) | |
| w.output <- LineDetails{ | |
| LineNumber: j.lineNum, | |
| LineLength: len(j.line), | |
| } | |
| w.log("processed line number %d", j.lineNum) | |
| } | |
| func (w worker) log(format string, args ...interface{}) { | |
| w.logger.Printf("[worker %02d] "+format, append([]interface{}{w.id}, args...)...) | |
| } | |
| // Read the input from a reader, and report back the length of each line. | |
| // The work is done by a pool of poolSize workers. | |
| func readInput(ctx context.Context, logger logger, in io.Reader, poolSize int) ([]LineDetails, error) { | |
| var ( | |
| workerPool = make(chan worker, poolSize) | |
| workerShutdown = make(chan struct{}, poolSize) | |
| collector = make(chan LineDetails) | |
| ) | |
| defer close(workerPool) | |
| // Create workers. | |
| for i := 0; i < poolSize; i++ { | |
| worker := worker{ | |
| id: i + 1, | |
| logger: logger, | |
| pool: workerPool, | |
| jobs: make(chan Job), | |
| output: collector, | |
| shutdown: workerShutdown, | |
| } | |
| worker.Start() | |
| } | |
| // Distribute work to workers. | |
| var ( | |
| reader = bufio.NewScanner(os.Stdin) | |
| errCh = make(chan error) | |
| intCtx, cancel = context.WithCancel(ctx) | |
| ) | |
| defer cancel() | |
| go func() { | |
| // Define the behavior for when all work has been dispatched. | |
| defer func() { | |
| // Shut down workers. | |
| logger.Print("shutting down workers") | |
| // Wait for workers to acknowledge. | |
| for i := 0; i < poolSize; i++ { | |
| w := <-workerPool | |
| w.Stop() | |
| <-workerShutdown | |
| } | |
| // Close collector channel to allow collector to exit. | |
| close(collector) | |
| }() | |
| // Read the input and send to workers. | |
| i := 0 | |
| for reader.Scan() { | |
| i++ | |
| w := <-workerPool | |
| logger.Printf("got worker %d from pool", w.id) | |
| w.Process(intCtx, i, reader.Text()) | |
| } | |
| // In more realistic workloads, an error may occur anywhere, | |
| // in which case the context above could be canceled. | |
| if err := reader.Err(); err != nil { | |
| errCh <- err | |
| } | |
| }() | |
| // Collect results. | |
| logger.Print("collecting results") | |
| var results []LineDetails | |
| for r := range collector { | |
| select { | |
| case <-ctx.Done(): | |
| logger.Printf("context is done: err: %v", ctx.Err()) | |
| return nil, ctx.Err() | |
| case err := <-errCh: | |
| logger.Printf("error reading input: err: %v", err) | |
| return nil, err | |
| default: | |
| if r.Err != nil { | |
| logger.Printf("error encountered in job processing: exiting early") | |
| return nil, r.Err | |
| } | |
| results = append(results, r) | |
| } | |
| } | |
| logger.Printf("collector channel is closed: exiting") | |
| // All finished. | |
| return results, nil | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment