Skip to content

Instantly share code, notes, and snippets.

@dmotylev
Last active August 27, 2020 16:26
Show Gist options
  • Save dmotylev/46ae682fe9ac3629a1f16b04e664b2a2 to your computer and use it in GitHub Desktop.
Save dmotylev/46ae682fe9ac3629a1f16b04e664b2a2 to your computer and use it in GitHub Desktop.
golang's simple worker pool
package internal
import "sync/atomic"
type worker struct {
jobs chan func()
done chan struct{}
}
func newWorker() *worker {
return &worker{
jobs: make(chan func()),
done: make(chan struct{}),
}
}
func (w *worker) start(pool chan chan func()) {
go func() {
for {
pool <- w.jobs
select {
case job := <-w.jobs:
job()
case <-w.done:
return
}
}
}()
}
func (w *worker) stop() {
close(w.done)
}
type WorkerPool struct {
workers []*worker
queue chan func()
stopped int32
}
func NewWorkerPool(size int) *WorkerPool {
workers := make([]*worker, 0, size)
for i := 0; i < size; i++ {
workers = append(workers, newWorker())
}
return &WorkerPool{
workers: workers,
queue: make(chan func()),
}
}
func (p *WorkerPool) Start() {
pool := make(chan chan func(), len(p.workers))
for _, w := range p.workers {
w.start(pool)
}
go func() {
for {
select {
case job, ok := <-p.queue:
if !ok {
return
}
go func(job func()) {
<-pool <- job
}(job)
}
}
}()
}
func (p *WorkerPool) Submit(job func()) {
if atomic.LoadInt32(&p.stopped) != 0 {
return
}
p.queue <- job
}
func (p *WorkerPool) Stop() {
if atomic.AddInt32(&p.stopped, 1) != 1 {
return
}
close(p.queue)
for _, w := range p.workers {
w.stop()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment