Skip to content

Instantly share code, notes, and snippets.

@timvw
Created November 16, 2021 17:02
Show Gist options
  • Select an option

  • Save timvw/2ac6f6d8fd19dfa8dfd827fd714eb82c to your computer and use it in GitHub Desktop.

Select an option

Save timvw/2ac6f6d8fd19dfa8dfd827fd714eb82c to your computer and use it in GitHub Desktop.
Ratelimit with cats-effect
package be.icteam.sample
import cats.effect._
import cats.implicits._
import cats.effect.concurrent.Semaphore
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
object Main extends IOApp {
val es = Executors.newFixedThreadPool(5)
val ec = ExecutionContext.fromExecutor(es)
implicit val cs: ContextShift[IO] = IO.contextShift(ec)
override def contextShift: ContextShift[IO] = cs
val rnd = new scala.util.Random()
def work(x: Int): IO[Int] = IO {
val cost = rnd.nextInt(4000)
println(s"$x sleeping for ${cost}ms.")
Thread.sleep(cost)
println(s"$x done")
x
}
def rateLimited[A, B](limit: Int)(fn: A => IO[B])(items: List[A]): IO[List[B]] = {
for {
s <- Semaphore[IO](limit)
results <- items.parTraverse(x => s.withPermit(fn(x)))
} yield results
}
override def run(args: List[String]): IO[ExitCode] = {
val items: List[Int] = Seq.range(1, 10).toList
val resultsIO: IO[List[Int]] = rateLimited(20)(work)(items)
println(resultsIO.unsafeRunSync())
IO(ExitCode.Success)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment