Last active
August 27, 2020 16:26
-
-
Save dmotylev/46ae682fe9ac3629a1f16b04e664b2a2 to your computer and use it in GitHub Desktop.
golang's simple worker pool
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package internal | |
import "sync/atomic" | |
// worker executes jobs on a single goroutine. It advertises its jobs channel
// on a shared registry channel whenever it is idle and then runs whatever
// job is sent to it.
type worker struct {
	jobs chan func()   // unbuffered; receives one job per registration
	done chan struct{} // closed by stop to terminate the goroutine
}

// newWorker returns a worker whose goroutine has not been launched yet;
// call start to launch it.
func newWorker() *worker {
	return &worker{
		jobs: make(chan func()),
		done: make(chan struct{}),
	}
}

// start launches the worker goroutine. On every iteration the worker
// registers itself by sending its jobs channel on pool, then waits for
// either a job or a stop signal. A job that is already executing is never
// interrupted; the stop signal is observed once the job returns.
func (w *worker) start(pool chan chan func()) {
	go func() {
		for {
			// Register as idle. Watching done here as well guarantees the
			// goroutine can exit even when pool is full or has no reader.
			select {
			case pool <- w.jobs:
			case <-w.done:
				return
			}

			select {
			case job := <-w.jobs:
				job()
			case <-w.done:
				return
			}
		}
	}()
}

// stop signals the worker goroutine to exit. It must be called at most once
// per worker: a second call would close an already-closed channel and panic.
func (w *worker) stop() {
	close(w.done)
}
type WorkerPool struct { | |
workers []*worker | |
queue chan func() | |
stopped int32 | |
} | |
func NewWorkerPool(size int) *WorkerPool { | |
workers := make([]*worker, 0, size) | |
for i := 0; i < size; i++ { | |
workers = append(workers, newWorker()) | |
} | |
return &WorkerPool{ | |
workers: workers, | |
queue: make(chan func()), | |
} | |
} | |
func (p *WorkerPool) Start() { | |
pool := make(chan chan func(), len(p.workers)) | |
for _, w := range p.workers { | |
w.start(pool) | |
} | |
go func() { | |
for { | |
select { | |
case job, ok := <-p.queue: | |
if !ok { | |
return | |
} | |
go func(job func()) { | |
<-pool <- job | |
}(job) | |
} | |
} | |
}() | |
} | |
func (p *WorkerPool) Submit(job func()) { | |
if atomic.LoadInt32(&p.stopped) != 0 { | |
return | |
} | |
p.queue <- job | |
} | |
func (p *WorkerPool) Stop() { | |
if atomic.AddInt32(&p.stopped, 1) != 1 { | |
return | |
} | |
close(p.queue) | |
for _, w := range p.workers { | |
w.stop() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment