Skip to content

Instantly share code, notes, and snippets.

@kkleidal
Created July 20, 2017 15:58
Show Gist options
  • Save kkleidal/098b1b2c08a7c7e03a89f44f9e6b74d6 to your computer and use it in GitHub Desktop.
Save kkleidal/098b1b2c08a7c7e03a89f44f9e6b74d6 to your computer and use it in GitHub Desktop.
package main
import (
"log"
"strings"
"time"
)
type threadPoolWorker struct {
free chan *threadPoolWorker
requestQueue chan WorkRequest
stopped bool
stopAlert chan bool
}
func newThreadPoolWorker(free chan *threadPoolWorker) (worker threadPoolWorker) {
worker.free = free
worker.requestQueue = make(chan WorkRequest)
worker.stopped = false
worker.stopAlert = make(chan bool, 1)
return
}
func (worker threadPoolWorker) run() {
if !worker.stopped {
select {
case worker.free <- &worker:
break
case <-worker.stopAlert:
break
}
}
for !worker.stopped {
select {
case task := <-worker.requestQueue:
log.Printf("Doing task: %s...", task.Request)
time.Sleep(10 * time.Second)
res := WorkResponse{task.Request, []byte(strings.ToUpper(string(task.Request))), nil}
log.Printf("Finished task: %s, sending response %s.", res.Request, res.Response)
select {
case task.Response <- res:
select {
case worker.free <- &worker:
break
case <-worker.stopAlert:
break
}
break
case <-worker.stopAlert:
break
}
break
case <-worker.stopAlert:
break
}
}
}
func (worker threadPoolWorker) start() {
if !worker.stopped {
go worker.run()
}
}
func (worker threadPoolWorker) stop() {
if !worker.stopped {
worker.stopped = true
worker.stopAlert <- true
}
}
type ThreadPoolExecutor struct {
numWorkers int
queueSize int
workers []threadPoolWorker
freeWorkers map[*threadPoolWorker]bool
requestQueue chan WorkRequest
freeQueue chan *threadPoolWorker
stopped bool
stopAlert chan bool
}
func NewThreadPoolExecutor(numWorkers int, queueSize int) (exc ThreadPoolExecutor) {
exc.numWorkers = numWorkers
exc.queueSize = queueSize
exc.workers = make([]threadPoolWorker, 0)
exc.freeWorkers = make(map[*threadPoolWorker]bool)
exc.requestQueue = make(chan WorkRequest, queueSize)
exc.freeQueue = make(chan *threadPoolWorker)
exc.stopped = false
exc.stopAlert = make(chan bool, 1)
return
}
func (exc ThreadPoolExecutor) RequestQueue() chan<- WorkRequest {
return exc.requestQueue
}
func (exc ThreadPoolExecutor) Start() {
if !exc.stopped {
go exc.run()
}
}
func (exc ThreadPoolExecutor) Stop() {
if !exc.stopped {
exc.stopped = true
exc.stopAlert <- true
}
}
func (exc ThreadPoolExecutor) run() {
if !exc.stopped {
for i := 0; i < exc.numWorkers; i++ {
worker := newThreadPoolWorker(exc.freeQueue)
worker.start()
exc.workers = append(exc.workers, worker)
}
}
for !exc.stopped {
if len(exc.freeWorkers) > 0 {
// Find a free worker:
var worker *threadPoolWorker
for k := range exc.freeWorkers {
worker = k
break
}
// Register the worker as not free:
delete(exc.freeWorkers, worker)
// Send a request to the worker
select {
case task := <-exc.requestQueue:
worker.requestQueue <- task
break
case <-exc.stopAlert:
break
}
} else {
// Wait for workers to free up:
select {
case worker := <-exc.freeQueue:
exc.freeWorkers[worker] = true
case <-exc.stopAlert:
break
}
}
}
for i := range exc.workers {
exc.workers[i].stop()
}
}
type WorkRequest struct {
Request []byte
Response chan WorkResponse
}
type WorkResponse struct {
Request []byte
Response []byte
Err error
}
func main() {
log.Printf("Starting.")
pool := NewThreadPoolExecutor(2, 1)
pool.Start()
task1 := WorkRequest{[]byte("request 1!!"), make(chan WorkResponse)}
task2 := WorkRequest{[]byte("request 2!!"), make(chan WorkResponse)}
task3 := WorkRequest{[]byte("request 3!!"), make(chan WorkResponse)}
pool.RequestQueue() <- task1
pool.RequestQueue() <- task2
pool.RequestQueue() <- task3
res1 := <-task1.Response
log.Printf("Finished 1. Got '%s'.", res1.Response)
res2 := <-task2.Response
log.Printf("Finished 2. Got '%s'.", res2.Response)
res3 := <-task3.Response
log.Printf("Finished 3. Got '%s'.", res3.Response)
}
/*
Prints:
2017/07/20 11:57:53 Starting.
2017/07/20 11:57:53 Doing task: request 2!!...
2017/07/20 11:57:53 Doing task: request 1!!...
2017/07/20 11:58:03 Finished task: request 2!!, sending response REQUEST 2!!.
2017/07/20 11:58:03 Finished task: request 1!!, sending response REQUEST 1!!.
2017/07/20 11:58:03 Finished 1. Got 'REQUEST 1!!'.
2017/07/20 11:58:03 Doing task: request 3!!...
2017/07/20 11:58:03 Finished 2. Got 'REQUEST 2!!'.
2017/07/20 11:58:13 Finished task: request 3!!, sending response REQUEST 3!!.
2017/07/20 11:58:13 Finished 3. Got 'REQUEST 3!!'.
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment