Created
October 9, 2018 13:57
-
-
Save wirepair/9213e80d8ce5794c42d937d5189b7045 to your computer and use it in GitHub Desktop.
re-sizable execution worker pool
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Super simple resizable execution worker pool that allows you to add/remove workers easily.
// Obviously the doWork function should use a context or something similar to cancel any 'work' being done if necessary.
// author: https://twitter.com/_wirepair
package main | |
import ( | |
"fmt" | |
"sync" | |
"sync/atomic" | |
"time" | |
) | |
// Workers is a resizable pool of worker goroutines. Every registered worker
// is represented by a close channel stored in workers; closing that channel
// asks the worker to exit.
type Workers struct {
	// workerLock is held around every access to the workers map.
	workerLock sync.RWMutex

	// workers maps a worker id to that worker's close channel. Ids are taken
	// from time.Now().UnixNano(); a secure random number could be used
	// instead, but the timestamp should suffice.
	workers map[int64]chan struct{}

	// The counters below are read and written through sync/atomic.

	// maxWorkers is the target pool size.
	maxWorkers int32
	// workerCount tracks how many workers have been started and not removed.
	workerCount int32
	// status is not referenced by the code visible here.
	// NOTE(review): confirm whether it is still needed.
	status int32
}
func New(max int32) *Workers { | |
return &Workers{ | |
workers: make(map[int64]chan struct{}), | |
maxWorkers: max, | |
} | |
} | |
// Run our initial worker count. | |
func (w *Workers) Run() { | |
max := int(atomic.LoadInt32(&w.maxWorkers)) | |
for i := 0; i < max; i++ { | |
w.workerLock.Lock() | |
closeCh := make(chan struct{}) | |
w.workers[time.Now().UnixNano()] = closeCh | |
w.workerLock.Unlock() | |
go w.doWork(closeCh) | |
atomic.AddInt32(&w.workerCount, 1) | |
} | |
} | |
// addWorker adds a new worker and starts a new go routine. | |
func (w *Workers) addWorker() { | |
closeCh := make(chan struct{}) | |
atomic.AddInt32(&w.maxWorkers, 1) | |
w.workerLock.Lock() | |
w.workers[time.Now().UnixNano()] = closeCh | |
w.workerLock.Unlock() | |
go w.doWork(closeCh) | |
atomic.AddInt32(&w.workerCount, 1) | |
} | |
// removeWorker extracts a random worker and closes it's channel and deletes it | |
// from our map of workers. This will only occur after it finishes processing it's doWork iteration. | |
func (w *Workers) removeWorker() { | |
atomic.AddInt32(&w.maxWorkers, -1) | |
w.workerLock.Lock() | |
for t, ch := range w.workers { | |
close(ch) | |
delete(w.workers, t) | |
break | |
} | |
w.workerLock.Unlock() | |
atomic.AddInt32(&w.workerCount, -1) | |
} | |
func (w *Workers) doWork(closeCh chan struct{}) { | |
for { | |
select { | |
case <-closeCh: | |
fmt.Println("closed channel") | |
return | |
default: | |
} | |
fmt.Println("doing worky...") | |
time.Sleep(1 * time.Second) | |
} | |
} | |
func main() { | |
w := New(5) | |
w.Run() | |
time.Sleep(2 * time.Second) | |
w.addWorker() | |
time.Sleep(1 * time.Second) | |
fmt.Println("shutting down") | |
for i := 0; i < 6; i++ { | |
w.removeWorker() | |
} | |
time.Sleep(5 * time.Second) | |
fmt.Println("done") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment