Skip to content

Instantly share code, notes, and snippets.

@arpchaudhary
Created June 1, 2018 14:19
Show Gist options
  • Save arpchaudhary/8854747635741611b0808b47ea6d84f4 to your computer and use it in GitHub Desktop.
Save arpchaudhary/8854747635741611b0808b47ea6d84f4 to your computer and use it in GitHub Desktop.
Simple Worker Co-ordination with Ranging over channel
package main
import (
"fmt"
"sync"
"time"
)
const (
// DefaultChannelSize is the buffer capacity NewDispatcher gives to the
// dispatcher's incoming job queue.
DefaultChannelSize = 100
)
// Job is a unit of work that can be submitted to a Dispatcher and run by a
// Worker.
type Job interface {
// Execute performs the work and reports any failure as an error.
Execute() error
// Name returns a label for the job; the dispatcher and workers print it in
// their log output.
Name() string
}
// newJob builds a Job labelled name whose Execute sleeps for delay.
func newJob(name string, delay time.Duration) Job {
	j := job{
		name:  name,
		delay: delay,
	}
	return &j
}
// job is the Job implementation built by newJob: a named task that simulates
// work by sleeping.
type job struct {
// name is the label returned by Name.
name string
// delay is how long Execute sleeps.
delay time.Duration
}
// Execute simulates the job's work by sleeping for the job's delay.
// It always returns nil.
func (j *job) Execute() error {
time.Sleep(j.delay)
return nil
}
// Name returns the label stored in the job's name field.
func (j *job) Name() string {
return j.name
}
// NewWorker returns a Worker identified by id that registers itself on
// workerPool and reports its exit through wg. The worker gets its own
// unbuffered job channel and quit channel; no goroutine is started here.
func NewWorker(id int, workerPool chan chan Job, wg *sync.WaitGroup) *Worker {
	w := new(Worker)
	w.id = id
	w.workerPool = workerPool
	w.wg = wg
	w.jobQueue = make(chan Job)
	w.quitChan = make(chan bool)
	return w
}
// Worker runs jobs one at a time on its own goroutine and advertises that it
// is free by sending its jobQueue on the shared workerPool.
type Worker struct {
// id identifies the worker in log output.
id int
// jobQueue is the channel on which this worker receives jobs; NewWorker
// creates it unbuffered.
jobQueue chan Job
// workerPool is the shared channel on which workers register their jobQueue.
workerPool chan chan Job
// quitChan is allocated by NewWorker but is not read or written anywhere in
// this file.
quitChan chan bool
// wg is marked Done when the worker goroutine exits.
wg *sync.WaitGroup
}
// start launches the worker's goroutine and returns immediately.
//
// The goroutine registers the worker's jobQueue on workerPool, then runs every
// job it receives on jobQueue, logging whether the job completed or returned
// an error, and registers again after each job. It exits when jobQueue is
// closed, or when a panic unwinds it (a panicking job, or a send on a
// workerPool that has been closed). A panic is recovered and logged rather
// than propagated. wg.Done is always the last thing the goroutine does, so a
// caller released by wg.Wait knows all of this worker's log lines have been
// written.
func (w *Worker) start() {
	go func() {
		// Deferred calls run last-in-first-out: the logging/recover closure
		// below runs first, and Done runs only after it has returned.
		defer w.wg.Done()
		defer func() {
			fmt.Printf("Worker[%d] has finished\n", w.id)
			if recover() != nil {
				// Silent death: the worker is not restarted.
				fmt.Printf("Worker[%d] has hit panic. Recovered\n", w.id)
			}
		}()

		fmt.Printf("Worker[%d] is alive!\n", w.id)
		// Mark attendance in the worker pool.
		w.workerPool <- w.jobQueue
		for job := range w.jobQueue {
			if err := job.Execute(); err != nil {
				fmt.Printf("Worker[%d] failed %s: %v\n", w.id, job.Name(), err)
			} else {
				fmt.Printf("Worker[%d] completed %s\n", w.id, job.Name())
			}
			// Put the worker back in the pool. This send panics if the pool
			// has been closed; the deferred recover above turns that into a
			// normal shutdown.
			w.workerPool <- w.jobQueue
		}
		// jobQueue was closed: fall through to the deferred shutdown steps.
	}()
}
// NewDispatcher creates and returns a Dispatcher called name that uses
// maxWorkers workers once Run is called.
//
// A maxWorkers below 1 is raised to 1: a negative value would make the
// worker-pool channel allocation panic, and zero would give a dispatcher that
// accepts jobs but has no worker to run them. The incoming job queue is
// buffered to DefaultChannelSize.
func NewDispatcher(name string, maxWorkers int) *Dispatcher {
	if maxWorkers < 1 {
		maxWorkers = 1
	}
	return &Dispatcher{
		name:       name,
		maxWorkers: maxWorkers,
		jobQueue:   make(chan Job, DefaultChannelSize),
		// One buffer slot per worker.
		workerPool: make(chan chan Job, maxWorkers),
		workerWg:   &sync.WaitGroup{},
	}
}
// Dispatcher accepts jobs on a buffered queue and hands each one to a worker
// that has registered itself on workerPool.
type Dispatcher struct {
// name is stored by NewDispatcher; it is not read anywhere in this file.
name string
// workerPool carries the job channels of workers that are ready for work.
workerPool chan chan Job
// maxWorkers is the number of workers Run starts.
maxWorkers int
// jobQueue holds jobs submitted through AddJob until dispatch picks them up.
jobQueue chan Job
// workerWg counts running workers; SafeClose waits on it.
workerWg *sync.WaitGroup
}
// Run starts maxWorkers workers, numbered from 1, counting each one on
// workerWg before it starts, and then launches the dispatch loop on its own
// goroutine. Run itself does not block.
func (d *Dispatcher) Run() {
	for id := 1; id <= d.maxWorkers; id++ {
		d.workerWg.Add(1)
		NewWorker(id, d.workerPool, d.workerWg).start()
	}
	go d.dispatch()
}
// dispatch hands every job from jobQueue to the next worker that has
// registered on workerPool, and shuts the workers down once jobQueue has been
// closed and drained.
//
// Shutdown collects one registration per worker from workerPool and closes
// the job channel it carries. A worker registers only when it is idle, so
// closing its channel ends its receive loop without interrupting a running
// job. workerPool itself stays open, which means a worker finishing its last
// job can still register safely instead of panicking on a closed channel.
//
// NOTE(review): shutdown waits for maxWorkers registrations, so it assumes
// every worker started by Run is still alive. A worker that died earlier
// (for example from a panicking job) never registers again and leaves this
// goroutine blocked.
func (d *Dispatcher) dispatch() {
	for job := range d.jobQueue {
		// Block until some worker is free, then give it the job.
		workerJobQueue := <-d.workerPool
		fmt.Printf("Adding %s to workerJobQueue\n", job.Name())
		workerJobQueue <- job
	}
	fmt.Printf("Dispatcher jobQueue has ended.\n")

	// Every job has been handed to a worker. Release each worker as it
	// becomes idle by closing its private job channel.
	fmt.Printf("Dispatcher is waiting for workers to finish queues\n")
	for released := 0; released < d.maxWorkers; released++ {
		close(<-d.workerPool)
	}
}
// AddJob enqueues job on the dispatcher's jobQueue. The send blocks while the
// queue's buffer is full, and it panics if SafeClose has already closed the
// queue, because sending on a closed channel panics in Go.
func (d *Dispatcher) AddJob(job Job) {
d.jobQueue <- job
}
// SafeClose stops the dispatcher from accepting jobs and blocks until every
// worker counted on workerWg has exited.
//
// It closes jobQueue, so AddJob must not be called afterwards and SafeClose
// must be called at most once: a second close of the same channel panics.
func (d *Dispatcher) SafeClose() {
	// No more jobs may be added from here on.
	close(d.jobQueue)
	// Println already appends a newline; the literal must not carry its own.
	fmt.Println("Dispatcher jobQueue closed. Waiting for dispatcher run to exit")
	d.workerWg.Wait()
}
func main() {
delay := 10 * time.Millisecond
jobs := []Job{
newJob("1", delay),
newJob("2", delay),
newJob("3", delay),
newJob("4", delay),
newJob("5", delay),
newJob("6", delay),
newJob("7", delay),
newJob("8", delay),
newJob("9", delay),
newJob("10", delay),
}
d := NewDispatcher("dp_1", 5)
d.Run()
for _, j := range jobs {
d.AddJob(j)
}
fmt.Println("Added all jobs")
// time.Sleep(1 * time.Second)
d.SafeClose()
fmt.Println("All work is done. World peace achieved\n")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment