Last active
September 14, 2022 02:23
-
-
Save alexandru/623fe6c587d73e89a8f14de284ca1e2d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import monix.eval.Task | |
import java.util.concurrent.TimeUnit | |
import scala.concurrent.duration._ | |
/** Request limiter for APIs that have quotas per second, minute, hour, etc. | |
* | |
* {{{ | |
* // Rate-limits to 100 requests per second | |
* val limiter = TaskLimiter(TimeUnit.SECONDS, limit = 100) | |
* | |
* limiter.request(myTask) | |
* }}} | |
*/ | |
final class TaskLimiter(period: TimeUnit, limit: Int) { | |
import monix.execution.atomic.Atomic | |
import TaskLimiter.State | |
private[this] val state = | |
Atomic(State(0, period, 0, limit)) | |
def request[A](task: Task[A]): Task[A] = | |
Task.deferAction { ec => | |
val now = ec.currentTimeMillis() | |
state.transformAndExtract(_.request(now)) match { | |
case None => task | |
case Some(delay) => | |
// Recursive call, retrying request after delay | |
request(task).delayExecution(delay) | |
} | |
} | |
} | |
object TaskLimiter { | |
/** Builder for [[TaskLimiter]]. */ | |
def apply(period: TimeUnit, limit: Int): TaskLimiter = | |
new TaskLimiter(period, limit) | |
/** Timestamp specified in milliseconds since epoch, | |
* as returned by `System.currentTimeMillis` | |
*/ | |
type Timestamp = Long | |
/** Internal state of [[TaskLimiter]]. */ | |
final case class State(window: Long, period: TimeUnit, requested: Int, limit: Int) { | |
private def periodMillis = | |
TimeUnit.MILLISECONDS.convert(1, period) | |
def request(now: Timestamp): (Option[FiniteDuration], State) = { | |
val periodMillis = this.periodMillis | |
val currentWindow = now / periodMillis | |
if (currentWindow != window) | |
(None, copy(window = currentWindow, requested = 1)) | |
else if (requested < limit) | |
(None, copy(requested = requested + 1)) | |
else { | |
val nextTS = (currentWindow + 1) * periodMillis | |
val sleep = nextTS - now | |
(Some(sleep.millis), this) | |
} | |
} | |
} | |
} |
Hi! thanks a lot for this. I would love to get an example on how to use it. New to scala, but this is exactly what im looking for ! (launch a function every couple of secs)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thank you, I'm learning monix. This is very nice example. I'd love to see more.