Created
July 20, 2017 15:58
-
-
Save kkleidal/098b1b2c08a7c7e03a89f44f9e6b74d6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"log" | |
"strings" | |
"time" | |
) | |
type threadPoolWorker struct { | |
free chan *threadPoolWorker | |
requestQueue chan WorkRequest | |
stopped bool | |
stopAlert chan bool | |
} | |
func newThreadPoolWorker(free chan *threadPoolWorker) (worker threadPoolWorker) { | |
worker.free = free | |
worker.requestQueue = make(chan WorkRequest) | |
worker.stopped = false | |
worker.stopAlert = make(chan bool, 1) | |
return | |
} | |
func (worker threadPoolWorker) run() { | |
if !worker.stopped { | |
select { | |
case worker.free <- &worker: | |
break | |
case <-worker.stopAlert: | |
break | |
} | |
} | |
for !worker.stopped { | |
select { | |
case task := <-worker.requestQueue: | |
log.Printf("Doing task: %s...", task.Request) | |
time.Sleep(10 * time.Second) | |
res := WorkResponse{task.Request, []byte(strings.ToUpper(string(task.Request))), nil} | |
log.Printf("Finished task: %s, sending response %s.", res.Request, res.Response) | |
select { | |
case task.Response <- res: | |
select { | |
case worker.free <- &worker: | |
break | |
case <-worker.stopAlert: | |
break | |
} | |
break | |
case <-worker.stopAlert: | |
break | |
} | |
break | |
case <-worker.stopAlert: | |
break | |
} | |
} | |
} | |
func (worker threadPoolWorker) start() { | |
if !worker.stopped { | |
go worker.run() | |
} | |
} | |
func (worker threadPoolWorker) stop() { | |
if !worker.stopped { | |
worker.stopped = true | |
worker.stopAlert <- true | |
} | |
} | |
type ThreadPoolExecutor struct { | |
numWorkers int | |
queueSize int | |
workers []threadPoolWorker | |
freeWorkers map[*threadPoolWorker]bool | |
requestQueue chan WorkRequest | |
freeQueue chan *threadPoolWorker | |
stopped bool | |
stopAlert chan bool | |
} | |
func NewThreadPoolExecutor(numWorkers int, queueSize int) (exc ThreadPoolExecutor) { | |
exc.numWorkers = numWorkers | |
exc.queueSize = queueSize | |
exc.workers = make([]threadPoolWorker, 0) | |
exc.freeWorkers = make(map[*threadPoolWorker]bool) | |
exc.requestQueue = make(chan WorkRequest, queueSize) | |
exc.freeQueue = make(chan *threadPoolWorker) | |
exc.stopped = false | |
exc.stopAlert = make(chan bool, 1) | |
return | |
} | |
func (exc ThreadPoolExecutor) RequestQueue() chan<- WorkRequest { | |
return exc.requestQueue | |
} | |
func (exc ThreadPoolExecutor) Start() { | |
if !exc.stopped { | |
go exc.run() | |
} | |
} | |
func (exc ThreadPoolExecutor) Stop() { | |
if !exc.stopped { | |
exc.stopped = true | |
exc.stopAlert <- true | |
} | |
} | |
func (exc ThreadPoolExecutor) run() { | |
if !exc.stopped { | |
for i := 0; i < exc.numWorkers; i++ { | |
worker := newThreadPoolWorker(exc.freeQueue) | |
worker.start() | |
exc.workers = append(exc.workers, worker) | |
} | |
} | |
for !exc.stopped { | |
if len(exc.freeWorkers) > 0 { | |
// Find a free worker: | |
var worker *threadPoolWorker | |
for k := range exc.freeWorkers { | |
worker = k | |
break | |
} | |
// Register the worker as not free: | |
delete(exc.freeWorkers, worker) | |
// Send a request to the worker | |
select { | |
case task := <-exc.requestQueue: | |
worker.requestQueue <- task | |
break | |
case <-exc.stopAlert: | |
break | |
} | |
} else { | |
// Wait for workers to free up: | |
select { | |
case worker := <-exc.freeQueue: | |
exc.freeWorkers[worker] = true | |
case <-exc.stopAlert: | |
break | |
} | |
} | |
} | |
for i := range exc.workers { | |
exc.workers[i].stop() | |
} | |
} | |
type WorkRequest struct { | |
Request []byte | |
Response chan WorkResponse | |
} | |
type WorkResponse struct { | |
Request []byte | |
Response []byte | |
Err error | |
} | |
func main() { | |
log.Printf("Starting.") | |
pool := NewThreadPoolExecutor(2, 1) | |
pool.Start() | |
task1 := WorkRequest{[]byte("request 1!!"), make(chan WorkResponse)} | |
task2 := WorkRequest{[]byte("request 2!!"), make(chan WorkResponse)} | |
task3 := WorkRequest{[]byte("request 3!!"), make(chan WorkResponse)} | |
pool.RequestQueue() <- task1 | |
pool.RequestQueue() <- task2 | |
pool.RequestQueue() <- task3 | |
res1 := <-task1.Response | |
log.Printf("Finished 1. Got '%s'.", res1.Response) | |
res2 := <-task2.Response | |
log.Printf("Finished 2. Got '%s'.", res2.Response) | |
res3 := <-task3.Response | |
log.Printf("Finished 3. Got '%s'.", res3.Response) | |
} | |
/* | |
Prints: | |
2017/07/20 11:57:53 Starting. | |
2017/07/20 11:57:53 Doing task: request 2!!... | |
2017/07/20 11:57:53 Doing task: request 1!!... | |
2017/07/20 11:58:03 Finished task: request 2!!, sending response REQUEST 2!!. | |
2017/07/20 11:58:03 Finished task: request 1!!, sending response REQUEST 1!!. | |
2017/07/20 11:58:03 Finished 1. Got 'REQUEST 1!!'. | |
2017/07/20 11:58:03 Doing task: request 3!!... | |
2017/07/20 11:58:03 Finished 2. Got 'REQUEST 2!!'. | |
2017/07/20 11:58:13 Finished task: request 3!!, sending response REQUEST 3!!. | |
2017/07/20 11:58:13 Finished 3. Got 'REQUEST 3!!'. | |
*/ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment