Last active
August 21, 2018 10:24
-
-
Save ksurent/ced99c8fd62a75fe329fbd315d819f9b to your computer and use it in GitHub Desktop.
A simple simulation to see how client-go's workqueue behaves in the presence of stuck consumers
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"log" | |
"math/rand" | |
"net/http" | |
"os" | |
"os/signal" | |
"time" | |
"github.com/prometheus/client_golang/prometheus" | |
"github.com/prometheus/client_golang/prometheus/promhttp" | |
krand "k8s.io/apimachinery/pkg/util/rand" | |
"k8s.io/apimachinery/pkg/util/wait" | |
"k8s.io/client-go/util/workqueue" | |
) | |
func main() { | |
mp := &metricsProvider{} | |
workqueue.SetProvider(mp) | |
wq := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") | |
sigCh := make(chan os.Signal, 1) | |
signal.Notify(sigCh, os.Interrupt) | |
go producer(wq) | |
threadiness := 2 | |
for i := 0; i < threadiness; i++ { | |
go consumer(i, wq) | |
} | |
reg := prometheus.NewRegistry() | |
reg.MustRegister(mp.Collectors()...) | |
go func() { | |
log.Println("starting metrics") | |
http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{})) | |
log.Fatal(http.ListenAndServe(":8889", nil)) | |
}() | |
<-sigCh | |
wq.ShutDown() | |
log.Println("terminating") | |
} | |
func consumer(id int, wq workqueue.RateLimitingInterface) { | |
log.Println("starting consumer", id) | |
for { | |
it, stop := wq.Get() | |
if stop { | |
break | |
} | |
var j time.Duration | |
if rand.Int31n(100) >= 99 { | |
j = 100 * time.Second | |
} else { | |
j = wait.Jitter(10*time.Millisecond, 20) | |
} | |
log.Println("consuming", id, "in", j) | |
time.Sleep(j) | |
wq.Forget(it) | |
wq.Done(it) | |
} | |
} | |
func producer(wq workqueue.RateLimitingInterface) { | |
log.Println("starting producer") | |
for { | |
j := wait.Jitter(1*time.Second, 2) | |
log.Println("producing in", j) | |
time.Sleep(j) | |
wq.Add(krand.String(8)) | |
} | |
} | |
type metricsProvider struct { | |
collectors []prometheus.Collector | |
} | |
func (mp *metricsProvider) Collectors() []prometheus.Collector { | |
return mp.collectors | |
} | |
func (mp *metricsProvider) NewDepthMetric(name string) workqueue.GaugeMetric { | |
g := prometheus.NewGauge(prometheus.GaugeOpts{ | |
Subsystem: name, | |
Name: "depth", | |
Help: "queue depth", | |
}) | |
mp.collectors = append(mp.collectors, g) | |
return g | |
} | |
func (mp *metricsProvider) NewAddsMetric(name string) workqueue.CounterMetric { | |
c := prometheus.NewCounter(prometheus.CounterOpts{ | |
Subsystem: name, | |
Name: "adds", | |
Help: "unique items added", | |
}) | |
mp.collectors = append(mp.collectors, c) | |
return c | |
} | |
func (mp *metricsProvider) NewLatencyMetric(name string) workqueue.SummaryMetric { | |
s := prometheus.NewSummary(prometheus.SummaryOpts{ | |
Subsystem: name, | |
Name: "latency_microseconds", | |
Help: "how long items wait to be picked up", | |
}) | |
mp.collectors = append(mp.collectors, s) | |
return s | |
} | |
func (mp *metricsProvider) NewWorkDurationMetric(name string) workqueue.SummaryMetric { | |
s := prometheus.NewSummary(prometheus.SummaryOpts{ | |
Subsystem: name, | |
Name: "duration_microseconds", | |
Help: "how long items take to process", | |
}) | |
mp.collectors = append(mp.collectors, s) | |
return s | |
} | |
func (mp *metricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric { | |
c := prometheus.NewCounter(prometheus.CounterOpts{ | |
Subsystem: name, | |
Name: "retries", | |
Help: "???", | |
}) | |
mp.collectors = append(mp.collectors, c) | |
return c | |
} |
Author
ksurent
commented
Aug 21, 2018
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment