Skip to content

Instantly share code, notes, and snippets.

@harlow
Last active October 30, 2024 13:27
Show Gist options
  • Save harlow/dbcd639cf8d396a2ab73 to your computer and use it in GitHub Desktop.
Save harlow/dbcd639cf8d396a2ab73 to your computer and use it in GitHub Desktop.
Job queues in Golang

Golang Workers / Job Queue

A running example of the code from:

This gist creates a working example from blog post, and a alternate example using simple worker pool.

TLDR: if you want simple and controlled concurrency use a worker pool.

Step 1

Small refactorings made to original code:

Step 2

Simplify the worker queue by removing the Dispatcher.

  • Creates workers directly and passes job queue to them

https://gist.github.com/harlow/dbcd639cf8d396a2ab73#file-worker_refactored-go

Run the Application

Boot either the worker_original.go or the worker_refactored.go applications. Use flags to adjust the max_workers and max_queue_size to override the default values.

$ go run worker_original.go -max_workers 5

cURL the application from another terminal window:

$ for i in {1..15}; do curl localhost:8080/work -d name=job$i -d delay=$(expr $i % 9 + 1)s; done

Performance

The test run with Pprof show performance characteristics remain the same between both examples.

// Original code with Dispatcher
package main
import (
_ "expvar"
"flag"
"fmt"
"net/http"
_ "net/http/pprof"
"os"
"time"
)
// Job holds the attributes needed to perform unit of work.
type Job struct {
Name string
Delay time.Duration
}
// NewWorker creates takes a numeric id and a channel w/ worker pool.
func NewWorker(id int, workerPool chan chan Job) Worker {
return Worker{
id: id,
jobQueue: make(chan Job),
workerPool: workerPool,
quitChan: make(chan bool),
}
}
type Worker struct {
id int
jobQueue chan Job
workerPool chan chan Job
quitChan chan bool
}
func (w Worker) start() {
go func() {
for {
// Add my jobQueue to the worker pool.
w.workerPool <- w.jobQueue
select {
case job := <-w.jobQueue:
// Dispatcher has added a job to my jobQueue.
fmt.Printf("worker%d: started %s, blocking for %f seconds\n", w.id, job.Name, job.Delay.Seconds())
time.Sleep(job.Delay)
fmt.Printf("worker%d: completed %s!\n", w.id, job.Name)
case <-w.quitChan:
// We have been asked to stop.
fmt.Printf("worker%d stopping\n", w.id)
return
}
}
}()
}
func (w Worker) stop() {
go func() {
w.quitChan <- true
}()
}
// NewDispatcher creates, and returns a new Dispatcher object.
func NewDispatcher(jobQueue chan Job, maxWorkers int) *Dispatcher {
workerPool := make(chan chan Job, maxWorkers)
return &Dispatcher{
jobQueue: jobQueue,
maxWorkers: maxWorkers,
workerPool: workerPool,
}
}
type Dispatcher struct {
workerPool chan chan Job
maxWorkers int
jobQueue chan Job
}
func (d *Dispatcher) run() {
for i := 0; i < d.maxWorkers; i++ {
worker := NewWorker(i+1, d.workerPool)
worker.start()
}
go d.dispatch()
}
func (d *Dispatcher) dispatch() {
for {
select {
case job := <-d.jobQueue:
go func() {
fmt.Printf("fetching workerJobQueue for: %s\n", job.Name)
workerJobQueue := <-d.workerPool
fmt.Printf("adding %s to workerJobQueue\n", job.Name)
workerJobQueue <- job
}()
}
}
}
func requestHandler(w http.ResponseWriter, r *http.Request, jobQueue chan Job) {
// Make sure we can only be called with an HTTP POST request.
if r.Method != "POST" {
w.Header().Set("Allow", "POST")
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Parse the delay.
delay, err := time.ParseDuration(r.FormValue("delay"))
if err != nil {
http.Error(w, "Bad delay value: "+err.Error(), http.StatusBadRequest)
return
}
// Validate delay is in range 1 to 10 seconds.
if delay.Seconds() < 1 || delay.Seconds() > 10 {
http.Error(w, "The delay must be between 1 and 10 seconds, inclusively.", http.StatusBadRequest)
return
}
// Set name and validate value.
name := r.FormValue("name")
if name == "" {
http.Error(w, "You must specify a name.", http.StatusBadRequest)
return
}
// Create Job and push the work onto the jobQueue.
job := Job{Name: name, Delay: delay}
jobQueue <- job
// Render success.
w.WriteHeader(http.StatusCreated)
}
func main() {
var (
maxWorkers = flag.Int("max_workers", 5, "The number of workers to start")
maxQueueSize = flag.Int("max_queue_size", 100, "The size of job queue")
port = flag.String("port", "8080", "The server port")
)
flag.Parse()
// Create the job queue.
jobQueue := make(chan Job, *maxQueueSize)
// Start the dispatcher.
dispatcher := NewDispatcher(jobQueue, *maxWorkers)
dispatcher.run()
// Start the HTTP handler.
http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) {
requestHandler(w, r, jobQueue)
})
log.Fatal(http.ListenAndServe(":"+*port, nil))
}
package main
import (
_ "expvar"
"flag"
"fmt"
"log"
"net/http"
_ "net/http/pprof"
"time"
)
type job struct {
name string
duration time.Duration
}
func doWork(id int, j job) {
fmt.Printf("worker%d: started %s, working for %f seconds\n", id, j.name, j.duration.Seconds())
time.Sleep(j.duration)
fmt.Printf("worker%d: completed %s!\n", w.id, j.name)
}
func requestHandler(jobs chan job, w http.ResponseWriter, r *http.Request) {
// Make sure we can only be called with an HTTP POST request.
if r.Method != "POST" {
w.Header().Set("Allow", "POST")
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Parse the durations.
duration, err := time.ParseDuration(r.FormValue("delay"))
if err != nil {
http.Error(w, "Bad delay value: "+err.Error(), http.StatusBadRequest)
return
}
// Validate delay is in range 1 to 10 seconds.
if duration.Seconds() < 1 || duration.Seconds() > 10 {
http.Error(w, "The delay must be between 1 and 10 seconds, inclusively.", http.StatusBadRequest)
return
}
// Set name and validate value.
name := r.FormValue("name")
if name == "" {
http.Error(w, "You must specify a name.", http.StatusBadRequest)
return
}
// Create Job and push the work onto the jobCh.
job := job{name, duration}
go func() {
fmt.Printf("added: %s %s\n", job.name, job.duration)
jobs <- job
}()
// Render success.
w.WriteHeader(http.StatusCreated)
return
}
func main() {
var (
maxQueueSize = flag.Int("max_queue_size", 100, "The size of job queue")
maxWorkers = flag.Int("max_workers", 5, "The number of workers to start")
port = flag.String("port", "8080", "The server port")
)
flag.Parse()
// create job channel
jobs := make(chan job, *maxQueueSize)
// create workers
for i := 1; i <= *maxWorkers; i++ {
go func(i int) {
for j := range jobs {
doWork(i, j)
}
}(i)
}
// handler for adding jobs
http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) {
requestHandler(jobs, w, r)
})
log.Fatal(http.ListenAndServe(":"+*port, nil))
}
package main
import (
_ "expvar"
"fmt"
"math/rand"
"sync"
"time"
)
const maxWorkers = 10
type job struct {
name string
duration time.Duration
}
func doWork(id int, j job) {
fmt.Printf("worker%d: started %s, working for %fs\n", id, j.name, j.duration.Seconds())
time.Sleep(j.duration)
fmt.Printf("worker%d: completed %s!\n", id, j.name)
}
func main() {
// channel for jobs
jobs := make(chan job)
// start workers
wg := &sync.WaitGroup{}
wg.Add(maxWorkers)
for i := 1; i <= maxWorkers; i++ {
go func(i int) {
defer wg.Done()
for j := range jobs {
doWork(i, j)
}
}(i)
}
// add jobs
for i := 0; i < 100; i++ {
name := fmt.Sprintf("job-%d", i)
duration := time.Duration(rand.Intn(1000)) * time.Millisecond
fmt.Printf("adding: %s %s\n", name, duration)
jobs <- job{name, duration}
}
close(jobs)
// wait for workers to complete
wg.Wait()
}
@mocanuga
Copy link

mocanuga commented May 2, 2017

Hi,

Thanks for this example code.
I would like to point out an error in the refactored code, at line 21.

There is no variable w. So w.id should be just id.

@feliperohdee
Copy link

Hi @harlow

whats the difference in use goroutine or not at this line https://gist.github.com/harlow/dbcd639cf8d396a2ab73#file-worker_refactored-go-L54?

@vardius
Copy link

vardius commented Jul 25, 2017

@feliperohdee

I quote the answer from @zerkms

if the jobs channel is not buffered or is full - then if you don't have a goroutine there it would block the whole method
with goroutine though the method returns and sends an http response to the client

@feliperohdee
Copy link

Thank u @vardius

@Serhioromano
Copy link

👍

@zonorion
Copy link

func (w Worker) stop() { go func() { w.quitChan <- true }() }

Why worker stop func does not call?

@bimbimprasetyoafif
Copy link

what happen when handler need to know and return job's result if it success or not?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment