Skip to content

Instantly share code, notes, and snippets.

@dustin
Last active December 16, 2015 05:19
Show Gist options
  • Save dustin/5383672 to your computer and use it in GitHub Desktop.
Save dustin/5383672 to your computer and use it in GitHub Desktop.
Example rate limited writer in go. Specify bytes per second and it will constrain you.
// This is just a creepy text effect over a writer just to demonstrate
// more io layering.
package main
import (
"io"
"math/rand"
"unicode"
"unicode/utf8"
)
type caperturber struct {
io.Writer
}
func (c *caperturber) Write(b []byte) (written int, err error) {
for _, r := range string(b) {
if rand.Intn(4) == 0 {
r = unicode.ToUpper(r)
}
buf := []byte{0, 0, 0, 0}
n := utf8.EncodeRune(buf, r)
buf = buf[:n]
w, err := c.Writer.Write(buf)
written += w
if err != nil {
break
}
}
return
}
// This is the main function, which demonstrates how you use the layers.
package main
import (
"fmt"
"os"
)
func main() {
// 10 bytes per second over stdout.
rl := NewRateLimiter(os.Stdout, 10)
defer rl.Close()
// Here we see how we can combine io.Writers. In this case,
// we've got a capitalization filter manipulating text before
// the rate limiter slows it down on its way to stdout.
w := &caperturber{rl}
// And you use it like any other io.Writer
for i := 0; i < 10; i++ {
fmt.Fprintf(w, "all work and no play makes jack a dull boy\n")
}
}
// An io.Writer to rate limit writes to other io.Writers.
package main
import (
"io"
"time"
)
// writeResponse and writeRequest represent an individual request for
// a Write operation on our rate limited writer.
type writeResponse struct {
size int
err error
}
type writeRequest struct {
bytes []byte
rv chan writeResponse
}
// This is the actual rate limited writer. Notice everything's
// private. We don't need to expose anything at all about this
// externally. It's just an io.WriteCloser as far anyone's concerned.
type rateLimiter struct {
input chan writeRequest
current []byte
currentch chan writeResponse
currentsent int
ticker *time.Ticker
limit int
remaining int
output io.Writer
quit chan bool
}
// gcd is used to compute the smallest time ticker possible for
// getting the most accurate rate limit.
func gcd(a, b int) int {
for b != 0 {
a, b = b, a%b
}
return a
}
// min keeps us from creating too small a ticker (in which case we're
// spending more time processing ticks than moving bytes)
func min(a, b int) int {
if a > b {
return b
}
return a
}
// Construct a rate limited writer that will pass the given number of
// bytes per second to the provided io.Writer.
//
// Obviously, closing this Writer will not close the underlying
// writer.
func NewRateLimiter(w io.Writer, bps int) io.WriteCloser {
unit := time.Second
g := min(1000, gcd(bps, int(unit)))
unit /= time.Duration(g)
bps /= g
rv := &rateLimiter{
input: make(chan writeRequest),
ticker: time.NewTicker(unit),
limit: bps,
remaining: bps,
output: w,
quit: make(chan bool),
}
go rv.run()
return rv
}
func (rl *rateLimiter) run() {
defer rl.ticker.Stop()
for {
// If there's input being processed, don't get
// anymore.
//
// By having the input channel be conditional, it's
// simply ignored by the select loop when there's
// already input being processed (in which case the
// select will only process ticks to write and close
// to exit).
input := rl.input
if rl.current != nil {
input = nil
}
select {
case <-rl.quit:
// There's a pending writer. Tell it we're closed.
if rl.current != nil {
rl.currentch <- writeResponse{rl.currentsent, io.EOF}
}
return
case <-rl.ticker.C:
rl.remaining = rl.limit
rl.sendSome()
case req := <-input:
rl.current = req.bytes
rl.currentch = req.rv
rl.currentsent = 0
rl.sendSome()
}
}
}
// Send as many bytes as we can in the current window.
func (rl *rateLimiter) sendSome() {
if rl.current != nil && rl.remaining > 0 {
tosend := rl.current
if rl.remaining < len(rl.current) {
tosend = rl.current[:rl.remaining]
}
rl.remaining -= len(tosend)
sent, err := rl.output.Write(tosend)
rl.currentsent += sent
rl.current = rl.current[sent:]
if len(rl.current) == 0 || err != nil {
rl.current = nil
rl.currentch <- writeResponse{rl.currentsent, err}
}
}
}
// io.Writer implementation
func (rl *rateLimiter) Write(p []byte) (n int, err error) {
req := writeRequest{p, make(chan writeResponse, 1)}
select {
case <-rl.quit:
return 0, io.EOF
case rl.input <- req:
res := <-req.rv
return res.size, res.err
}
panic("unreachable")
}
// io.Closer implementation
func (rl *rateLimiter) Close() error {
// This protects against a double sequential close, but not a
// double concurrent close. The latter will panic.
select {
case <-rl.quit:
return io.EOF
default:
}
close(rl.quit)
return nil
}
@dustin
Copy link
Author

dustin commented Apr 14, 2013

If you'd like to play interactively, see the playground version.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment