Last active
October 22, 2018 14:13
-
-
Save hawell/ee7a1e7ae22c0e35e43b8bc60cc71e53 to your computer and use it in GitHub Desktop.
Golang WorkerPool
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"time" | |
) | |
type Job interface { | |
Handle() | |
} | |
type Dispatcher struct { | |
WorkerPool chan chan Job | |
WorkerList []*Worker | |
MaxWorkers int | |
MaxWaitingJobs int | |
JobQueue chan Job | |
quit chan bool | |
} | |
func NewDispatcher(maxWorkers int, maxWaitingJobs int) *Dispatcher { | |
d := &Dispatcher { | |
WorkerPool: make(chan chan Job, maxWorkers), | |
MaxWorkers: maxWorkers, | |
JobQueue: make(chan Job, maxWaitingJobs), | |
quit: make(chan bool), | |
} | |
for i := 0; i< maxWorkers; i++ { | |
w := NewWorker(d.WorkerPool, i) | |
d.WorkerList = append(d.WorkerList, w) | |
} | |
return d | |
} | |
func (d *Dispatcher) Run() { | |
for _, w := range d.WorkerList { | |
w.Run() | |
} | |
go d.Dispatch() | |
} | |
func (d *Dispatcher) Dispatch() { | |
for { | |
select { | |
case job := <- d.JobQueue: | |
go func(job Job) { | |
jobChannel := <- d.WorkerPool | |
jobChannel <- job | |
}(job) | |
case <- d.quit: | |
for _, w := range d.WorkerList { | |
w.Stop() | |
} | |
return | |
} | |
} | |
} | |
func (d *Dispatcher) Queue(job Job) { | |
go func() { d.JobQueue <- job }() | |
} | |
func (d *Dispatcher) Stop() { | |
d.quit <- true | |
} | |
type Worker struct { | |
Id int | |
WorkerPool chan chan Job | |
JobChannel chan Job | |
quit chan bool | |
} | |
func NewWorker(workerPool chan chan Job, id int) *Worker { | |
return &Worker { | |
Id: id, | |
WorkerPool: workerPool, | |
JobChannel: make(chan Job), | |
quit: make(chan bool), | |
} | |
} | |
func (w *Worker) Run() { | |
go func() { | |
for { | |
w.WorkerPool <- w.JobChannel | |
// fmt.Println("worker ", w.Id, " waiting for job") | |
select { | |
case job := <- w.JobChannel: | |
// fmt.Println("worker ", w.Id, " got a job") | |
job.Handle() | |
// fmt.Println("worker ", w.Id, " job done") | |
case <- w.quit: | |
fmt.Println("worker ", w.Id, " quit") | |
return | |
} | |
} | |
}() | |
} | |
func (w *Worker) Stop() { | |
go func() { | |
w.quit <- true | |
}() | |
} | |
type AJob string | |
func (aJob AJob) Handle() { | |
fmt.Println(aJob) | |
} | |
func main() { | |
d := NewDispatcher(10, 100) | |
d.Run() | |
for i := 0; i < 100; i++ { | |
job := AJob(fmt.Sprintf("job %d", i)) | |
d.Queue(job) | |
} | |
time.Sleep(time.Second * 5) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment