Skip to content

Instantly share code, notes, and snippets.

@scotthaleen
Forked from marianogappa/backpressure.go
Created April 28, 2022 02:09
Show Gist options
  • Save scotthaleen/00f9cd44e2c3ea2869c5c04e97c85682 to your computer and use it in GitHub Desktop.
Save scotthaleen/00f9cd44e2c3ea2869c5c04e97c85682 to your computer and use it in GitHub Desktop.
Example backpressure implementation in Go
/*
This snippet is an example of backpressure implementation in Go.
It doesn't run in Go Playground, because it starts an HTTP Server.
The example starts an HTTP server and sends multiple requests to it. The server starts denying
requests by replying an "X" (i.e. a 502) when its buffered channel reaches capacity.
This is not the same as rate-limiting; you might be interested in https://github.com/juju/ratelimit
or https://godoc.org/golang.org/x/time/rate.
Note that asking the question: `len(ch) < cap(ch) ?` is a racey operation; the channel might
actually be full a microsecond later. In the context of short running requests this is not an
issue.
Outputs:
```
$ go run test.go
√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√
√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√X√X
√√X√√X√√X√√X√√X√√X√√X√√X√X√√√X√√X√X√√√X√X√√X√√X√√X√√X√√X√√X√√X√X√√X√√X√√X√√X√X√√X√√X√√X√√X√√X
√X√√√X√X√√X√√X√√X√√X√X√√X√X√√√X√X√√X√√X√√X√√X√√X√X√√X√√X√√X√√X√X√√X√√X√X√√X√X√√X√√X√√X√√X√√X√
√X√√X√X√√X√√X√√X√√X√√X√X√√X√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√
√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√
```
*/
package main
import (
"fmt"
"io/ioutil"
"net/http"
"sync"
"time"
)
func main() {
requests := make(chan request, 100)
go startServer(requests)
go process(requests)
makeRequests(500, 6*time.Millisecond)
}
func startServer(rq chan request) {
http.HandleFunc("/requests", handle(rq))
http.ListenAndServe(":9000", nil)
}
func process(rq chan request) {
for r := range rq {
r.process()
}
}
func makeRequests(count int, cooldown time.Duration) {
wg := sync.WaitGroup{}
for i := 0; i < count; i++ {
go func() {
wg.Add(1)
defer wg.Done()
response, _ := http.Get("http://localhost:9000/requests")
defer response.Body.Close()
b, _ := ioutil.ReadAll(response.Body)
fmt.Print(string(b))
}()
time.Sleep(cooldown)
}
wg.Wait()
}
func handle(rq chan request) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
if len(rq) < cap(rq) {
r := newRequest(r)
rq <- r
w.Write(<-r.response)
} else {
w.Write([]byte("X"))
}
}
}
type request struct {
r *http.Request
response chan []byte
}
func newRequest(r *http.Request) request { return request{r, make(chan []byte)} }
func (r request) process() {
time.Sleep(10 * time.Millisecond)
r.response <- []byte("√")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment