Skip to content

Instantly share code, notes, and snippets.

@adamw
Last active June 11, 2018 20:26
Show Gist options
  • Select an option

  • Save adamw/659b3b72d93755f2aa9d2ccd222c8970 to your computer and use it in GitHub Desktop.

Select an option

Save adamw/659b3b72d93755f2aa9d2ccd222c8970 to your computer and use it in GitHub Desktop.
private def runQueue(data: RateLimiterQueue[IO[Nothing, Unit]],
queue: IOQueue[RateLimiterMsg]): IO[Nothing, Unit] = {
queue
// (1) take a message from the queue (or wait until one is available)
.take
// (2) modify the data structure accordingly
.map {
case ScheduledRunQueue => data.notScheduled
case Schedule(t) => data.enqueue(t)
}
// (3) run the rate limiter queue: obtain the rate-limiter-tasks to be run
.map(_.run(System.currentTimeMillis()))
.flatMap {
case (tasks, d) =>
tasks
// (4) convert each rate-limiter-task to an IO
.map {
case Run(run) => run
case RunAfter(millis) =>
IO.sleep[Nothing](millis.millis)
.flatMap(_ => queue.offer(ScheduledRunQueue))
}
// (5) fork each converted IO so that it runs in the background
.map(_.fork[Nothing])
// (6) sequence a list of IOs which spawn background fibers
// into one big IO which, when run, will spawn all of them
.sequence_
.map(_ => d)
}
// (7) recursive call to handle the next message,
// using the updated data structure
.flatMap(d => runQueue(d, queue))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment