Skip to content

Instantly share code, notes, and snippets.

View adamw's full-sized avatar

Adam Warski adamw

View GitHub Profile
/**
* Queue of rate-limited computations. The computations will be *started* so that
* at most `maxRuns` of them start within any time window of `perMillis` milliseconds.
*
* Note that this does not take into account the duration of the computations, when
* they end or when they reach a remote server.
*
* @param scheduled Is an invocation of `run` already scheduled (by returning an
* appropriate task in the previous invocation): used to prevent
* scheduling too many runs; it's enough if there's only one run
private class RateLimiterActor(maxRuns: Int, per: FiniteDuration) extends Actor
with ActorLogging {
import context.dispatcher
// mutable actor state: the current rate limiter queue; the queue itself is
// immutable, but the reference is mutable and access to it is protected by
// the actor.
private var queue = RateLimiterQueue[LazyFuture](maxRuns, per.toMillis)
/**
  * Client-side facade of the actor-based rate limiter: wraps computations in
  * `LazyFuture` messages and sends them to `rateLimiterActor`.
  *
  * @param rateLimiterActor the actor that receives the `LazyFuture` messages
  */
class AkkaRateLimiter(rateLimiterActor: ActorRef) {

  /**
    * Submits a computation to the rate limiter.
    *
    * `f` is a by-name parameter, so it is not evaluated here; it is evaluated
    * only when the thunk carried by the `LazyFuture` message is invoked.
    *
    * @param f  the computation to run
    * @param ec execution context used for the callbacks attached to `f`
    * @return a future completed with the outcome of `f` (success or failure)
    */
  def runLimited[T](f: => Future[T])(implicit ec: ExecutionContext): Future[T] = {
    val p = Promise[T]()
    val msg = LazyFuture(() => {
      // Evaluating `f` may throw before any Future is produced. Convert such a
      // throw into a failed Future so the promise is always completed and the
      // caller's future cannot hang forever.
      val started: Future[T] =
        try f
        catch { case scala.util.control.NonFatal(e) => Future.failed[T](e) }
      // andThen guarantees the promise is completed before the Future[Unit]
      // handed back to the invoker of the thunk completes.
      started.andThen { case r => p.complete(r) }.map(_ => ())
    })
    rateLimiterActor ! msg
    p.future
  }
}
object AkkaRateLimiter {
// Messages handled by the `rateLimit` behavior defined in this object.
sealed trait RateLimiterMsg
// A deferred computation: `t` produces its `Future[Unit]` only when invoked.
// `rateLimit` enqueues it on the rate limiter queue.
case class LazyFuture(t: () => Future[Unit]) extends RateLimiterMsg
// On receipt, `rateLimit` calls `notScheduled` on the queue and runs it again.
// NOTE(review): presumably delivered by the timer when a scheduled run is due —
// confirm against `runQueue`.
case object ScheduledRunQueue extends RateLimiterMsg
/**
  * Behavior of the rate limiter: every incoming message first updates the queue,
  * then the queue is run via `runQueue`, and the behavior continues with the
  * queue value that `runQueue` returns.
  *
  * @param timer scheduler handed on to `runQueue`
  * @param data  current rate limiter queue
  * @return the behavior handling the next message
  */
def rateLimit(timer: TimerScheduler[RateLimiterMsg],
              data: RateLimiterQueue[LazyFuture]): Behavior[RateLimiterMsg] =
  Behaviors.receiveMessage { msg =>
    // Step 1: apply the message to the queue.
    val updated = msg match {
      case ScheduledRunQueue => data.notScheduled
      case lf: LazyFuture    => data.enqueue(lf)
    }
    // Step 2: run the queue and carry on with the resulting state.
    val next = runQueue(timer, updated)
    rateLimit(timer, next)
  }
def runQueue(timer: TimerScheduler[RateLimiterMsg],
data: RateLimiterQueue[LazyFuture]): RateLimiterQueue[LazyFuture] = {
/**
  * Client-side facade of the Akka Typed rate limiter: wraps computations in
  * `LazyFuture` messages and sends them to `actorSystem`.
  *
  * @param actorSystem the typed actor system that receives the messages
  */
class AkkaTypedRateLimiter(actorSystem: ActorSystem[RateLimiterMsg])
    extends StrictLogging {

  /**
    * Submits a computation to the rate limiter.
    *
    * `f` is a by-name parameter, so it is not evaluated here; it is evaluated
    * only when the thunk carried by the `LazyFuture` message is invoked.
    *
    * @param f  the computation to run
    * @param ec execution context used for the callbacks attached to `f`
    * @return a future completed with the outcome of `f` (success or failure)
    */
  def runLimited[T](f: => Future[T])(implicit ec: ExecutionContext): Future[T] = {
    val p = Promise[T]()
    actorSystem ! LazyFuture(() => {
      // Evaluating `f` may throw before any Future is produced. Convert such a
      // throw into a failed Future so the promise is always completed and the
      // caller's future cannot hang forever.
      val started: Future[T] =
        try f
        catch { case scala.util.control.NonFatal(e) => Future.failed[T](e) }
      // andThen guarantees the promise is completed before the Future[Unit]
      // handed back to the invoker of the thunk completes.
      started.andThen { case r => p.complete(r) }.map(_ => ())
    })
    p.future
  }
}
// Messages consumed from the queue by the `runQueue` loop.
sealed trait RateLimiterMsg
// On receipt, `runQueue` calls `notScheduled` on the rate limiter data.
case object ScheduledRunQueue extends RateLimiterMsg
// Request to enqueue the task `t`; on receipt, `runQueue` calls `data.enqueue(t)`.
case class Schedule(t: Task[Unit]) extends RateLimiterMsg
private def runQueue(data: RateLimiterQueue[Task[Unit]],
queue: MVar[RateLimiterMsg]): Task[Unit] = {
queue
// (1) take a message from the queue (or wait until one is available)
.take
// (2) modify the data structure accordingly
.map {
case ScheduledRunQueue => data.notScheduled
case Schedule(t) => data.enqueue(t)
}
/**
  * Client-side facade of the Monix rate limiter: submits tasks by putting
  * `Schedule` messages on `queue`.
  *
  * @param queue      the message queue that `Schedule` messages are put on
  * @param queueFiber fiber passed in at construction; not used by `runLimited`
  */
class MonixRateLimiter(queue: MVar[RateLimiterMsg], queueFiber: Fiber[Task, Unit]) {

  /**
    * Submits `f` to the rate limiter and waits for its outcome.
    *
    * @param f the task to run
    * @return a task that completes with the result of `f`, or fails with the
    *         error raised by `f`
    */
  def runLimited[T](f: Task[T]): Task[T] = {
    for {
      // Carry the whole outcome (success or failure) back to the caller. With a
      // plain MVar[T], a failing `f` would never fill the MVar and the `take`
      // below would wait forever.
      mv <- MVar.empty[scala.util.Try[T]]
      // `materialize` turns a failure of `f` into a value, so the scheduled
      // Task[Unit] always puts an outcome into `mv`.
      _ <- queue.put(Schedule(f.materialize.flatMap(mv.put)))
      outcome <- mv.take
      // Re-raise a captured failure in the caller's task.
      r <- Task.fromTry(outcome)
    } yield r
  }
}
private def runQueue(data: RateLimiterQueue[IO[Nothing, Unit]],
queue: IOQueue[RateLimiterMsg]): IO[Nothing, Unit] = {
queue
// (1) take a message from the queue (or wait until one is available)
.take
// (2) modify the data structure accordingly
.map {
case ScheduledRunQueue => data.notScheduled
case Schedule(t) => data.enqueue(t)
}