@adamw
Last active June 7, 2018 11:03
// Imports assumed for this snippet; exact packages depend on the Monix version in use.
import cats.implicits._            // .sequence_
import monix.eval.{MVar, Task}
import scala.concurrent.duration._ // .millis

private def runQueue(data: RateLimiterQueue[Task[Unit]],
                     queue: MVar[RateLimiterMsg]): Task[Unit] = {
  queue
    // (1) take a message from the queue (or wait until one is available)
    .take
    // (2) modify the data structure accordingly
    .map {
      case ScheduledRunQueue => data.notScheduled
      case Schedule(t)       => data.enqueue(t)
    }
    // (3) run the rate limiter queue: obtain the rate-limiter-tasks to be run
    .map(_.run(System.currentTimeMillis()))
    .flatMap {
      case (tasks, d) =>
        tasks
          // (4) convert each rate-limiter-task to a Monix-Task
          .map {
            case Run(run) => run
            case RunAfter(millis) =>
              Task.sleep(millis.millis)
                .flatMap(_ => queue.put(ScheduledRunQueue))
          }
          // (5) fork each converted Monix-Task so that it runs in the background
          .map(_.fork)
          // (6) sequence a list of tasks which spawn background fibers
          // into one big task which, when run, will spawn all of them
          .sequence_
          .map(_ => d)
    }
    // (7) recursive call to handle the next message,
    // using the updated data structure
    .flatMap(d => runQueue(d, queue))
}
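
The snippet above relies on a pure `RateLimiterQueue` with `enqueue`, `notScheduled` and `run`, and on the `Run` / `RunAfter` results of `run`, none of which are shown in this gist. The following is a minimal sketch of what such a data structure might look like, not the author's implementation. The field names (`maxRuns`, `perMillis`, `lastTimestamps`, `waiting`, `scheduled`) and the windowing policy are assumptions made for illustration, and it assumes `maxRuns >= 1`. It has no Monix dependency, so it can be tried in a plain Scala REPL.

```scala
import scala.annotation.tailrec
import scala.collection.immutable.Queue

// Hypothetical result of `run`: start a task now, or check back after a delay.
sealed trait RateLimiterTask[+F]
case class Run[F](run: F) extends RateLimiterTask[F]
case class RunAfter(millis: Long) extends RateLimiterTask[Nothing]

// Hypothetical pure queue: at most `maxRuns` starts within any `perMillis` window.
case class RateLimiterQueue[F](
    maxRuns: Int,
    perMillis: Long,
    lastTimestamps: Queue[Long] = Queue.empty, // start times still inside the window
    waiting: Queue[F] = Queue.empty[F],        // tasks not yet started
    scheduled: Boolean = false                 // true while a RunAfter wake-up is pending
) {
  def enqueue(f: F): RateLimiterQueue[F] = copy(waiting = waiting.enqueue(f))

  def notScheduled: RateLimiterQueue[F] = copy(scheduled = false)

  def run(now: Long): (List[RateLimiterTask[F]], RateLimiterQueue[F]) = {
    @tailrec
    def loop(q: RateLimiterQueue[F],
             acc: List[RateLimiterTask[F]]): (List[RateLimiterTask[F]], RateLimiterQueue[F]) =
      if (q.waiting.isEmpty) (acc.reverse, q)
      else if (q.lastTimestamps.size < q.maxRuns) {
        // A slot is free: start the oldest waiting task and record its start time.
        val (task, rest) = q.waiting.dequeue
        loop(q.copy(lastTimestamps = q.lastTimestamps.enqueue(now), waiting = rest),
             Run(task) :: acc)
      } else if (!q.scheduled) {
        // No slot free: ask to be woken when the oldest start leaves the window.
        val delay = q.lastTimestamps.head + q.perMillis - now
        ((RunAfter(delay) :: acc).reverse, q.copy(scheduled = true))
      } else (acc.reverse, q)

    // Drop start times that have already left the window, then hand out slots.
    loop(copy(lastTimestamps = lastTimestamps.filter(_ > now - perMillis)), Nil)
  }
}

// Usage: two slots per second, three tasks waiting.
val q1 = RateLimiterQueue[String](maxRuns = 2, perMillis = 1000L)
  .enqueue("a").enqueue("b").enqueue("c")
val (tasks1, q2) = q1.run(now = 0L)                 // List(Run(a), Run(b), RunAfter(1000))
val (tasks2, q3) = q2.notScheduled.run(now = 1000L) // List(Run(c))
```

Keeping the queue a pure value means `runQueue` only has to interpret the returned `Run` and `RunAfter` values as effects, and the rate-limiting logic itself can be tested with plain timestamps.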