@thrawn01
Last active June 15, 2020 23:08
Simple WorkerPool Implementation
package main

import (
	"io"
	"log"
	"net"
	_ "net/http/pprof" // registers pprof handlers on http.DefaultServeMux
	"strings"
	"sync"
	"sync/atomic"
	"time"
)

// handleCount and busyCount are per-second counters; the reporter
// goroutine in main swaps them back to zero once a second.
var handleCount int64
var busyCount int64

var workerPool *pool
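
// Design overview: the accept loop in main hands each connection to the
// pool. A non-blocking send places the conn on the worker queue; when that
// queue is full the conn is diverted to the busy queue, where it receives
// a "421 busy" reply and is closed immediately rather than timing out
// while waiting to connect.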

func main() {
	// Report throughput: swap the counters back to zero and log the
	// previous second's totals.
	go func() {
		for {
			c := atomic.SwapInt64(&handleCount, 0)
			b := atomic.SwapInt64(&busyCount, 0)
			if c != 0 || b != 0 {
				log.Printf("Handled: %d Busy: %d\n", c, b)
			}
			time.Sleep(time.Second)
		}
	}()

	ln, err := net.Listen("tcp", ":8972")
	if err != nil {
		panic(err)
	}

	workerPool = newPool(100, 10_000)
	for {
		conn, e := ln.Accept()
		if e != nil {
			if ne, ok := e.(net.Error); ok && ne.Temporary() {
				// This can happen if either the worker pool queue is too
				// large, or the queue is full AND the busy queue is full!
				if strings.Contains(ne.Error(), "too many open files") {
					continue
				}
				log.Printf("accept temp err: %v", ne)
				continue
			}
			log.Printf("accept err: %v", e)
			break
		}
		workerPool.Add(conn)
	}
	workerPool.Close()
}

type pool struct {
	queue     chan net.Conn
	busyQueue chan net.Conn
	mu        sync.Mutex
	closed    bool
	done      chan struct{}
}
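
// newPool starts w workers draining queue and w busy workers draining
// busyQueue; t sets the buffer capacity of both queues.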
func newPool(w int, t int) *pool {
	p := pool{
		busyQueue: make(chan net.Conn, t),
		queue:     make(chan net.Conn, t),
		done:      make(chan struct{}),
	}
	for i := 0; i < w; i++ {
		go p.worker()
	}
	for i := 0; i < w; i++ {
		go p.busyWorker()
	}
	return &p
}

func (p *pool) Close() {
	p.mu.Lock()
	p.closed = true
	close(p.done)
	close(p.queue)
	p.mu.Unlock()
}

func (p *pool) Add(conn net.Conn) {
	p.mu.Lock()
	if p.closed {
		p.mu.Unlock()
		conn.Close()
		return
	}
	p.mu.Unlock()
	// Enable this code and disable the select below to reproduce
	// the timeout on connect issue.
	//p.queue <- conn
	select {
	case p.queue <- conn:
	default:
		// If the worker queue is full, tell clients we are too busy and
		// to try again later. This avoids the connect timeout issue when
		// all workers are busy.
		p.busyQueue <- conn
	}
}

func (p *pool) busyWorker() {
	for {
		select {
		case <-p.done:
			return
		case conn := <-p.busyQueue:
			if conn != nil {
				// 421 is the SMTP-style "service not available, try again later" reply.
				if _, err := conn.Write([]byte("421 Server is busy; try again later")); err != nil {
					// The remote end hung up; close the conn so the fd is not leaked.
					if err == io.EOF || strings.Contains(err.Error(), "broken pipe") {
						conn.Close()
						continue
					}
					log.Printf("busy write error %v", err)
				}
				conn.Close()
				atomic.AddInt64(&busyCount, 1)
			}
		}
	}
}

func (p *pool) worker() {
	for {
		select {
		case <-p.done:
			return
		case conn := <-p.queue:
			if conn != nil {
				_ = conn.SetReadDeadline(time.Now().Add(time.Second))
				// Pretend we are influx
				if _, err := conn.Write([]byte("220 Mailgun Influx ready")); err != nil {
					// The remote end hung up; close the conn so the fd is not leaked.
					if strings.Contains(err.Error(), "broken pipe") {
						conn.Close()
						continue
					}
					log.Printf("worker write error %v", err)
				}
				// Simulate some network calls to querator and blackbird etc...
				time.Sleep(time.Millisecond * 20)
				// Create memory on the heap to ensure GC runs
				buf := make([]byte, 4096)
				// Attempt to read from the socket; errors (including the read
				// deadline expiring) are deliberately ignored.
				_, _ = conn.Read(buf)
				conn.Close()
				atomic.AddInt64(&handleCount, 1)
			}
		}
	}
}
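
For quick manual testing, here is a minimal client sketch (not part of the original gist). It assumes the server above is running locally on :8972; it dials, reads the first banner with a short deadline, and prints whichever reply it received: the "220 ... ready" greeting or the "421" busy response.

package main

import (
	"fmt"
	"net"
	"time"
)

func main() {
	conn, err := net.Dial("tcp", "127.0.0.1:8972")
	if err != nil {
		fmt.Println("dial error:", err)
		return
	}
	defer conn.Close()

	// The server writes its banner first; read it with a short deadline.
	_ = conn.SetReadDeadline(time.Now().Add(2 * time.Second))
	buf := make([]byte, 256)
	n, err := conn.Read(buf)
	if err != nil {
		fmt.Println("read error:", err)
		return
	}
	fmt.Printf("server said: %q\n", buf[:n])
}

Running many of these clients concurrently should drive the Handled/Busy counters that the server logs once a second.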