This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * Queue of rate-limited computations. The computations will be *started* so that at | |
| * any time, there's at most `maxRuns` in any time `perMillis` window. | |
| * | |
| * Note that this does not take into account the duration of the computations, when | |
| * they end or when they reach a remote server. | |
| * | |
| * @param scheduled Is an invocation of `run` already scheduled (by returning an | |
| * appropriate task in the previous invocation): used to prevent | |
| * scheduling too much runs; it's enough if there's only one run |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| private class RateLimiterActor(maxRuns: Int, per: FiniteDuration) extends Actor | |
| with ActorLogging { | |
| import context.dispatcher | |
| // mutable actor state: the current rate limiter queue; the queue itself is | |
| // immutable, but the reference is mutable and access to it is protected by | |
| // the actor. | |
| private var queue = RateLimiterQueue[LazyFuture](maxRuns, per.toMillis) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| class AkkaRateLimiter(rateLimiterActor: ActorRef) { | |
| def runLimited[T](f: => Future[T])(implicit ec: ExecutionContext): Future[T] = { | |
| val p = Promise[T] | |
| val msg = LazyFuture(() => f.andThen { case r => p.complete(r) }.map(_ => ())) | |
| rateLimiterActor ! msg | |
| p.future | |
| } | |
| } | |
| object AkkaRateLimiter { |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| sealed trait RateLimiterMsg | |
| case class LazyFuture(t: () => Future[Unit]) extends RateLimiterMsg | |
| case object ScheduledRunQueue extends RateLimiterMsg |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| def rateLimit(timer: TimerScheduler[RateLimiterMsg], | |
| data: RateLimiterQueue[LazyFuture]): Behavior[RateLimiterMsg] = | |
| Behaviors.receiveMessage { | |
| case lf: LazyFuture => rateLimit(timer, runQueue(timer, data.enqueue(lf))) | |
| case ScheduledRunQueue => rateLimit(timer, runQueue(timer, data.notScheduled)) | |
| } | |
| def runQueue(timer: TimerScheduler[RateLimiterMsg], | |
| data: RateLimiterQueue[LazyFuture]): RateLimiterQueue[LazyFuture] = { |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| class AkkaTypedRateLimiter(actorSystem: ActorSystem[RateLimiterMsg]) | |
| extends StrictLogging { | |
| def runLimited[T](f: => Future[T])(implicit ec: ExecutionContext): Future[T] = { | |
| val p = Promise[T] | |
| actorSystem ! LazyFuture(() => f.andThen { case r => p.complete(r) }.map(_ => ())) | |
| p.future | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| sealed trait RateLimiterMsg | |
| case object ScheduledRunQueue extends RateLimiterMsg | |
| case class Schedule(t: Task[Unit]) extends RateLimiterMsg |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| private def runQueue(data: RateLimiterQueue[Task[Unit]], | |
| queue: MVar[RateLimiterMsg]): Task[Unit] = { | |
| queue | |
| // (1) take a message from the queue (or wait until one is available) | |
| .take | |
| // (2) modify the data structure accordingly | |
| .map { | |
| case ScheduledRunQueue => data.notScheduled | |
| case Schedule(t) => data.enqueue(t) | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| class MonixRateLimiter(queue: MVar[RateLimiterMsg], queueFiber: Fiber[Task, Unit]) { | |
| def runLimited[T](f: Task[T]): Task[T] = { | |
| for { | |
| mv <- MVar.empty[T] | |
| _ <- queue.put(Schedule(f.flatMap(mv.put))) | |
| r <- mv.take | |
| } yield r | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| private def runQueue(data: RateLimiterQueue[IO[Nothing, Unit]], | |
| queue: IOQueue[RateLimiterMsg]): IO[Nothing, Unit] = { | |
| queue | |
| // (1) take a message from the queue (or wait until one is available) | |
| .take | |
| // (2) modify the data structure accordingly | |
| .map { | |
| case ScheduledRunQueue => data.notScheduled | |
| case Schedule(t) => data.enqueue(t) | |
| } |