Skip to content

Instantly share code, notes, and snippets.

@yanmhlv
Last active July 7, 2016 15:57
Show Gist options
  • Save yanmhlv/5f0b8530d34e94befb8e to your computer and use it in GitHub Desktop.
Save yanmhlv/5f0b8530d34e94befb8e to your computer and use it in GitHub Desktop.
worker thread pool example
type (
	// Payload is the opaque data carried by a Job; any value may be stored.
	Payload interface{}

	// Job represents one unit of work to be run by a worker.
	Job struct {
		Payload Payload `json:"payload"`
	}

	// WorkerHandler is the function a worker calls for every job it receives.
	WorkerHandler func(job Job)
)
// Worker executes jobs: its run loop (see Start) calls Handler for every Job
// received on JobChannel.
type Worker struct {
// WorkerPool is the shared channel into which the worker sends its own
// JobChannel from Start to announce that it is ready for a job.
WorkerPool chan chan Job
// JobChannel is the channel on which this worker receives its jobs.
JobChannel chan Job
// quit is closed by Stop to make the run loop started by Start return.
quit chan struct{}
// Handler is called with each job received on JobChannel.
Handler WorkerHandler
}
// NewWorker builds a Worker that registers itself in workerPool and passes
// every job it receives to handler. The worker gets its own unbuffered job
// channel and quit channel; it does nothing until Start is called.
func NewWorker(workerPool chan chan Job, handler WorkerHandler) *Worker {
	w := new(Worker)
	w.WorkerPool = workerPool
	w.Handler = handler
	w.JobChannel = make(chan Job)
	w.quit = make(chan struct{})
	return w
}
// Start launches the worker's run loop in a new goroutine and returns
// immediately.
//
// On every iteration the worker registers its JobChannel in WorkerPool to
// announce that it is idle, then waits for either a job or the quit signal.
// Jobs are passed to Handler synchronously, so a worker handles one job at a
// time. The loop ends once Stop has closed the quit channel.
func (w *Worker) Start() {
	go func() {
		for {
			// Register the current worker into the worker queue. Watching
			// quit here as well lets Stop end a worker that is blocked
			// because WorkerPool has no free slot.
			select {
			case w.WorkerPool <- w.JobChannel:
			case <-w.quit:
				return
			}

			select {
			case job := <-w.JobChannel:
				// We have received a work request.
				w.Handler(job)
			case <-w.quit:
				// We have received a signal to stop.
				// NOTE(review): JobChannel may still be registered in
				// WorkerPool at this point; a sender that later picks it up
				// will block because nothing receives from it any more.
				return
			}
		}
	}()
}
// Stop signals the worker to stop listening for work requests by closing its
// quit channel. A job already running in Handler is not interrupted; the run
// loop returns the next time it looks at the quit channel.
//
// Calling Stop again on a stopped worker is a no-op instead of a
// "close of closed channel" panic. Stop must not be called from several
// goroutines at once: the check and the close are not atomic.
func (w *Worker) Stop() {
	select {
	case <-w.quit:
		// Already closed by an earlier Stop.
	default:
		close(w.quit)
	}
}
// Dispatcher starts a set of workers (see Run) and forwards the jobs it
// receives on JobQueue to them.
type Dispatcher struct {
// WorkerPool carries the job channels of workers that have registered
// themselves with the dispatcher; NewDispatcher gives it maxWorkers slots.
WorkerPool chan chan Job
// MaxWorkers is the number of workers started by Run.
MaxWorkers int
// WorkerHandler is the handler handed to every worker created by Run.
WorkerHandler WorkerHandler
// JobQueue is the channel work requests are sent on; it is supplied by the
// caller of NewDispatcher and read by dispatch.
JobQueue chan Job
// workers holds the workers started by Run so that Stop can stop them.
workers []*Worker
}
// NewDispatcher builds a Dispatcher that will run maxWorkers workers, each
// calling handler, fed from jobQueue. The worker pool channel is buffered with
// maxWorkers slots. No goroutine is started until Run is called.
func NewDispatcher(maxWorkers int, handler WorkerHandler, jobQueue chan Job) *Dispatcher {
	d := new(Dispatcher)
	d.JobQueue = jobQueue
	d.WorkerHandler = handler
	d.MaxWorkers = maxWorkers
	d.WorkerPool = make(chan chan Job, maxWorkers)
	return d
}
// Run creates and starts MaxWorkers workers, remembers them for Stop, and
// then starts the dispatch loop in its own goroutine. It returns without
// waiting for any job to be processed.
func (d *Dispatcher) Run() {
	// Start one worker per remaining slot.
	for remaining := d.MaxWorkers; remaining > 0; remaining-- {
		w := NewWorker(d.WorkerPool, d.WorkerHandler)
		d.workers = append(d.workers, w)
		w.Start()
	}

	go d.dispatch()

	log.Info("dispatcher", "status", "started")
}
// Stop signals every worker started by Run to stop and then forgets them, so
// calling Stop again is a no-op rather than stopping the same workers a
// second time.
//
// Stop does not wait for the workers to finish and does not end the dispatch
// goroutine started by Run.
func (d *Dispatcher) Stop() {
	log.Info("dispatcher", "status", "stoped")
	for _, worker := range d.workers {
		worker.Stop()
	}
	// Drop the stopped workers: Worker.Stop closes a channel, and closing a
	// channel that is already closed panics.
	d.workers = nil
}
// dispatch forwards jobs from JobQueue to idle workers until JobQueue is
// closed.
//
// For every job it takes the job channel of an idle worker from WorkerPool,
// blocking until one is available, and hands the job over. Jobs are forwarded
// one at a time from this goroutine rather than from a new goroutine per job,
// so the number of goroutines stays fixed and pending jobs wait in JobQueue
// instead of in an unbounded set of blocked goroutines.
func (d *Dispatcher) dispatch() {
	// Ranging ends the loop when JobQueue is closed; a bare receive would
	// keep yielding zero-value jobs forever.
	for job := range d.JobQueue {
		// Try to obtain a worker job channel that is available.
		// This blocks until a worker is idle.
		jobChannel := <-d.WorkerPool
		// Dispatch the job to the worker job channel.
		jobChannel <- job
	}
}
// Usage example: these statements are meant to live inside a function such as
// main; they are not valid at package level.
// Queue large enough to hold every job produced by the loop below.
JobQueue := make(chan Job, 100000)
// Each job simulates 50ms of work and then logs the job.
handler := func(job Job) {
time.Sleep(50 * time.Millisecond)
log.Info("worker", "job", job)
}
// A single worker, fed from JobQueue.
dispatcher := NewDispatcher(1, handler, JobQueue)
dispatcher.Run()
// Enqueue 100000 jobs whose payload is the loop index.
for i := 0; i < 100000; i++ {
JobQueue <- Job{i}
}
// "completed" here means every job has been queued, not processed.
log.Info("queue", "status", "completed")
// Let the worker run for a while before stopping it.
// NOTE(review): one worker at 50ms per job handles roughly 100 jobs in this
// 5s window, so most queued jobs are presumably still pending at Stop.
time.Sleep(5 * time.Second)
dispatcher.Stop()
// Grace period before the final log line.
time.Sleep(3 * time.Second)
log.Info("app", "status", "exit")
package main
import (
"net/http"
"time"
"github.com/labstack/echo"
"github.com/labstack/echo/engine/standard"
"github.com/mgutz/logxi/v1"
)
type (
	// Payload is the opaque data carried by a Job; any value may be stored.
	Payload interface{}

	// Job represents one unit of work to be run by a worker.
	Job struct {
		Payload Payload `json:"payload"`
	}

	// WorkerHandler is the function a worker calls for every job it receives.
	WorkerHandler func(job Job)
)
// Worker executes jobs: its run loop (see Start) calls Handler for every Job
// received on JobChannel.
type Worker struct {
// WorkerPool is the shared channel into which the worker sends its own
// JobChannel from Start to announce that it is ready for a job.
WorkerPool chan chan Job
// JobChannel is the channel on which this worker receives its jobs.
JobChannel chan Job
// quit is closed by Stop to make the run loop started by Start return.
quit chan struct{}
// Handler is called with each job received on JobChannel.
Handler WorkerHandler
}
// NewWorker builds a Worker that registers itself in workerPool and passes
// every job it receives to handler. The worker gets its own unbuffered job
// channel and quit channel; it does nothing until Start is called.
func NewWorker(workerPool chan chan Job, handler WorkerHandler) *Worker {
	w := new(Worker)
	w.WorkerPool = workerPool
	w.Handler = handler
	w.JobChannel = make(chan Job)
	w.quit = make(chan struct{})
	return w
}
// Start launches the worker's run loop in a new goroutine and returns
// immediately.
//
// On every iteration the worker registers its JobChannel in WorkerPool to
// announce that it is idle, then waits for either a job or the quit signal.
// Jobs are passed to Handler synchronously, so a worker handles one job at a
// time. The loop ends once Stop has closed the quit channel.
func (w *Worker) Start() {
	go func() {
		for {
			// Register the current worker into the worker queue. Watching
			// quit here as well lets Stop end a worker that is blocked
			// because WorkerPool has no free slot.
			select {
			case w.WorkerPool <- w.JobChannel:
			case <-w.quit:
				return
			}

			select {
			case job := <-w.JobChannel:
				// We have received a work request.
				w.Handler(job)
			case <-w.quit:
				// We have received a signal to stop.
				// NOTE(review): JobChannel may still be registered in
				// WorkerPool at this point; a sender that later picks it up
				// will block because nothing receives from it any more.
				return
			}
		}
	}()
}
// Stop signals the worker to stop listening for work requests by closing its
// quit channel. A job already running in Handler is not interrupted; the run
// loop returns the next time it looks at the quit channel.
//
// Calling Stop again on a stopped worker is a no-op instead of a
// "close of closed channel" panic. Stop must not be called from several
// goroutines at once: the check and the close are not atomic.
func (w *Worker) Stop() {
	select {
	case <-w.quit:
		// Already closed by an earlier Stop.
	default:
		close(w.quit)
	}
}
// Dispatcher starts a set of workers (see Run) and forwards the jobs it
// receives on JobQueue to them.
type Dispatcher struct {
// WorkerPool carries the job channels of workers that have registered
// themselves with the dispatcher; NewDispatcher gives it maxWorkers slots.
WorkerPool chan chan Job
// MaxWorkers is the number of workers started by Run.
MaxWorkers int
// WorkerHandler is the handler handed to every worker created by Run.
WorkerHandler WorkerHandler
// JobQueue is the channel work requests are sent on; it is supplied by the
// caller of NewDispatcher and read by dispatch.
JobQueue chan Job
// workers holds the workers started by Run so that Stop can stop them.
workers []*Worker
}
// NewDispatcher builds a Dispatcher that will run maxWorkers workers, each
// calling handler, fed from jobQueue. The worker pool channel is buffered with
// maxWorkers slots. No goroutine is started until Run is called.
func NewDispatcher(maxWorkers int, handler WorkerHandler, jobQueue chan Job) *Dispatcher {
	d := new(Dispatcher)
	d.JobQueue = jobQueue
	d.WorkerHandler = handler
	d.MaxWorkers = maxWorkers
	d.WorkerPool = make(chan chan Job, maxWorkers)
	return d
}
// Run creates and starts MaxWorkers workers, remembers them for Stop, and
// then starts the dispatch loop in its own goroutine. It returns without
// waiting for any job to be processed.
func (d *Dispatcher) Run() {
	// Start one worker per remaining slot.
	for remaining := d.MaxWorkers; remaining > 0; remaining-- {
		w := NewWorker(d.WorkerPool, d.WorkerHandler)
		d.workers = append(d.workers, w)
		w.Start()
	}

	go d.dispatch()

	log.Info("dispatcher", "status", "started")
}
// Stop signals every worker started by Run to stop and then forgets them, so
// calling Stop again is a no-op rather than stopping the same workers a
// second time.
//
// Stop does not wait for the workers to finish and does not end the dispatch
// goroutine started by Run.
func (d *Dispatcher) Stop() {
	log.Info("dispatcher", "status", "stoped")
	for _, worker := range d.workers {
		worker.Stop()
	}
	// Drop the stopped workers: Worker.Stop closes a channel, and closing a
	// channel that is already closed panics.
	d.workers = nil
}
// dispatch forwards jobs from JobQueue to idle workers until JobQueue is
// closed.
//
// For every job it takes the job channel of an idle worker from WorkerPool,
// blocking until one is available, and hands the job over. Jobs are forwarded
// one at a time from this goroutine rather than from a new goroutine per job,
// so the number of goroutines stays fixed and pending jobs wait in JobQueue
// instead of in an unbounded set of blocked goroutines.
func (d *Dispatcher) dispatch() {
	// Ranging ends the loop when JobQueue is closed; a bare receive would
	// keep yielding zero-value jobs forever.
	for job := range d.JobQueue {
		// Try to obtain a worker job channel that is available.
		// This blocks until a worker is idle.
		jobChannel := <-d.WorkerPool
		// Dispatch the job to the worker job channel.
		jobChannel <- job
	}
}
// main starts a dispatcher with 10 workers and an HTTP server on :3000 whose
// "/" route queues one job per request, carrying the client's remote address,
// and answers 200 with no content.
func main() {
	dtNow := time.Now()
	log.Info("start", "at", dtNow)
	// Arguments of a deferred call are evaluated when the defer statement
	// runs, so time.Since must be called inside a closure to report the time
	// elapsed at exit rather than at startup.
	defer func() {
		log.Info("exit", "duration", time.Since(dtNow))
	}()

	JobQueue := make(chan Job, 10000)
	dispatcher := NewDispatcher(10, func(job Job) {
		log.Info("worker", "job", job)
	}, JobQueue)
	dispatcher.Run()

	e := echo.New()
	e.Get("/", func(c echo.Context) error {
		// Queue the job first, then log the request.
		JobQueue <- Job{Payload: c.Request().RemoteAddress()}
		log.Info("receive request", "ip", c.Request().RemoteAddress(), "user agent", c.Request().UserAgent())
		return c.NoContent(http.StatusOK)
	})
	e.Run(standard.New(":3000"))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment