Last active
June 6, 2018 10:30
-
-
Save adamw/e138fe655b9eb8b2401126cc0cd7810c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
  * Queue of rate-limited computations. The computations will be *started* so that at
  * any time, there's at most `maxRuns` in any time `perMillis` window.
  *
  * Note that this does not take into account the duration of the computations, when
  * they end or when they reach a remote server.
  *
  * The queue is immutable: every operation returns an updated copy.
  *
  * @param maxRuns        Maximum number of computations started within one window.
  *                       Must be positive.
  * @param perMillis      Length of the time window, in milliseconds.
  * @param lastTimestamps Timestamps at which computations were recently started,
  *                       oldest first.
  * @param waiting        Computations which have been enqueued but not yet started,
  *                       in FIFO order.
  * @param scheduled      Is an invocation of `run` already scheduled (by returning an
  *                       appropriate task in the previous invocation): used to prevent
  *                       scheduling too many runs; it's enough if there's only one run
  *                       scheduled at any given time.
  * @tparam F Type of computations. Should be a lazy wrapper, so that computations can
  *           be enqueued for later execution.
  */
case class RateLimiterQueue[F](maxRuns: Int, perMillis: Long,
                               lastTimestamps: Queue[Long],
                               waiting: Queue[F], scheduled: Boolean) {

  // With a non-positive limit nothing could ever be started, and `doRun` would end up
  // calling `head` on an empty timestamp queue. Fail fast with a clear message instead.
  require(maxRuns > 0, s"maxRuns must be positive, but was $maxRuns")

  /**
    * Given the timestamp, obtain a list of tasks which might include running a
    * computation or scheduling a `run` invocation in the future, and an updated
    * queue.
    *
    * The `Run` tasks are returned in the order in which the computations were
    * enqueued; a `RunAfter` task, if present, is the last element.
    *
    * @param now Current timestamp, in milliseconds.
    */
  def run(now: Long): (List[RateLimiterTask[F]], RateLimiterQueue[F]) =
    pruneTimestamps(now).doRun(now)

  /**
    * Add a request to the queue. Doesn't run any pending requests.
    */
  def enqueue(f: F): RateLimiterQueue[F] = copy(waiting = waiting.enqueue(f))

  /**
    * Before invoking a scheduled `run`, clear the scheduled flag.
    * If needed, the next `run` invocation might include a `RunAfter` task.
    */
  def notScheduled: RateLimiterQueue[F] = copy(scheduled = false)

  /**
    * Starts waiting computations for as long as there are free slots in the current
    * window. Expects the timestamps of `this` to be already pruned for `now`.
    *
    * Implemented as a tail-recursive loop, so that the stack depth doesn't depend on
    * the number of computations started in a single invocation.
    */
  private def doRun(now: Long): (List[RateLimiterTask[F]], RateLimiterQueue[F]) = {
    // `started` accumulates the tasks in reverse order; it is reversed once on exit.
    @scala.annotation.tailrec
    def drain(queue: RateLimiterQueue[F],
              started: List[RateLimiterTask[F]]): (List[RateLimiterTask[F]], RateLimiterQueue[F]) =
      if (queue.lastTimestamps.size < maxRuns) {
        queue.waiting.dequeueOption match {
          case Some((io, remaining)) =>
            val task: RateLimiterTask[F] = Run(io)
            // Record the start time and re-prune before looking at the next
            // computation, exactly as a nested `run(now)` invocation would.
            val next = queue
              .copy(lastTimestamps = queue.lastTimestamps.enqueue(now), waiting = remaining)
              .pruneTimestamps(now)
            drain(next, task :: started)
          case None =>
            (started.reverse, queue)
        }
      } else if (!queue.scheduled) {
        // The window is full: the next slot frees up when the oldest timestamp leaves
        // the window. `head` is safe here: size >= maxRuns > 0, so the queue is
        // non-empty. The task is emitted regardless of whether anything is waiting.
        val nextAvailableSlot = perMillis - (now - queue.lastTimestamps.head)
        val task: RateLimiterTask[F] = RunAfter(nextAvailableSlot)
        ((task :: started).reverse, queue.copy(scheduled = true))
      } else {
        // The window is full and a `run` is already scheduled: nothing more to do.
        (started.reverse, queue)
      }

    drain(this, Nil)
  }

  /**
    * Remove timestamps which are outside of the current time window, that is
    * timestamps which are further from `now` than `perMillis`. A timestamp which is
    * exactly `perMillis` old is still kept.
    */
  private def pruneTimestamps(now: Long): RateLimiterQueue[F] = {
    val threshold = now - perMillis
    copy(lastTimestamps = lastTimestamps.filter(_ >= threshold))
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment