import java.util.concurrent.{TimeUnit, SynchronousQueue}

/**
 * Simple leaky bucket (http://en.wikipedia.org/wiki/Leaky_bucket) implementation using a
 * SynchronousQueue.
 *
 *   // This should take roughly 10 seconds to complete.
 *   val bucket = new LeakyBucket(100, TimeUnit.SECONDS)
 *   for (i <- 1 to 1000) {
 *     bucket.take()
 *   }
 */
class LeakyBucket(rate: Long, unit: TimeUnit) {
  // Precalculate the sleep time between drops.
  private val millis = unit.toMillis(1) / rate
  private val nanos = TimeUnit.MILLISECONDS.toNanos(unit.toMillis(1) % rate) / rate

  // The synchronous queue is used as the handoff between the leaking thread and
  // callers waiting for a drop from the bucket. The element type is the boxed
  // Integer so that a timed-out poll() returns null instead of being unboxed to 0.
  private val queue = new SynchronousQueue[java.lang.Integer](true)

  // Background thread to generate drops.
  private val filler = new Thread(new Runnable() {
    def run(): Unit = {
      while (true) {
        Thread.sleep(millis, nanos.toInt)
        queue.put(Integer.valueOf(1))
      }
    }
  }, "tokenbucket-filler")
  filler.setDaemon(true)
  filler.start()

  /**
   * Wait up to the given timeout for a drop. Returns None on timeout.
   */
  def poll(timeout: Long, unit: TimeUnit): Option[Int] = {
    Option(queue.poll(timeout, unit)).map(_.intValue)
  }

  /**
   * Wait for a drop indefinitely.
   */
  def take(): Unit = {
    queue.take()
  }
}
Here's a leaky bucket without a queue or a thread: git://gist.github.com/1291799.git
Basically, you sleep until the next free drip.
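For readers who can't follow the link, here is a minimal sketch of the "sleep until the next free drip" idea. It is not the code from the linked gist. The class name `SleepingBucket` and its fields are hypothetical, and it assumes callers are happy to block in `take()`.

```scala
import java.util.concurrent.TimeUnit

// Hypothetical sketch: no queue and no background thread. Each caller claims
// the next free drip slot and sleeps until that slot arrives.
class SleepingBucket(rate: Long, unit: TimeUnit) {
  // Interval between drips, in nanoseconds.
  private val intervalNanos = unit.toNanos(1) / rate
  // Time (on the System.nanoTime scale) at which the next drip is free.
  private var nextFree = System.nanoTime()

  def take(): Unit = {
    val waitNanos = synchronized {
      val now = System.nanoTime()
      // If the bucket sat idle, start from now; idle time is not banked.
      // Compare by subtraction, as the nanoTime documentation recommends.
      val slot = if (nextFree - now < 0) now else nextFree
      nextFree = slot + intervalNanos
      slot - now
    }
    // Sleep outside the lock so other callers can claim later slots.
    if (waitNanos > 0) TimeUnit.NANOSECONDS.sleep(waitNanos)
  }
}
```

Used the same way as the class above, `new SleepingBucket(100, TimeUnit.SECONDS)` followed by 1000 calls to `take()` should also take roughly 10 seconds.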
Mark, my queue has a finite size of one. See the SynchronousQueue docs (http://download.oracle.com/javase/6/docs/api/java/util/concurrent/SynchronousQueue.html).
Your implementation looks more like a token bucket to me.
Ahh. Okay. Good point.
More importantly though, one of the key features of the leaky bucket is that it allows for extremely lightweight implementations in routers, switches and other resource-challenged environments that might have thousands of independent buckets running at one time.
Using a queue and thread per bucket might have the original designers rolling in their graves when it can cost as little as a 40-byte data structure per bucket. Just sayin' :-)
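To make the size argument concrete, here is a rough sketch of a bucket whose only mutable state is a fill level and a timestamp. The name `MeterBucket`, its parameters, and the choice to pass the clock in explicitly are all assumptions made for illustration. The JVM adds object overhead, so this does not reproduce the 40-byte figure.

```scala
// Hypothetical sketch: a non-blocking leaky bucket used as a meter.
// Mutable state is two longs; there is no thread and no queue.
final class MeterBucket(capacity: Long, leakPerSecond: Long, startNanos: Long) {
  private val NanosPerSecond = 1000000000L
  // Time needed for a full bucket to drain completely.
  private val fullDrainNanos = capacity * NanosPerSecond / leakPerSecond
  private var level = 0L             // drops currently in the bucket
  private var lastNanos = startNanos // time up to which leaking is accounted

  /** Try to add one drop at time nowNanos; false means the bucket is full. */
  def tryAdd(nowNanos: Long): Boolean = {
    val elapsed = nowNanos - lastNanos
    if (elapsed >= fullDrainNanos) {
      // Idle long enough that the bucket is certainly empty.
      level = 0L
      lastNanos = nowNanos
    } else {
      val leaked = elapsed * leakPerSecond / NanosPerSecond
      if (leaked > 0) {
        level = math.max(0L, level - leaked)
        // Advance only by whole drops so the fractional remainder is kept.
        lastNanos += leaked * NanosPerSecond / leakPerSecond
      }
    }
    if (level < capacity) { level += 1; true } else false
  }
}
```

A caller would typically pass `System.nanoTime()` as both `startNanos` and `nowNanos`, and drop or delay the request when `tryAdd` returns false.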
A true leaky-bucket has a finite queue size. Your queue.put() should be conditional on some maximum limit.