Created
June 1, 2018 14:19
-
-
Save arpchaudhary/8854747635741611b0808b47ea6d84f4 to your computer and use it in GitHub Desktop.
Simple Worker Co-ordination with Ranging over channel
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"sync" | |
"time" | |
) | |
// DefaultChannelSize is the buffer capacity of a Dispatcher's incoming job
// queue: that many jobs can be queued before AddJob starts to block.
const DefaultChannelSize = 100
// Job is a unit of work that a Worker can run.
type Job interface {
	// Execute performs the work and reports any failure.
	Execute() error
	// Name returns a label identifying the job in log output.
	Name() string
}
func newJob(name string, delay time.Duration) Job { | |
return &job{name, delay} | |
} | |
// job is the sample Job implementation used by this program: it has a name
// and simulates work by sleeping.
type job struct {
	name  string        // label returned by Name
	delay time.Duration // how long Execute sleeps
}
func (j *job) Execute() error { | |
time.Sleep(j.delay) | |
return nil | |
} | |
func (j *job) Name() string { | |
return j.name | |
} | |
// NewWorker creates takes a numeric id and a channel w/ worker pool. | |
func NewWorker(id int, workerPool chan chan Job, wg *sync.WaitGroup) *Worker { | |
return &Worker{ | |
id: id, | |
jobQueue: make(chan Job), | |
workerPool: workerPool, | |
quitChan: make(chan bool), | |
wg: wg, | |
} | |
} | |
type Worker struct { | |
id int | |
jobQueue chan Job | |
workerPool chan chan Job | |
quitChan chan bool | |
wg *sync.WaitGroup | |
} | |
func (w *Worker) start() { | |
go func() { | |
defer func() { | |
w.wg.Done() | |
fmt.Printf("Worker[%d] has finished\n", w.id) | |
// recover from panic if one occured. Set err to nil otherwise. | |
if (recover() != nil) { | |
fmt.Printf("Worker[%d] has hit panic. Recovered\n", w.id) | |
//silent death | |
} | |
}() | |
fmt.Printf("Worker[%d] is alive!\n", w.id) | |
//Mark attendance in the worker pool | |
w.workerPool <- w.jobQueue | |
for job := range w.jobQueue { | |
// Add my jobQueue to the worker pool. | |
job.Execute() | |
fmt.Printf("Worker[%d] completed %s\n", w.id, job.Name()) | |
//Add the worker back to the pool | |
//This can panic if the worker pool is already closed | |
//Shutting down state | |
w.workerPool <- w.jobQueue | |
} | |
//The channel has closed. Time to shutdown. Stackunwind will decrement the counter | |
}() | |
} | |
// NewDispatcher creates, and returns a new Dispatcher object. | |
func NewDispatcher(name string, maxWorkers int) *Dispatcher { | |
workerPool := make(chan chan Job, maxWorkers) | |
jobQueue := make(chan Job, DefaultChannelSize) | |
return &Dispatcher{ | |
name: name, | |
jobQueue: jobQueue, | |
maxWorkers: maxWorkers, | |
workerPool: workerPool, | |
workerWg: &sync.WaitGroup{}, | |
} | |
} | |
type Dispatcher struct { | |
name string | |
workerPool chan chan Job | |
maxWorkers int | |
jobQueue chan Job | |
workerWg *sync.WaitGroup | |
} | |
func (d *Dispatcher) Run() { | |
for i := 0; i < d.maxWorkers; i++ { | |
id := i + 1 | |
d.workerWg.Add(1) | |
worker := NewWorker(id, d.workerPool, d.workerWg) | |
worker.start() | |
} | |
go d.dispatch() | |
} | |
func (d *Dispatcher) dispatch() { | |
for job := range d.jobQueue{ | |
//fmt.Printf("fetching workerJobQueue for: %s\n", job.Name()) | |
workerJobQueue := <-d.workerPool | |
fmt.Printf("Adding %s to workerJobQueue\n", job.Name()) | |
workerJobQueue <- job | |
} | |
fmt.Printf("Dispatcher jobQueue has ended.\n") | |
//At this point all jobs have been added on to the workerpool | |
//So it is safe to close the pool. | |
//When the workers will add them to the pool, it will cause a panic | |
//Causing them to shutdown | |
/*w := 0 | |
for w < d.maxWorkers { | |
close(<-d.workerPool) | |
w++ | |
}*/ | |
close(d.workerPool) | |
//Need to shut down the workerPool | |
fmt.Printf("Dispatcher is waiting for workers to finish queues\n") | |
//We have closed the input for the workers. Waiting for them to shutdown now | |
} | |
func (d *Dispatcher) AddJob(job Job) { | |
d.jobQueue <- job | |
} | |
func (d *Dispatcher) SafeClose() { | |
// No more Adding jobs to the jobqueue function | |
close(d.jobQueue) | |
fmt.Println("Dispatcher jobQueue closed. Waiting for dispatcher run to exit\n") | |
d.workerWg.Wait() | |
//close(d.workerPool) | |
} | |
func main() { | |
delay := 10 * time.Millisecond | |
jobs := []Job{ | |
newJob("1", delay), | |
newJob("2", delay), | |
newJob("3", delay), | |
newJob("4", delay), | |
newJob("5", delay), | |
newJob("6", delay), | |
newJob("7", delay), | |
newJob("8", delay), | |
newJob("9", delay), | |
newJob("10", delay), | |
} | |
d := NewDispatcher("dp_1", 5) | |
d.Run() | |
for _, j := range jobs { | |
d.AddJob(j) | |
} | |
fmt.Println("Added all jobs") | |
// time.Sleep(1 * time.Second) | |
d.SafeClose() | |
fmt.Println("All work is done. World peace achieved\n") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment