Last active
December 16, 2015 05:19
-
-
Save dustin/5383672 to your computer and use it in GitHub Desktop.
Example rate-limited writer in Go. Specify a rate in bytes per second and it will constrain your writes to that rate.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// This is a creepy text effect layered over a writer, just to demonstrate
// more io layering.
package main | |
import ( | |
"io" | |
"math/rand" | |
"unicode" | |
"unicode/utf8" | |
) | |
// caperturber is an io.Writer filter that randomly upper-cases about one
// in four of the characters passing through it before handing them to the
// embedded Writer.
type caperturber struct {
	io.Writer
}

// Write decodes b one UTF-8 sequence at a time, upper-cases each rune with
// probability 1/4, and forwards the result to the embedded Writer.
//
// The returned count is the number of bytes of b that were consumed, as the
// io.Writer contract requires, even when the upper-cased form of a rune has
// a different encoded length than the original. Bytes that are not valid
// UTF-8 are forwarded unchanged. The first error reported by the embedded
// Writer stops processing and is returned to the caller.
//
// Decoding is done per call, so a multi-byte rune split across two Write
// calls is treated as invalid bytes and passed through as-is.
func (c *caperturber) Write(b []byte) (written int, err error) {
	// One scratch buffer per call is enough; every rune fits in UTFMax bytes.
	var scratch [utf8.UTFMax]byte

	for written < len(b) {
		r, size := utf8.DecodeRune(b[written:])
		out := b[written : written+size]

		// DecodeRune reports (RuneError, 1) for a byte that is not valid
		// UTF-8; such a byte must be forwarded untouched rather than being
		// replaced by the three-byte encoding of U+FFFD.
		valid := r != utf8.RuneError || size > 1

		// The dice are rolled for every decoded unit, valid or not.
		if rand.Intn(4) == 0 && valid {
			if up := unicode.ToUpper(r); up != r {
				out = scratch[:utf8.EncodeRune(scratch[:], up)]
			}
		}

		n, werr := c.Writer.Write(out)
		if werr != nil {
			return written, werr
		}
		if n < len(out) {
			// A short count without an error breaks the io.Writer
			// contract; surface it instead of silently losing bytes.
			return written, io.ErrShortWrite
		}
		written += size
	}
	return written, nil
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// This is the main function, which demonstrates how you use the layers. | |
package main | |
import ( | |
"fmt" | |
"os" | |
) | |
func main() { | |
// 10 bytes per second over stdout. | |
rl := NewRateLimiter(os.Stdout, 10) | |
defer rl.Close() | |
// Here we see how we can combine io.Writers. In this case, | |
// we've got a capitalization filter manipulating text before | |
// the rate limiter slows it down on its way to stdout. | |
w := &caperturber{rl} | |
// And you use it like any other io.Writer | |
for i := 0; i < 10; i++ { | |
fmt.Fprintf(w, "all work and no play makes jack a dull boy\n") | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// An io.Writer to rate limit writes to other io.Writers. | |
package main | |
import ( | |
"io" | |
"time" | |
) | |
// writeResponse and writeRequest together describe a single Write call on
// the rate limited writer: the request carries the caller's bytes in, the
// response carries the result back.
type (
	// writeResponse is the outcome of one Write.
	writeResponse struct {
		size int   // byte count reported to the caller of Write
		err  error // error reported to the caller of Write
	}

	// writeRequest is one pending Write.
	writeRequest struct {
		bytes []byte             // the data to write
		rv    chan writeResponse // where the outcome is delivered
	}
)
// This is the actual rate limited writer. Notice everything's | |
// private. We don't need to expose anything at all about this | |
// externally. It's just an io.WriteCloser as far anyone's concerned. | |
type rateLimiter struct { | |
input chan writeRequest | |
current []byte | |
currentch chan writeResponse | |
currentsent int | |
ticker *time.Ticker | |
limit int | |
remaining int | |
output io.Writer | |
quit chan bool | |
} | |
// gcd returns the greatest common divisor of a and b, computed with
// Euclid's algorithm. The rate limiter uses it to find the smallest
// ticker interval it can, which gives the most accurate rate limit.
func gcd(a, b int) int {
	if b == 0 {
		return a
	}
	return gcd(b, a%b)
}
// min returns the smaller of a and b. It is what stops the rate limiter
// from creating too small a ticker, where more time would go into
// processing ticks than into moving bytes.
func min(a, b int) int {
	if b < a {
		return b
	}
	return a
}
// Construct a rate limited writer that will pass the given number of | |
// bytes per second to the provided io.Writer. | |
// | |
// Obviously, closing this Writer will not close the underlying | |
// writer. | |
func NewRateLimiter(w io.Writer, bps int) io.WriteCloser { | |
unit := time.Second | |
g := min(1000, gcd(bps, int(unit))) | |
unit /= time.Duration(g) | |
bps /= g | |
rv := &rateLimiter{ | |
input: make(chan writeRequest), | |
ticker: time.NewTicker(unit), | |
limit: bps, | |
remaining: bps, | |
output: w, | |
quit: make(chan bool), | |
} | |
go rv.run() | |
return rv | |
} | |
func (rl *rateLimiter) run() { | |
defer rl.ticker.Stop() | |
for { | |
// If there's input being processed, don't get | |
// anymore. | |
// | |
// By having the input channel be conditional, it's | |
// simply ignored by the select loop when there's | |
// already input being processed (in which case the | |
// select will only process ticks to write and close | |
// to exit). | |
input := rl.input | |
if rl.current != nil { | |
input = nil | |
} | |
select { | |
case <-rl.quit: | |
// There's a pending writer. Tell it we're closed. | |
if rl.current != nil { | |
rl.currentch <- writeResponse{rl.currentsent, io.EOF} | |
} | |
return | |
case <-rl.ticker.C: | |
rl.remaining = rl.limit | |
rl.sendSome() | |
case req := <-input: | |
rl.current = req.bytes | |
rl.currentch = req.rv | |
rl.currentsent = 0 | |
rl.sendSome() | |
} | |
} | |
} | |
// Send as many bytes as we can in the current window. | |
func (rl *rateLimiter) sendSome() { | |
if rl.current != nil && rl.remaining > 0 { | |
tosend := rl.current | |
if rl.remaining < len(rl.current) { | |
tosend = rl.current[:rl.remaining] | |
} | |
rl.remaining -= len(tosend) | |
sent, err := rl.output.Write(tosend) | |
rl.currentsent += sent | |
rl.current = rl.current[sent:] | |
if len(rl.current) == 0 || err != nil { | |
rl.current = nil | |
rl.currentch <- writeResponse{rl.currentsent, err} | |
} | |
} | |
} | |
// io.Writer implementation | |
func (rl *rateLimiter) Write(p []byte) (n int, err error) { | |
req := writeRequest{p, make(chan writeResponse, 1)} | |
select { | |
case <-rl.quit: | |
return 0, io.EOF | |
case rl.input <- req: | |
res := <-req.rv | |
return res.size, res.err | |
} | |
panic("unreachable") | |
} | |
// io.Closer implementation | |
func (rl *rateLimiter) Close() error { | |
// This protects against a double sequential close, but not a | |
// double concurrent close. The latter will panic. | |
select { | |
case <-rl.quit: | |
return io.EOF | |
default: | |
} | |
close(rl.quit) | |
return nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
If you'd like to play interactively, see the playground version.