Last active
June 15, 2020 23:08
-
-
Save thrawn01/05e2a972553028c43ed7e07348934d3d to your computer and use it in GitHub Desktop.
Simple WorkerPool Implementation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"io" | |
"log" | |
"net" | |
_ "net/http/pprof" | |
"strings" | |
"sync" | |
"sync/atomic" | |
"time" | |
) | |
var handleCount int64 | |
var busyCount int64 | |
var workerPool *pool | |
func main() { | |
go func() { | |
for { | |
c := atomic.SwapInt64(&handleCount, 0) | |
b := atomic.SwapInt64(&busyCount, 0) | |
if c != 0 || b != 0 { | |
log.Printf("Handled: %d Busy: %d\n", c, b) | |
} | |
time.Sleep(time.Second) | |
} | |
}() | |
ln, err := net.Listen("tcp", ":8972") | |
if err != nil { | |
panic(err) | |
} | |
workerPool = newPool(100, 10_000) | |
for { | |
conn, e := ln.Accept() | |
if e != nil { | |
if ne, ok := e.(net.Error); ok && ne.Temporary() { | |
// This can happen if either the worker pool queue is too large, or the queue is | |
// full AND the busy queue is full! | |
if strings.Contains(ne.Error(), "too many open files") { | |
continue | |
} | |
log.Printf("accept temp err: %v", ne) | |
continue | |
} | |
log.Printf("accept err: %v", e) | |
return | |
} | |
workerPool.Add(conn) | |
} | |
workerPool.Close() | |
} | |
// pool is a fixed set of goroutines that serve or reject connections handed
// to it through Add.
type pool struct {
	// mu guards closed.
	mu sync.Mutex
	// closed is set by Close; Add refuses new connections once it is true.
	closed bool
	// done is closed by Close to tell every worker goroutine to return.
	done chan struct{}

	// queue holds connections waiting to be served by worker.
	queue chan net.Conn
	// busyQueue holds connections that did not fit in queue and are waiting
	// for busyWorker to send them a "try again later" reply.
	busyQueue chan net.Conn
}
func newPool(w int, t int) *pool { | |
p := pool{ | |
busyQueue: make(chan net.Conn, t), | |
queue: make(chan net.Conn, t), | |
done: make(chan struct{}), | |
} | |
for i := 0; i < w; i++ { | |
go p.worker() | |
} | |
for i := 0; i < w; i++ { | |
go p.busyWorker() | |
} | |
return &p | |
} | |
func (p *pool) Close() { | |
p.mu.Lock() | |
p.closed = true | |
close(p.done) | |
close(p.queue) | |
p.mu.Unlock() | |
} | |
func (p *pool) Add(conn net.Conn) { | |
p.mu.Lock() | |
if p.closed { | |
p.mu.Unlock() | |
return | |
} | |
p.mu.Unlock() | |
// Enable this code and disable the select below to reproduce | |
// the timeout on connect issue. | |
//p.queue <- conn | |
select { | |
case p.queue <- conn: | |
default: | |
// If worker queue is full tell clients we are too busy and | |
// to try again later. This avoids connect timeout issue if | |
// all workers are busy. | |
p.busyQueue <- conn | |
} | |
} | |
func (p *pool) busyWorker() { | |
for { | |
select { | |
case <-p.done: | |
return | |
case conn := <-p.busyQueue: | |
if conn != nil { | |
if _, err := conn.Write([]byte("421 Server is busy; try again later")); err != nil { | |
if err == io.EOF { | |
continue | |
} | |
// Remote end hung up | |
if strings.Contains(err.Error(), "broken pipe") { | |
continue | |
} | |
log.Printf("busy write error %v", err) | |
} | |
conn.Close() | |
atomic.AddInt64(&busyCount, 1) | |
} | |
} | |
} | |
} | |
func (p *pool) worker() { | |
for { | |
select { | |
case <-p.done: | |
return | |
case conn := <-p.queue: | |
if conn != nil { | |
_ = conn.SetReadDeadline(time.Now().Add(time.Second)) | |
// Pretend we are influx | |
if _, err := conn.Write([]byte("220 Mailgun Influx ready")); err != nil { | |
// Remote end hung up | |
if strings.Contains(err.Error(), "broken pipe") { | |
continue | |
} | |
log.Printf("worker write error %v", err) | |
} | |
// Simulate some network calls to querator and blackbird etc... | |
time.Sleep(time.Millisecond * 20) | |
// Create memory on the heap to ensure GC runs | |
buf := make([]byte, 4096) | |
// Attempt to read from the socket | |
conn.Read(buf) | |
conn.Close() | |
atomic.AddInt64(&handleCount, 1) | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment