Skip to content

Instantly share code, notes, and snippets.

@kaneshin
Created August 17, 2016 16:39
Show Gist options
  • Save kaneshin/3f10878d1f60668eb3af01501eb67496 to your computer and use it in GitHub Desktop.
Save kaneshin/3f10878d1f60668eb3af01501eb67496 to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"os"
"strconv"
"sync"
"time"
)
// Dispatcher owns a fixed set of workers and hands each queued value to a
// worker that is currently idle.
type Dispatcher struct {
	// workers lists every worker created by NewDispatcher.
	workers []*worker
	// pool holds the workers that are idle and ready to receive a value.
	pool chan *worker
	// queue carries the values submitted through Add until they are dispatched.
	queue chan interface{}
	// quit tells the dispatch loop launched by Start to return.
	quit chan struct{}
	// wg counts the values that were added but not yet processed.
	wg sync.WaitGroup
}

// worker processes, one at a time, the values handed to it by its dispatcher.
type worker struct {
	// dispatcher is the Dispatcher this worker registers itself with.
	dispatcher *Dispatcher
	// fn is applied to every received value; a nil fn only marks the value done.
	fn func(interface{})
	// data delivers the next value to process.
	data chan interface{}
	// quit tells the worker goroutine to return.
	quit chan struct{}
}
// NewDispatcher returns a Dispatcher whose workers are already running.
//
// The number of workers is read from the MAX_WORKERS environment variable and
// the capacity of the value queue from MAX_QUEUES. A missing, malformed or
// out-of-range value never aborts construction:
//
//   - MAX_WORKERS falls back to a single worker, because a dispatcher without
//     workers can never hand a value to anyone and a negative size would make
//     the channel allocation panic;
//   - MAX_QUEUES falls back to 0, which yields an unbuffered queue.
func NewDispatcher() *Dispatcher {
	maxWorkers := envInt("MAX_WORKERS", 1, 1)
	maxQueues := envInt("MAX_QUEUES", 0, 0)

	d := &Dispatcher{
		pool:    make(chan *worker, maxWorkers),
		queue:   make(chan interface{}, maxQueues),
		workers: make([]*worker, maxWorkers),
		quit:    make(chan struct{}),
	}
	for i := range d.workers {
		w := &worker{
			dispatcher: d,
			data:       make(chan interface{}),
			quit:       make(chan struct{}),
		}
		d.workers[i] = w
		// The worker goroutine is launched right away; its function is
		// supplied later through SetFunc.
		w.start()
	}
	return d
}

// envInt parses the environment variable key as a decimal integer. It returns
// fallback when the variable is unset, is not a valid integer, or is smaller
// than lowest.
func envInt(key string, lowest, fallback int) int {
	n, err := strconv.Atoi(os.Getenv(key))
	if err != nil || n < lowest {
		return fallback
	}
	return n
}
// Add submits v for processing. It blocks while the queue cannot accept
// another value.
func (d *Dispatcher) Add(v interface{}) {
	// Count the value as pending before it becomes visible to the dispatch
	// loop, so that Wait also covers values that are still queued.
	d.wg.Add(1)
	d.queue <- v
}
// SetFunc installs f as the function that every worker applies to the values
// it receives, replacing whatever function was set before.
func (d *Dispatcher) SetFunc(f func(interface{})) {
	for i := range d.workers {
		d.workers[i].fn = f
	}
}
// Start launches the dispatch loop in its own goroutine and returns at once.
// The loop takes values from the queue one by one and hands each to the next
// idle worker from the pool. It ends when a quit signal arrives or when the
// queue has been closed.
func (d *Dispatcher) Start() {
	dispatch := func() {
		for {
			select {
			case <-d.quit:
				return
			case v, open := <-d.queue:
				if !open {
					return
				}
				// Blocks until some worker has registered itself as idle.
				idle := <-d.pool
				idle.data <- v
			}
		}
	}
	go dispatch()
}
// Wait blocks until every value passed to Add has been processed by a worker.
// It does not stop the dispatcher. If values were added but Start was never
// called, nothing hands them to a worker and Wait does not return.
func (d *Dispatcher) Wait() {
	d.wg.Wait()
}
// Stop tells the dispatch loop and every worker to return. When immediately
// is false it first waits, via Wait, until all added values have been
// processed. Stop blocks until each quit signal has been received, so the
// dispatcher must have been started with Start.
func (d *Dispatcher) Stop(immediately bool) {
	graceful := !immediately
	if graceful {
		d.Wait()
	}

	var signal struct{}
	// The dispatch loop is signalled first, the workers afterwards.
	d.quit <- signal
	for i := range d.workers {
		d.workers[i].quit <- signal
	}
}
// Destroy stops the dispatcher through Stop, forwarding immediately, and then
// closes the value queue and every worker's data channel. The dispatcher must
// not be used afterwards: a later Add would send on a closed channel.
func (d *Dispatcher) Destroy(immediately bool) {
	d.Stop(immediately)

	close(d.queue)
	for i := range d.workers {
		close(d.workers[i].data)
	}
}
// start launches the worker goroutine. On every iteration the worker first
// registers itself in the dispatcher's pool and then waits for either a value
// or a quit signal. A received value is passed to fn (when one is set) and is
// then marked as done on the dispatcher's wait group. The goroutine ends when
// the data channel is closed or a quit signal arrives.
func (w *worker) start() {
	run := func() {
		d := w.dispatcher
		for {
			// Announce that this worker is idle and may be handed a value.
			d.pool <- w

			select {
			case <-w.quit:
				// Asked to stop.
				return
			case v, open := <-w.data:
				if !open {
					// The data channel was closed.
					return
				}
				if w.fn != nil {
					w.fn(v)
				}
				d.wg.Done()
			}
		}
	}
	go run()
}
// main demonstrates the dispatcher: it queues the integers 0 through 999,
// lets the workers print each one after a 300ms pause, and returns once every
// value has been processed.
func main() {
	const jobs = 1000

	// printSlowly stands in for real work.
	printSlowly := func(v interface{}) {
		time.Sleep(300 * time.Millisecond)
		fmt.Println(v)
	}

	dispatcher := NewDispatcher()
	dispatcher.SetFunc(printSlowly)
	dispatcher.Start()

	for n := 0; n < jobs; n++ {
		dispatcher.Add(n)
	}
	dispatcher.Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment