Skip to content

Instantly share code, notes, and snippets.

@manhdaovan
Last active June 24, 2019 07:37
Show Gist options
  • Save manhdaovan/e61bff3ef334432fe1698201f4b5bc8c to your computer and use it in GitHub Desktop.
Save manhdaovan/e61bff3ef334432fe1698201f4b5bc8c to your computer and use it in GitHub Desktop.
Worker pool in Go
package workerpool
import (
"context"
)
/*
* Worker pool that caps how many heavy tasks run at the same time, to avoid OOM errors from the OS
* Usage example:
* pool := workerpool.NewPool(3) // Number of workers should be adjusted appropriately to your services
* pool.Start()
* task := pool.AddTask(ctx, taskFunc)
* result, err := task.GetResult()
*/
// TaskFunc is the signature every task submitted to the pool must have.
// It is called with the context that was handed to AddTask and returns the
// task's result value together with an error.
type TaskFunc func(ctx context.Context) (interface{}, error)
// WorkerPool runs submitted tasks on a fixed number of worker goroutines.
// Its methods use value receivers; copies of a WorkerPool share the same
// tasksChan because a channel value refers to one underlying queue.
type WorkerPool struct {
// numberWorkers is how many worker goroutines Start launches.
numberWorkers uint
// tasksChan carries tasks from AddTask to the workers.
tasksChan chan Task
}
// NewPool builds a pool that will run tasks on numberWorkers goroutines once
// Start is called. The task queue is buffered with one slot per worker.
//
// A request for zero workers is raised to one: with no workers and an
// unbuffered queue, the first AddTask call would block forever.
func NewPool(numberWorkers uint) WorkerPool {
	if numberWorkers == 0 {
		numberWorkers = 1
	}
	return WorkerPool{
		numberWorkers: numberWorkers,
		tasksChan:     make(chan Task, numberWorkers),
	}
}
// AddTask wraps f in a Task bound to ctx and queues it on the pool channel.
// The send blocks while the channel has no free slot, so this call does not
// return until the task has been accepted.
//
// The returned Task is the caller's handle for collecting the outcome with
// GetResult.
func (p WorkerPool) AddTask(ctx context.Context, f TaskFunc) Task {
	// One buffered slot lets a single result be sent without a receiver
	// already waiting on the channel.
	results := make(chan interface{}, 1)
	queued := Task{ctx: ctx, executor: f, taskResult: results}
	p.tasksChan <- queued
	return queued
}
// executeOneTask is the body of one worker goroutine. It takes tasks off the
// pool channel one at a time and runs each to completion before receiving the
// next.
//
// The loop ends when tasksChan is closed and drained, so closing the channel
// stops the worker cleanly instead of feeding it zero-valued tasks.
func (p WorkerPool) executeOneTask() {
	for t := range p.tasksChan {
		t.execute()
	}
}
// Start launches the pool's workers: numberWorkers goroutines, each running
// executeOneTask. Until it is called nothing receives from the task channel,
// so call it right after NewPool.
//
// There is no guard against repeated calls; each call launches another
// numberWorkers goroutines.
func (p WorkerPool) Start() {
	for remaining := p.numberWorkers; remaining > 0; remaining-- {
		go p.executeOneTask()
	}
}
// Task is the handle for one unit of work submitted to a WorkerPool.
//
// Task values are copied: AddTask returns one copy to the caller and sends
// another to a worker. Anything the worker must hand back therefore travels
// through taskResult, the one piece of state all copies share.
type Task struct {
	// ctx is passed to executor when the task runs.
	ctx context.Context
	// executor is the function a worker calls to perform the task.
	executor TaskFunc
	// taskResult carries a single taskOutcome from the worker to GetResult.
	taskResult chan interface{}
}

// taskOutcome pairs the value and the error returned by a TaskFunc so that
// both cross the result channel together.
type taskOutcome struct {
	value interface{}
	err   error
}

// execute runs the task's function with the task's context and publishes the
// value and the error on the result channel.
//
// The error is sent over the channel rather than stored in a field, because
// the receiver is a copy and a field write would never reach the caller.
func (t Task) execute() {
	value, err := t.executor(t.ctx)
	t.taskResult <- taskOutcome{value: value, err: err}
}

// GetResult blocks until the task has been executed, then returns the value
// and error produced by its TaskFunc.
//
// The outcome is placed back on the channel after it is read, so later calls
// return the same value and error instead of blocking.
func (t Task) GetResult() (interface{}, error) {
	received := <-t.taskResult
	outcome, ok := received.(taskOutcome)
	if !ok {
		// Not produced by execute; pass the raw value through unchanged.
		return received, nil
	}
	// Non-blocking put-back: if the channel has no free slot, skip it rather
	// than risk blocking the caller.
	select {
	case t.taskResult <- outcome:
	default:
	}
	return outcome.value, outcome.err
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment