Skip to content

Instantly share code, notes, and snippets.

@ianfoo
Created October 12, 2018 03:16
Show Gist options
  • Select an option

  • Save ianfoo/a7778c2346f92d742d192e60ef81f01c to your computer and use it in GitHub Desktop.

Select an option

Save ianfoo/a7778c2346f92d742d192e60ef81f01c to your computer and use it in GitHub Desktop.
Bounded parallel execution example
package main
import (
"bufio"
"context"
"encoding/json"
"flag"
"io"
"log"
"os"
"runtime"
"time"
)
// General Strategy
//
// Setup:
// * Set up a pool of workers in a buffered channel
//
// Work dispatcher:
// * Pull entry off stack
// * Claim a goroutine from the "worker queue"
// * Send work to worker
//
// Perform work:
// * check context for cancellation first
// * execute job
// * send results on channel
//
// * explicit cancellation?
// * send error on channel and have that call the cancellation func
func main() {
	// Command-line configuration. The logger only emits Print-style output
	// when the DEBUG environment variable is non-empty.
	poolSize := flag.Int("pool", runtime.NumCPU(), "Number of workers to run")
	maxRuntime := flag.Duration("timeout", 10*time.Second, "Maximum number of time to run")
	logger := logger{
		debug:  os.Getenv("DEBUG") != "",
		Logger: log.New(os.Stderr, "", 0),
	}

	flag.Parse()
	if *workerCount(poolSize) < 1 {
		logger.Fatal("error: worker pool size must be greater than zero")
	}

	logger.SetFlags(0)
	logger.Printf("Reading input with %d workers", *poolSize)

	// The top-level context bounds the whole run by the requested timeout.
	ctx, cancel := context.WithTimeout(context.Background(), *maxRuntime)
	defer cancel()

	results, err := readInput(ctx, logger, os.Stdin, *poolSize)
	if err != nil {
		logger.Fatalf("error: reading input: %v", err)
	}

	// Emit the collected results as a single JSON document on stdout.
	if err := json.NewEncoder(os.Stdout).Encode(results); err != nil {
		logger.Fatalf("error: writing output: %v", err)
	}
}

// workerCount returns its argument unchanged; it only exists to give the
// pool-size flag pointer a descriptive name at the validation site.
func workerCount(p *int) *int { return p }
// Job is the unit of work handed to a worker: one line of input together
// with its line number and the context governing its processing.
type Job struct {
	ctx     context.Context
	lineNum int
	line    string
}

// LineDetails is the result a worker produces for a single Job.
type LineDetails struct {
	LineNumber int
	LineLength int
	Err        error
}

// worker receives jobs on its own channel and reports results on a shared
// output channel.
type worker struct {
	id     int
	logger logger

	// pool is the availability queue; the worker sends itself into it
	// whenever it is ready to accept another job.
	pool chan<- worker

	// jobs delivers the work for this worker.
	jobs chan Job

	// output receives the result of each job.
	output chan<- LineDetails

	// shutdown receives a value once the worker has shut down.
	shutdown chan<- struct{}
}

// logger wraps a *log.Logger and carries a debug flag consulted by its
// Print-style methods.
type logger struct {
	debug bool
	*log.Logger
}
// Printf forwards to the embedded Logger's Printf, but only when debug
// output is enabled; otherwise it does nothing.
func (l logger) Printf(format string, args ...interface{}) {
	if !l.debug {
		return
	}
	l.Logger.Printf(format, args...)
}
// Println forwards to the embedded Logger's Println, but only when debug
// output is enabled; otherwise it does nothing.
func (l logger) Println(args ...interface{}) {
	if !l.debug {
		return
	}
	l.Logger.Println(args...)
}
// Print forwards to the embedded Logger's Print, but only when debug output
// is enabled; otherwise it does nothing.
func (l logger) Print(args ...interface{}) {
	if !l.debug {
		return
	}
	l.Logger.Print(args...)
}
// Start launches the worker's processing goroutine and then places the
// worker into the availability pool so that it can be handed a job.
func (w worker) Start() {
	run := func() {
		// On the way out, log and signal that this worker has exited.
		defer func() {
			w.log("exiting")
			w.shutdown <- struct{}{}
		}()

		w.log("starting")
		for {
			job, open := <-w.jobs
			if !open {
				// The jobs channel was closed: no more work will arrive.
				return
			}
			w.performWork(job)
			// The job is finished, so rejoin the availability pool.
			w.pool <- w
		}
	}
	go run()

	// The worker is ready for its first job.
	w.pool <- w
}
// Process packages the line, its number, and ctx into a Job and sends it on
// the worker's jobs channel.
func (w worker) Process(ctx context.Context, lineNum int, line string) {
	job := Job{ctx: ctx, lineNum: lineNum, line: line}
	w.jobs <- job
}
// Stop closes the worker's jobs channel, which ends the receive loop started
// by Start once any job already delivered has been handled.
func (w worker) Stop() {
	jobs := w.jobs
	close(jobs)
}
// performWork processes a single job and sends its result on w.output.
//
// The job carries a context so that the work can be canceled. That context
// can be passed along to whatever is called inside performWork, like, for
// instance, http.Request.WithContext.
//
// If the job's context is already done when performWork is called, the line
// is not processed; a LineDetails holding the line number and the context's
// error is sent on w.output instead.
func (w worker) performWork(j Job) {
	// Check for cancellation before doing any work.
	select {
	case <-j.ctx.Done():
		w.log("context canceled: exiting work")
		// Report the cancellation instead of a normal result so the receiver
		// can tell that this line was never processed, then actually stop.
		w.output <- LineDetails{
			LineNumber: j.lineNum,
			Err:        j.ctx.Err(),
		}
		return
	default:
	}

	// More realistic example to allow for work to be canceled in-flight:
	// req := http.NewRequest(...).WithContext(ctx)
	// resp, err := http.Do(req)
	w.output <- LineDetails{
		LineNumber: j.lineNum,
		LineLength: len(j.line),
	}
	w.log("processed line number %d", j.lineNum)
}
// log writes a message through the worker's logger, prefixed with the
// worker's zero-padded id.
func (w worker) log(format string, args ...interface{}) {
	// The id must come first so that it matches the prefix's %02d verb.
	values := make([]interface{}, 0, len(args)+1)
	values = append(values, w.id)
	values = append(values, args...)
	w.logger.Printf("[worker %02d] "+format, values...)
}
// readInput reads lines from in and reports the length of each line. The
// work is done by a pool of poolSize workers.
//
// poolSize must be at least 1; this function does not validate it.
//
// On success it returns one LineDetails per input line, in the order the
// workers finished them (not necessarily input order). It returns a nil slice
// and an error when ctx is done, when a job reports an error, or when reading
// the input fails. A read error is reported only after the jobs already
// dispatched have finished.
//
// On an early return the internal context is canceled and the remaining
// results are drained in the background, so the workers and the dispatcher
// can exit instead of blocking forever. The dispatcher can only notice the
// cancellation between reads, so it may linger while a read on in blocks.
func readInput(ctx context.Context, logger logger, in io.Reader, poolSize int) ([]LineDetails, error) {
	var (
		workerPool     = make(chan worker, poolSize)
		workerShutdown = make(chan struct{}, poolSize)
		collector      = make(chan LineDetails)
	)
	// workerPool is deliberately never closed: workers send themselves back
	// into it after every job, and a send on a closed channel panics.

	// Create workers.
	for i := 0; i < poolSize; i++ {
		w := worker{
			id:       i + 1,
			logger:   logger,
			pool:     workerPool,
			jobs:     make(chan Job),
			output:   collector,
			shutdown: workerShutdown,
		}
		w.Start()
	}

	// Distribute work to workers.
	var (
		scanner = bufio.NewScanner(in)
		// Buffered so the dispatcher can record a read error without having
		// to wait for the collector to be ready to receive it.
		errCh          = make(chan error, 1)
		intCtx, cancel = context.WithCancel(ctx)
	)
	defer cancel()

	go func() {
		// Define the behavior for when all work has been dispatched.
		defer func() {
			// Shut down workers.
			logger.Print("shutting down workers")
			// Taking a worker from the pool waits until it is idle; then
			// stop it and wait for it to acknowledge.
			for i := 0; i < poolSize; i++ {
				w := <-workerPool
				w.Stop()
				<-workerShutdown
			}
			// Close collector channel to allow collector to exit.
			close(collector)
		}()

		// Read the input and send to workers, stopping early once the
		// internal context has been canceled.
		lineNum := 0
		for intCtx.Err() == nil && scanner.Scan() {
			lineNum++
			w := <-workerPool
			logger.Printf("got worker %d from pool", w.id)
			w.Process(intCtx, lineNum, scanner.Text())
		}

		// In more realistic workloads, an error may occur anywhere,
		// in which case the context above could be canceled.
		if err := scanner.Err(); err != nil {
			errCh <- err
		}
	}()

	// abort cancels outstanding work and keeps receiving from the collector
	// in the background, so that workers blocked on sending a result (and
	// the dispatcher waiting for those workers) can finish and exit.
	abort := func(err error) ([]LineDetails, error) {
		cancel()
		go func() {
			for range collector {
			}
		}()
		return nil, err
	}

	// Collect results.
	logger.Print("collecting results")
	var results []LineDetails
	for {
		select {
		case <-ctx.Done():
			logger.Printf("context is done: err: %v", ctx.Err())
			return abort(ctx.Err())

		case r, ok := <-collector:
			if !ok {
				logger.Printf("collector channel is closed: exiting")
				// The dispatcher records any read error before it closes
				// the collector, so a non-blocking check is sufficient.
				select {
				case err := <-errCh:
					logger.Printf("error reading input: err: %v", err)
					return nil, err
				default:
				}
				// All finished.
				return results, nil
			}
			if r.Err != nil {
				logger.Printf("error encountered in job processing: exiting early")
				return abort(r.Err)
			}
			results = append(results, r)
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment