-
-
Save skovtunenko/b3a813fd23173609a2e620acd5b765e9 to your computer and use it in GitHub Desktop.
Example backpressure implementation in Go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
This snippet is an example of backpressure implementation in Go. | |
It doesn't run in Go Playground, because it starts an HTTP Server. | |
The example starts an HTTP server and sends multiple requests to it. The server starts denying | |
requests by replying an "X" (i.e. a 502) when its buffered channel reaches capacity. | |
This is not the same as rate-limiting; you might be interested in https://github.com/juju/ratelimit | |
or https://godoc.org/golang.org/x/time/rate. | |
Note that asking the question: `len(ch) < cap(ch) ?` is a racey operation; the channel might | |
actually be full a microsecond later. In the context of short running requests this is not an | |
issue. | |
Outputs: | |
``` | |
$ go run test.go | |
√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√ | |
√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√X√X | |
√√X√√X√√X√√X√√X√√X√√X√√X√X√√√X√√X√X√√√X√X√√X√√X√√X√√X√√X√√X√√X√X√√X√√X√√X√√X√X√√X√√X√√X√√X√√X | |
√X√√√X√X√√X√√X√√X√√X√X√√X√X√√√X√X√√X√√X√√X√√X√√X√X√√X√√X√√X√√X√X√√X√√X√X√√X√X√√X√√X√√X√√X√√X√ | |
√X√√X√X√√X√√X√√X√√X√√X√X√√X√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√ | |
√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√ | |
``` | |
*/ | |
package main | |
import ( | |
"fmt" | |
"io/ioutil" | |
"net/http" | |
"sync" | |
"time" | |
) | |
func main() { | |
requests := make(chan request, 100) | |
go startServer(requests) | |
go process(requests) | |
makeRequests(500, 6*time.Millisecond) | |
} | |
func startServer(rq chan request) { | |
http.HandleFunc("/requests", handle(rq)) | |
http.ListenAndServe(":9000", nil) | |
} | |
func process(rq chan request) { | |
for r := range rq { | |
r.process() | |
} | |
} | |
func makeRequests(count int, cooldown time.Duration) { | |
wg := sync.WaitGroup{} | |
for i := 0; i < count; i++ { | |
go func() { | |
wg.Add(1) | |
defer wg.Done() | |
response, _ := http.Get("http://localhost:9000/requests") | |
defer response.Body.Close() | |
b, _ := ioutil.ReadAll(response.Body) | |
fmt.Print(string(b)) | |
}() | |
time.Sleep(cooldown) | |
} | |
wg.Wait() | |
} | |
func handle(rq chan request) func(http.ResponseWriter, *http.Request) { | |
return func(w http.ResponseWriter, r *http.Request) { | |
if len(rq) < cap(rq) { | |
r := newRequest(r) | |
rq <- r | |
w.Write(<-r.response) | |
} else { | |
w.Write([]byte("X")) | |
} | |
} | |
} | |
type request struct { | |
r *http.Request | |
response chan []byte | |
} | |
func newRequest(r *http.Request) request { return request{r, make(chan []byte)} } | |
func (r request) process() { | |
time.Sleep(10 * time.Millisecond) | |
r.response <- []byte("√") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment