Skip to content

Instantly share code, notes, and snippets.

@yobayob
Created March 30, 2017 12:27
Show Gist options
  • Save yobayob/05304d5c82868f744691e4114c5acfbd to your computer and use it in GitHub Desktop.
Save yobayob/05304d5c82868f744691e4114c5acfbd to your computer and use it in GitHub Desktop.
package common
import (
"time"
)
// Task is a unit of work: a function that takes no arguments and returns
// nothing.
type Task func()
// dispatcher moves queued tasks to workers that have announced they are idle.
type dispatcher struct {
// WorkerPool carries the task channels of workers that are ready for work:
// workers send their own channel here and Start receives from it.
WorkerPool chan chan Task
// TaskQueue holds tasks waiting to be handed to a worker: AddTicker sends
// to it and Start receives from it.
TaskQueue chan Task
}
// workerPoolType is a channel over which workers publish their own task
// channels.
type workerPoolType chan chan Task
// worker runs tasks that arrive on its own Task channel.
type worker struct {
// ID is the identifier that was passed to NewWorker.
ID int
// WorkerPool is the shared pool the worker sends its Task channel to before
// it waits for work.
WorkerPool workerPoolType
// Task is the channel on which this worker receives the next task to run.
Task chan (Task)
// QuitChan makes the processing loop return when a value is received on it.
QuitChan chan bool
}
// NewDispatcher builds a dispatcher and starts its workers.
//
// The worker pool is buffered with room for Config.Workers entries and the
// task queue is buffered with capacity 1. Config.Workers workers, numbered
// from 1, are created against the pool and started before the dispatcher is
// returned. The returned dispatcher does not move any tasks until its Start
// method is called.
func NewDispatcher() *dispatcher {
	d := &dispatcher{
		WorkerPool: make(workerPoolType, Config.Workers),
		TaskQueue:  make(chan Task, 1),
	}
	for id := 1; id <= Config.Workers; id++ {
		NewWorker(id, d.WorkerPool).Start()
	}
	return d
}
// AddTicker enqueues task on the dispatcher's TaskQueue over and over from a
// background goroutine, pausing for interval after each successful enqueue.
//
// The first enqueue is attempted immediately. Because the pause starts only
// once the send has completed, the gap between two enqueues is interval plus
// however long the queue took to accept the task. A zero or negative interval
// means no pause at all.
//
// The goroutine has no stop mechanism: it keeps running until the process
// exits.
func (D *dispatcher) AddTicker(task Task, interval time.Duration) {
	go func() {
		for {
			// Blocks until the queue accepts the task.
			D.TaskQueue <- task
			// Sleep gives the same delay as receiving from time.After
			// without allocating a timer and channel on every round.
			time.Sleep(interval)
		}
	}()
}
// Start hands every task received from TaskQueue to an idle worker. It blocks
// the calling goroutine and returns only after TaskQueue has been closed and
// drained.
//
// Each hand-off runs in its own goroutine so that waiting for a free worker
// never stalls the receive loop.
func (D *dispatcher) Start() {
	// Ranging ends the loop when the queue is closed. A bare receive would
	// instead yield nil tasks forever and keep dispatching them to workers.
	for task := range D.TaskQueue {
		// The task is passed as an argument so every goroutine owns its own
		// copy regardless of the loop-variable semantics in use.
		go func(task Task) {
			idle := <-D.WorkerPool
			idle <- task
		}(task)
	}
}
// Start launches the worker's processing loop in a new goroutine and returns
// immediately.
func (w *worker) Start() {
go w.tasksProcessor()
}
// tasksProcessor is the worker's main loop. On every iteration it offers its
// Task channel to WorkerPool to announce that it is idle, then waits for a
// task to arrive on that channel and runs it. The loop returns as soon as a
// value is received from QuitChan.
func (w *worker) tasksProcessor() {
	for {
		// Watch QuitChan while registering as well: if the pool cannot
		// accept the channel right now, a stop request must still be able to
		// end the loop instead of waiting behind the blocked send.
		select {
		case w.WorkerPool <- w.Task:
		case <-w.QuitChan:
			return
		}

		select {
		case work := <-w.Task:
			work()
		case <-w.QuitChan:
			return
		}
	}
}
// Stop asks the worker's processing loop to exit by delivering true on
// QuitChan. The send happens in a separate goroutine, so Stop itself never
// blocks; that goroutine stays parked until something receives from QuitChan.
func (w worker) Stop() {
	quit := w.QuitChan
	go func() { quit <- true }()
}
// NewWorker returns a worker identified by id that uses workerQueue as its
// pool. Its Task and QuitChan channels are freshly created and unbuffered.
// No goroutine is started here.
func NewWorker(id int, workerQueue chan chan Task) *worker {
	w := new(worker)
	w.ID = id
	w.WorkerPool = workerQueue
	w.Task = make(chan Task)
	w.QuitChan = make(chan bool)
	return w
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment