Skip to content

Instantly share code, notes, and snippets.

@skovtunenko
Forked from marianogappa/backpressure.go
Created October 20, 2017 07:36
Show Gist options
  • Save skovtunenko/b3a813fd23173609a2e620acd5b765e9 to your computer and use it in GitHub Desktop.
Save skovtunenko/b3a813fd23173609a2e620acd5b765e9 to your computer and use it in GitHub Desktop.
Example backpressure implementation in Go
/*
This snippet is an example of backpressure implementation in Go.
It doesn't run in Go Playground, because it starts an HTTP Server.
The example starts an HTTP server and sends multiple requests to it. The server starts denying
requests by replying an "X" (i.e. a 502) when its buffered channel reaches capacity.
This is not the same as rate-limiting; you might be interested in https://github.com/juju/ratelimit
or https://godoc.org/golang.org/x/time/rate.
Note that asking the question: `len(ch) < cap(ch) ?` is a racey operation; the channel might
actually be full a microsecond later. In the context of short running requests this is not an
issue.
Outputs:
```
$ go run test.go
√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√
√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√X√X
√√X√√X√√X√√X√√X√√X√√X√√X√X√√√X√√X√X√√√X√X√√X√√X√√X√√X√√X√√X√√X√X√√X√√X√√X√√X√X√√X√√X√√X√√X√√X
√X√√√X√X√√X√√X√√X√√X√X√√X√X√√√X√X√√X√√X√√X√√X√√X√X√√X√√X√√X√√X√X√√X√√X√X√√X√X√√X√√X√√X√√X√√X√
√X√√X√X√√X√√X√√X√√X√√X√X√√X√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√
√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√
```
*/
package main
import (
"fmt"
"io/ioutil"
"net/http"
"sync"
"time"
)
func main() {
requests := make(chan request, 100)
go startServer(requests)
go process(requests)
makeRequests(500, 6*time.Millisecond)
}
func startServer(rq chan request) {
http.HandleFunc("/requests", handle(rq))
http.ListenAndServe(":9000", nil)
}
func process(rq chan request) {
for r := range rq {
r.process()
}
}
func makeRequests(count int, cooldown time.Duration) {
wg := sync.WaitGroup{}
for i := 0; i < count; i++ {
go func() {
wg.Add(1)
defer wg.Done()
response, _ := http.Get("http://localhost:9000/requests")
defer response.Body.Close()
b, _ := ioutil.ReadAll(response.Body)
fmt.Print(string(b))
}()
time.Sleep(cooldown)
}
wg.Wait()
}
func handle(rq chan request) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
if len(rq) < cap(rq) {
r := newRequest(r)
rq <- r
w.Write(<-r.response)
} else {
w.Write([]byte("X"))
}
}
}
type request struct {
r *http.Request
response chan []byte
}
func newRequest(r *http.Request) request { return request{r, make(chan []byte)} }
func (r request) process() {
time.Sleep(10 * time.Millisecond)
r.response <- []byte("√")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment