-
-
Save ryankennedy/1112009 to your computer and use it in GitHub Desktop.
| import java.util.concurrent.{TimeUnit, SynchronousQueue} | |
| /** | |
| * Simple leaky bucket (http://en.wikipedia.org/wiki/Leaky_bucket) implementation using a | |
| * SynchronousQueue. | |
| * | |
| * // This should take roughly 10 seconds to complete. | |
| * val bucket = new LeakyBucket(100, TimeUnit.SECONDS) | |
| * for (i <- 1 to 1000) { | |
| * bucket.take() | |
| * } | |
| */ | |
| class LeakyBucket(rate: Long, unit: TimeUnit) { | |
| // Precalculate the sleep time. | |
| private val millis = unit.toMillis(1) / rate | |
| private val nanos = TimeUnit.MILLISECONDS.toNanos(unit.toMillis(1) % rate) / rate | |
| // The synchronous queue is used as the handoff between the leaking thread and | |
| // callers waiting for a drop from the bucket. | |
| private val queue = new SynchronousQueue[Int](true) | |
| // Background thread to generate drops. | |
| private val filler = new Thread(new Runnable() { | |
| def run() { | |
| while (true) { | |
| Thread.sleep(millis, nanos.toInt) | |
| queue.put(1) | |
| } | |
| } | |
| }, "tokenbucket-filler") | |
| filler.setDaemon(true) | |
| filler.start() | |
| /** | |
| * Wait up to the given timeout for a drop. | |
| */ | |
| def poll(timeout: Long, unit: TimeUnit) = { | |
| Option(queue.poll(timeout, unit)) | |
| } | |
| /** | |
| * Wait for a drop indefinitely. | |
| */ | |
| def take() { | |
| queue.take() | |
| } | |
| } |
Here's a leaky bucket without a queue or a thread git://gist.github.com/1291799.git
Basically, you sleep until the next free drip.
Mark, my queue has a finite size of one. See the SynchronousQueue docs (http://download.oracle.com/javase/6/docs/api/java/util/concurrent/SynchronousQueue.html).
Your implementation looks more like a token bucket to me.
Ahh. Okay. Good point.
More importantly though, one of the key features of leaky bucket is that it allows for extremely light-weight implementations in routers, switches and other resource-challenged environments that might have 1,000s of independent buckets running at one time.
Using a queue and thread per bucket might have the original designers rolling in their graves when it can cost as little as a 40-byte data structure per bucket. Just sayin' :-)
A true leaky-bucket has a finite queue size. Your queue.put() should be conditional on some maximum limit.