Last active
June 24, 2019 07:37
-
-
Save manhdaovan/e61bff3ef334432fe1698201f4b5bc8c to your computer and use it in GitHub Desktop.
Worker pool in Go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package workerpool | |
import ( | |
"context" | |
) | |
/*
 * Worker pool for executing heavy tasks with a bounded number of workers,
 * to avoid the process being killed by the OS for running out of memory.
 *
 * Usage example:
 *	pool := workerpool.NewPool(3) // tune the number of workers for your service
 *	pool.Start()
 *	task := pool.AddTask(ctx, taskFunc)
 *	result, err := task.GetResult()
 */
// TaskFunc is the signature every task submitted to the pool must have.
// It is called with the context that was handed to AddTask and returns the
// task's result together with any error.
type TaskFunc func(ctx context.Context) (interface{}, error)
// WorkerPool contains maximum number of workers | |
// and a channel to receive tasks | |
type WorkerPool struct { | |
numberWorkers uint | |
tasksChan chan Task | |
} | |
// NewPool inits new pool | |
func NewPool(numberWorkers uint) WorkerPool { | |
return WorkerPool{ | |
numberWorkers: numberWorkers, | |
tasksChan: make(chan Task, numberWorkers), | |
} | |
} | |
// AddTask adds new task to pool | |
func (p WorkerPool) AddTask(ctx context.Context, f TaskFunc) Task { | |
t := Task{ | |
executor: f, | |
taskResult: make(chan interface{}, 1), | |
ctx: ctx, | |
} | |
p.tasksChan <- t | |
return t | |
} | |
func (p WorkerPool) executeOneTask() { | |
for { | |
select { | |
case t := <-p.tasksChan: | |
t.execute() | |
} | |
} | |
} | |
// Start starts pool to receive task | |
// Must be called AFTER NewPool method immediately | |
func (p WorkerPool) Start() { | |
for i := uint(0); i < p.numberWorkers; i++ { | |
go p.executeOneTask() | |
} | |
} | |
// Task contains all info of a task | |
type Task struct { | |
ctx context.Context | |
executor TaskFunc | |
taskResult chan interface{} | |
taskErr error | |
} | |
func (t Task) execute() { | |
result, err := t.executor(t.ctx) | |
t.taskErr = err | |
t.taskResult <- result | |
} | |
// GetResult returns result of task after executed | |
func (t Task) GetResult() (interface{}, error) { | |
return <-t.taskResult, t.taskErr | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment