Skip to content

Instantly share code, notes, and snippets.

@Grogs
Created December 6, 2017 00:01
Show Gist options
  • Save Grogs/e4ac942a0a8af5a6619ae93291e6e96d to your computer and use it in GitHub Desktop.
Save Grogs/e4ac942a0a8af5a6619ae93291e6e96d to your computer and use it in GitHub Desktop.
Token bucket rate limiter for Scala Futures
package me.gregd.cineworld.util
import java.util.concurrent.ConcurrentLinkedQueue
import monix.execution.Scheduler.Implicits.global
import monix.execution.atomic.AtomicInt
import scala.concurrent.{Future, Promise}
import scala.concurrent.duration._
case class RateLimiter(duration: FiniteDuration, maxInvocations: Int) {
@volatile var permits: Int = maxInvocations
val queue = new ConcurrentLinkedQueue[() => Any]()
global.scheduleAtFixedRate(duration, duration) {
this synchronized {
permits = maxInvocations
while (!queue.isEmpty && permits > 0) {
Option(queue.poll()).foreach { fun =>
permits -= 1
fun.apply()
}
}
}
}
def apply[T](f: => Future[T]): Future[T] =
this synchronized {
if (permits > 0) {
permits -= 1
f
} else {
val res = Promise[T]()
queue.add(() => { res.completeWith(f) })
res.future
}
}
}
package me.gregd.cineworld.util
import org.scalatest.{FunSuite, Matchers}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
class RateLimiterTest extends FunSuite with Matchers {
test("rate limit 100 per second") {
val start = System.currentTimeMillis()
def task(i: Int) = Future.successful(println(i + ": " + (System.currentTimeMillis() - start)))
val rateLimit = RateLimiter(100.millis, 5)
val results = for (i <- 1 to 24) yield rateLimit(task(i))
results.foreach(Await.result(_, 5.seconds))
val elapsed = System.currentTimeMillis() - start
elapsed should be > 400L
elapsed should be < 800L
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment