Skip to content

Instantly share code, notes, and snippets.

@narqo
Last active July 27, 2018 09:29
Show Gist options
  • Save narqo/b957a9a2dbb7cf4b9e007ab618a4ecfe to your computer and use it in GitHub Desktop.
Save narqo/b957a9a2dbb7cf4b9e007ab618a4ecfe to your computer and use it in GitHub Desktop.
test worker+limiter with antiburst and gracesful shutdown
package main
import (
"context"
"io"
"log"
"net/http"
"os"
"os/signal"
"time"
)
const addr = ":8080"
const maxSeq = 1
func main() {
ctx := context.Background()
bh := newBackupHandler(maxSeq)
defer bh.Stop()
go bh.Run(ctx)
mux := http.NewServeMux()
mux.Handle("/ping", bh)
srv := &http.Server{
Addr: addr,
Handler: mux,
}
go func() {
log.Printf("server is running on %s\n", srv.Addr)
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatal(err)
}
}()
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, os.Interrupt)
<-sigs
if err := srv.Shutdown(ctx); err != nil {
println(err)
}
}
type backupHandler struct {
tasks chan string
stop chan struct{}
done chan struct{}
}
func newBackupHandler(max int) *backupHandler {
bh := &backupHandler{
tasks: make(chan string, max),
stop: make(chan struct{}),
done: make(chan struct{}),
}
return bh
}
func (bh *backupHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
select {
case bh.tasks <- "task":
default:
}
io.WriteString(w, "ok")
}
func (bh *backupHandler) Run(ctx context.Context) {
defer close(bh.done)
ctx, cancel := context.WithCancel(ctx)
done := make(chan struct{})
go func() {
select {
case <-ctx.Done():
case <-bh.stop:
cancel()
}
close(done)
}()
const burstInterval = 5 * time.Second
t := time.NewTimer(burstInterval)
lim := make(chan struct{}, 1)
for {
select {
case tk := <-bh.tasks:
select {
case <-lim:
println("burst: detected, sleep")
sleep(burstInterval, done)
default:
}
lim <- struct{}{}
select {
case <-done:
return
default:
}
bh.DoBackup(ctx, tk)
t.Reset(burstInterval)
case <-t.C:
select {
case <-lim:
println("burst: not happend, skip")
default:
}
case <-done:
return
}
}
}
func (bh *backupHandler) Stop() {
close(bh.stop)
println("stop: gracefully waiting")
<-bh.done
}
func (bh *backupHandler) DoBackup(ctx context.Context, tk string) {
println("backup: start", tk)
defer println("backup: end", tk)
time.Sleep(10 * time.Second)
}
func sleep(d time.Duration, done <-chan struct{}) {
t := time.NewTimer(d)
select {
case <-t.C:
case <-done:
if !t.Stop() {
<-t.C
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment