Created
November 2, 2016 10:21
-
-
Save emicklei/3340aa435a894915944c92ec4ff27bec to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"log" | |
"sync" | |
"time" | |
) | |
// Bucket holds a limited number of tokens which can be Taken (immediate or wait blocking) and must be Returned. | |
type Bucket struct { | |
// The mutex guards the fields following it. | |
protect sync.Mutex | |
quantum int64 | |
capacity int64 | |
waiters int // number of waiting go-routines for available tokens | |
returned chan int64 | |
} | |
// NewBucket returns a Bucket initialized with cap tokens. | |
func NewBucket(cap int64) *Bucket { | |
return &Bucket{ | |
quantum: cap, | |
capacity: cap, | |
waiters: 0, | |
returned: make(chan int64), | |
} | |
} | |
func (b *Bucket) String() string { | |
return fmt.Sprintf("q=%d ,c=%d, w=%d", b.quantum, b.capacity, len(b.returned)) | |
} | |
// Take takes up to count immediately available tokens from the | |
// bucket. It returns the number of tokens removed, or zero if there are | |
// no available tokens. It does not block. | |
func (b *Bucket) Take(count int64) int64 { | |
b.protect.Lock() | |
defer b.protect.Unlock() | |
if b.quantum < count { | |
return 0 | |
} | |
if count > b.capacity { | |
b.quantum = 0 | |
return b.capacity | |
} | |
b.quantum -= count | |
return count | |
} | |
// Return returns count tokens to the bucket. You cannot return more tokens | |
// than the capacity of the bucket. It will ignore the overflow.go It does not block. | |
func (b *Bucket) Return(count int64) { | |
b.protect.Lock() | |
defer b.protect.Unlock() | |
b.quantum += count | |
// do not grow beyond capacity | |
if b.quantum > b.capacity { | |
b.quantum = b.capacity | |
} | |
// notify all waiters how many new tokens are available | |
for i := 0; i < b.waiters; i++ { | |
b.returned <- b.quantum | |
} | |
} | |
// TakeMaxDuration returns count tokens from the bucket within a timeout. | |
// Return 0 if not (enough) tokens are available within that time. | |
// It does block for at most the timeout. | |
func (b *Bucket) TakeMaxDuration(count int64, timeout time.Duration) int64 { | |
if t := b.Take(count); t == count { | |
return t | |
} | |
// This go-routine has to wait until tokens are available or the timeout | |
b.protect.Lock() | |
b.waiters++ | |
b.protect.Unlock() | |
defer func() { | |
// This go-routine is done waiting | |
b.protect.Lock() | |
b.waiters-- | |
b.protect.Unlock() | |
}() | |
deadline := time.NewTicker(timeout) | |
defer deadline.Stop() | |
for { | |
select { | |
case <-deadline.C: | |
return 0 | |
case q := <-b.returned: | |
// see if current quantum is enough for this call | |
if count <= q { | |
if t := b.Take(count); t == count { | |
return t | |
} | |
} | |
} | |
} | |
} | |
/** | |
bucket := NewBucket(1000) | |
memory := bucket.TakeMaxDuration(500, 1*time.Second) | |
if memory != 0 { | |
// do work | |
bucket.Return(memory) | |
} | |
**/ | |
func main() { | |
b := NewBucket(1000) | |
t := b.Take(100) | |
log.Printf("%v,t=%d", b, t) | |
b.Return(t) | |
log.Printf("%v,t=%d", b, t) | |
h := b.TakeMaxDuration(2000, 1*time.Second) | |
log.Printf("%v,h=%d", b, h) | |
log.Printf("%v,t=%d", b, b.Take(1000)) | |
go func() { | |
time.Sleep(50 * time.Millisecond) | |
b.Return(1000) | |
log.Println("i returned 1000") | |
}() | |
go func() { | |
log.Println("waiting for 500") | |
h2 := b.TakeMaxDuration(500, 1*time.Second) | |
log.Printf("%v,h2=%d", b, h2) | |
}() | |
log.Println("waiting for 500") | |
h3 := b.TakeMaxDuration(500, 1*time.Second) | |
log.Printf("%v,h3=%d", b, h3) | |
time.Sleep(1 * time.Second) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment