Created
August 17, 2016 16:39
-
-
Save kaneshin/3f10878d1f60668eb3af01501eb67496 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"os" | |
"strconv" | |
"sync" | |
"time" | |
) | |
// Dispatcher manages a set of workers and hands queued values to them.
type Dispatcher struct {
	// pool holds the workers that have registered themselves as ready
	// to receive a value.
	pool chan *worker
	// queue carries the values submitted through Add.
	queue chan interface{}
	// workers lists every worker owned by this dispatcher.
	workers []*worker
	// wg counts values that were added but not yet processed.
	wg sync.WaitGroup
	// quit tells the dispatch loop started by Start to return.
	quit chan struct{}
}

// worker executes the configured function on the values it is handed.
type worker struct {
	// dispatcher is the owner whose pool and WaitGroup the worker uses.
	dispatcher *Dispatcher
	// data delivers the next value to process.
	data chan interface{}
	// fn is invoked for every received value; it may be nil.
	fn func(interface{})
	// quit tells the worker goroutine to return.
	quit chan struct{}
}
// NewDispatcher returns a pointer of Dispatcher. | |
func NewDispatcher() *Dispatcher { | |
maxWorkers, _ := strconv.Atoi(os.Getenv("MAX_WORKERS")) | |
maxQueues, _ := strconv.Atoi(os.Getenv("MAX_QUEUES")) | |
d := &Dispatcher{ | |
pool: make(chan *worker, maxWorkers), | |
queue: make(chan interface{}, maxQueues), | |
quit: make(chan struct{}), | |
} | |
d.workers = make([]*worker, cap(d.pool)) | |
for i := 0; i < cap(d.pool); i++ { | |
w := worker{ | |
dispatcher: d, | |
data: make(chan interface{}), | |
quit: make(chan struct{}), | |
} | |
d.workers[i] = &w | |
w.start() | |
} | |
return d | |
} | |
// Add adds a given value to the queue of the dispatcher. | |
func (d *Dispatcher) Add(v interface{}) { | |
d.wg.Add(1) | |
d.queue <- v | |
} | |
// SetFunc sets a function to dispatch dequeued values. | |
func (d *Dispatcher) SetFunc(f func(interface{})) { | |
for _, w := range d.workers { | |
w.fn = f | |
} | |
} | |
// Start starts the specified dispatcher but does not wait for it to complete. | |
func (d *Dispatcher) Start() { | |
go func() { | |
for { | |
select { | |
case v, ok := <-d.queue: | |
if !ok { | |
return | |
} | |
worker := <-d.pool | |
worker.data <- v | |
case <-d.quit: | |
return | |
} | |
} | |
}() | |
} | |
// Wait waits for the dispatcher to exit. It must have been started by Start. | |
func (d *Dispatcher) Wait() { | |
d.wg.Wait() | |
} | |
// Stop stops the dispatcher to execute. The dispatcher stops gracefully | |
// if the given boolean is false. | |
func (d *Dispatcher) Stop(immediately bool) { | |
if !immediately { | |
d.Wait() | |
} | |
d.quit <- struct{}{} | |
for _, w := range d.workers { | |
w.quit <- struct{}{} | |
} | |
} | |
// Destroy stops the dispatcher to execute and closes all channels. | |
// The dispatcher stops gracefully if the given boolean is false. | |
func (d *Dispatcher) Destroy(immediately bool) { | |
d.Stop(immediately) | |
close(d.queue) | |
for _, w := range d.workers { | |
close(w.data) | |
} | |
} | |
func (w *worker) start() { | |
go func() { | |
for { | |
// register the current worker into the dispatch pool | |
w.dispatcher.pool <- w | |
select { | |
case v, ok := <-w.data: | |
if !ok { | |
// receive a close signal | |
return | |
} | |
if w.fn != nil { | |
w.fn(v) | |
} | |
w.dispatcher.wg.Done() | |
case <-w.quit: | |
// receive a signal to stop | |
return | |
} | |
} | |
}() | |
} | |
func main() { | |
d := NewDispatcher() | |
d.SetFunc(func(v interface{}) { | |
time.Sleep(300 * time.Millisecond) | |
fmt.Println(v) | |
}) | |
d.Start() | |
for i := 0; i < 1000; i++ { | |
d.Add(i) | |
} | |
d.Wait() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment