Created
December 6, 2017 00:01
-
-
Save Grogs/e4ac942a0a8af5a6619ae93291e6e96d to your computer and use it in GitHub Desktop.
Token bucket rate limiter for Scala Futures
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package me.gregd.cineworld.util | |
import java.util.concurrent.ConcurrentLinkedQueue | |
import monix.execution.Scheduler.Implicits.global | |
import monix.execution.atomic.AtomicInt | |
import scala.concurrent.{Future, Promise} | |
import scala.concurrent.duration._ | |
case class RateLimiter(duration: FiniteDuration, maxInvocations: Int) { | |
@volatile var permits: Int = maxInvocations | |
val queue = new ConcurrentLinkedQueue[() => Any]() | |
global.scheduleAtFixedRate(duration, duration) { | |
this synchronized { | |
permits = maxInvocations | |
while (!queue.isEmpty && permits > 0) { | |
Option(queue.poll()).foreach { fun => | |
permits -= 1 | |
fun.apply() | |
} | |
} | |
} | |
} | |
def apply[T](f: => Future[T]): Future[T] = | |
this synchronized { | |
if (permits > 0) { | |
permits -= 1 | |
f | |
} else { | |
val res = Promise[T]() | |
queue.add(() => { res.completeWith(f) }) | |
res.future | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package me.gregd.cineworld.util | |
import org.scalatest.{FunSuite, Matchers} | |
import scala.concurrent.{Await, Future} | |
import scala.concurrent.duration._ | |
class RateLimiterTest extends FunSuite with Matchers { | |
test("rate limit 100 per second") { | |
val start = System.currentTimeMillis() | |
def task(i: Int) = Future.successful(println(i + ": " + (System.currentTimeMillis() - start))) | |
val rateLimit = RateLimiter(100.millis, 5) | |
val results = for (i <- 1 to 24) yield rateLimit(task(i)) | |
results.foreach(Await.result(_, 5.seconds)) | |
val elapsed = System.currentTimeMillis() - start | |
elapsed should be > 400L | |
elapsed should be < 800L | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment