Last active
July 27, 2018 09:29
-
-
Save narqo/b957a9a2dbb7cf4b9e007ab618a4ecfe to your computer and use it in GitHub Desktop.
test worker+limiter with antiburst and graceful shutdown
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"io" | |
"log" | |
"net/http" | |
"os" | |
"os/signal" | |
"time" | |
) | |
const (
	// addr is the TCP address the HTTP server listens on.
	addr = ":8080"

	// maxSeq is handed to newBackupHandler as the capacity of its task queue.
	maxSeq = 1
)
func main() { | |
ctx := context.Background() | |
bh := newBackupHandler(maxSeq) | |
defer bh.Stop() | |
go bh.Run(ctx) | |
mux := http.NewServeMux() | |
mux.Handle("/ping", bh) | |
srv := &http.Server{ | |
Addr: addr, | |
Handler: mux, | |
} | |
go func() { | |
log.Printf("server is running on %s\n", srv.Addr) | |
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { | |
log.Fatal(err) | |
} | |
}() | |
sigs := make(chan os.Signal, 1) | |
signal.Notify(sigs, os.Interrupt) | |
<-sigs | |
if err := srv.Shutdown(ctx); err != nil { | |
println(err) | |
} | |
} | |
// backupHandler is an http.Handler that queues backup tasks for a single
// worker loop (Run) and carries the channels used to shut that loop down.
type backupHandler struct {
	// tasks is the buffered queue ServeHTTP sends to and Run receives from.
	tasks chan string

	// stop is closed by Stop to request shutdown; done is closed by Run once
	// its loop has returned.
	stop, done chan struct{}
}
func newBackupHandler(max int) *backupHandler { | |
bh := &backupHandler{ | |
tasks: make(chan string, max), | |
stop: make(chan struct{}), | |
done: make(chan struct{}), | |
} | |
return bh | |
} | |
func (bh *backupHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { | |
select { | |
case bh.tasks <- "task": | |
default: | |
} | |
io.WriteString(w, "ok") | |
} | |
func (bh *backupHandler) Run(ctx context.Context) { | |
defer close(bh.done) | |
ctx, cancel := context.WithCancel(ctx) | |
done := make(chan struct{}) | |
go func() { | |
select { | |
case <-ctx.Done(): | |
case <-bh.stop: | |
cancel() | |
} | |
close(done) | |
}() | |
const burstInterval = 5 * time.Second | |
t := time.NewTimer(burstInterval) | |
lim := make(chan struct{}, 1) | |
for { | |
select { | |
case tk := <-bh.tasks: | |
select { | |
case <-lim: | |
println("burst: detected, sleep") | |
sleep(burstInterval, done) | |
default: | |
} | |
lim <- struct{}{} | |
select { | |
case <-done: | |
return | |
default: | |
} | |
bh.DoBackup(ctx, tk) | |
t.Reset(burstInterval) | |
case <-t.C: | |
select { | |
case <-lim: | |
println("burst: not happend, skip") | |
default: | |
} | |
case <-done: | |
return | |
} | |
} | |
} | |
func (bh *backupHandler) Stop() { | |
close(bh.stop) | |
println("stop: gracefully waiting") | |
<-bh.done | |
} | |
func (bh *backupHandler) DoBackup(ctx context.Context, tk string) { | |
println("backup: start", tk) | |
defer println("backup: end", tk) | |
time.Sleep(10 * time.Second) | |
} | |
// sleep blocks until d has elapsed or done is closed (or receives a value),
// whichever happens first. A nil done channel makes it wait the full d.
func sleep(d time.Duration, done <-chan struct{}) {
	timer := time.NewTimer(d)
	// The timer is local and discarded on return, so stopping it is all the
	// cleanup that is needed on either path.
	defer timer.Stop()

	select {
	case <-done:
	case <-timer.C:
	}
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment