@edwinlab
Forked from marianogappa/backpressure.go
Created January 30, 2017 06:13
Example backpressure implementation in Go
/*
This snippet is an example backpressure implementation in Go.
It doesn't run in the Go Playground, because it starts an HTTP server.
The example starts an HTTP server and sends multiple requests to it. The server starts rejecting
requests by replying with an "X" once its buffered channel reaches capacity. The "X" stands in for
an error such as a 502; the handler below does not set a status code, so the reply is still a 200.
This is not the same as rate limiting; for that, see https://github.com/juju/ratelimit
or https://godoc.org/golang.org/x/time/rate.
Note that checking `len(ch) < cap(ch)` is racy: the check and the send are separate steps, so the
channel might fill up in between, in which case the handler blocks on the send instead of replying
"X". In the context of short-running requests this is not an issue.
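If the race does matter, one option is to fold the check and the send into a single non-blocking
`select`. The following is a minimal sketch, not part of the example below: the names `job`,
`tryEnqueue` and `statusWhenFull` are hypothetical, and replying with a 503 is an assumption
(any error status would do).

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// job is a hypothetical unit of work; the name is an assumption of this sketch.
type job struct {
	response chan []byte
}

// tryEnqueue attempts a non-blocking send. The capacity check and the send
// happen in one select statement, so there is no window between them.
func tryEnqueue(queue chan job, j job) bool {
	select {
	case queue <- j:
		return true
	default: // queue is full right now
		return false
	}
}

// handle rejects with a 503 (an assumed status) when the queue is full.
func handle(queue chan job) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		j := job{response: make(chan []byte, 1)}
		if !tryEnqueue(queue, j) {
			http.Error(w, "X", http.StatusServiceUnavailable)
			return
		}
		w.Write(<-j.response) // a worker is expected to reply here
	}
}

// statusWhenFull calls the handler against an already full queue and
// returns the recorded status code.
func statusWhenFull() int {
	queue := make(chan job, 1)
	queue <- job{} // fill the queue
	rec := httptest.NewRecorder()
	handle(queue)(rec, httptest.NewRequest(http.MethodGet, "/requests", nil))
	return rec.Code
}

func main() {
	queue := make(chan job, 1)
	fmt.Println(tryEnqueue(queue, job{})) // true: there is room
	fmt.Println(tryEnqueue(queue, job{})) // false: the queue is full
	fmt.Println(statusWhenFull())         // 503
}
```

The trade-off is that `select` with `default` never waits: a request arriving while the queue is
momentarily full is rejected even if a slot would free up a moment later.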
Outputs:
```
$ go run test.go
√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√
√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√X√X
√√X√√X√√X√√X√√X√√X√√X√√X√X√√√X√√X√X√√√X√X√√X√√X√√X√√X√√X√√X√√X√X√√X√√X√√X√√X√X√√X√√X√√X√√X√√X
√X√√√X√X√√X√√X√√X√√X√X√√X√X√√√X√X√√X√√X√√X√√X√√X√X√√X√√X√√X√√X√X√√X√√X√X√√X√X√√X√√X√√X√√X√√X√
√X√√X√X√√X√√X√√X√√X√√X√X√√X√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√
√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√
```
*/
package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"sync"
	"time"
)
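
func main() {
	// The buffered channel is the queue; its capacity (100) is the backpressure threshold.
	requests := make(chan request, 100)
	go startServer(requests)
	go process(requests)
	// 500 requests at one per 6ms arrive faster than the 10ms each takes to process.
	makeRequests(500, 6*time.Millisecond)
}

// startServer serves /requests on port 9000, handing accepted requests to rq.
func startServer(rq chan request) {
	http.HandleFunc("/requests", handle(rq))
	if err := http.ListenAndServe(":9000", nil); err != nil {
		panic(err)
	}
}

// process drains the queue one request at a time.
func process(rq chan request) {
	for r := range rq {
		r.process()
	}
}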
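
// makeRequests fires count GET requests, one every cooldown, and prints each response body.
func makeRequests(count int, cooldown time.Duration) {
	wg := sync.WaitGroup{}
	for i := 0; i < count; i++ {
		// Add before starting the goroutine; otherwise wg.Wait may run before Add does.
		wg.Add(1)
		go func() {
			defer wg.Done()
			response, err := http.Get("http://localhost:9000/requests")
			if err != nil {
				fmt.Print("E") // request failed, e.g. the server is not listening yet
				return
			}
			defer response.Body.Close()
			b, _ := ioutil.ReadAll(response.Body)
			fmt.Print(string(b))
		}()
		time.Sleep(cooldown)
	}
	wg.Wait()
}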

// handle enqueues the request if the queue has room and replies "X" otherwise.
func handle(rq chan request) func(http.ResponseWriter, *http.Request) {
	return func(w http.ResponseWriter, r *http.Request) {
		if len(rq) < cap(rq) {
			req := newRequest(r)
			rq <- req
			w.Write(<-req.response)
		} else {
			w.Write([]byte("X"))
		}
	}
}
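
// request pairs an incoming HTTP request with the channel its reply is sent on.
type request struct {
	r        *http.Request
	response chan []byte
}

func newRequest(r *http.Request) request { return request{r, make(chan []byte)} }

// process simulates 10ms of work and then replies with a check mark.
func (r request) process() {
	time.Sleep(10 * time.Millisecond)
	r.response <- []byte("√")
}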