Skip to content

Instantly share code, notes, and snippets.

@yannick-cw
Last active August 6, 2017 16:02
Show Gist options
  • Save yannick-cw/cba2fe8b1e0c6962855dcb8601ca6d8b to your computer and use it in GitHub Desktop.
Save yannick-cw/cba2fe8b1e0c6962855dcb8601ca6d8b to your computer and use it in GitHub Desktop.
Monad to throttle execution
import cats.{FlatMap, Monad}
import cats.data.StateT
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import cats.implicits._
object ThrottleExp extends App {
abstract class DbRepo[M[_]: Monad] {
def readDoc(id: String): M[String]
}
case class Throttling(elements: Int, per: Duration, tickStart: Duration, sendThisTick: Int)
case class ThrottledF[A, F[_]](th: StateT[F, Throttling, A]) {
def run(elements: Int, per: Duration)(implicit F: FlatMap[F]): F[A] =
th.runA(Throttling(elements, per, Duration.create(System.nanoTime, NANOSECONDS), 0))
}
object ThrottledF {
def fromFuture[A](fa: Future[A])(implicit ev: Monad[Future]): ThrottledF[A, Future] =
ThrottledF(StateT.lift[Future, Throttling, A](fa))
}
implicit def throttledF[F[_]: Monad]: Monad[ThrottledF[?, F]] = new Monad[ThrottledF[?, F]] {
override def pure[A](x: A): ThrottledF[A, F] = ThrottledF(StateT.pure(x))
override def flatMap[A, B](fa: ThrottledF[A, F])(f: (A) => ThrottledF[B, F]): ThrottledF[B, F] =
ThrottledF(fa.th.flatMap { a =>
for {
_ <- StateT.modify[F, Throttling](updateState)
b <- f(a).th
} yield b
})
private def updateState[B, A](throttling: Throttling) = {
val newSendThisTick = throttling.sendThisTick + 1
if (throttling.elements > newSendThisTick) throttling.copy(sendThisTick = newSendThisTick)
else {
val currentTime = Duration.create(System.nanoTime, NANOSECONDS)
val nextTick = throttling.tickStart + throttling.per
val toSleep = nextTick - currentTime
val timeToWait = if (toSleep.lteq(0 seconds)) 0 else toSleep.toMillis
Thread.sleep(timeToWait)
throttling.copy(tickStart = nextTick, sendThisTick = 0)
}
}
override def tailRecM[A, B](a: A)(f: (A) => ThrottledF[Either[A, B], F]): ThrottledF[B, F] = ???
}
val fRepo: DbRepo[ThrottledF[?, Future]] = new DbRepo[ThrottledF[?, Future]] {
override def readDoc(id: String): ThrottledF[String, Future] =
ThrottledF.fromFuture(Future { println(s"called $id"); "test doc" })
}
val x: ThrottledF[String, Future] = for {
_ <- fRepo.readDoc("1")
_ <- fRepo.readDoc("2")
_ <- fRepo.readDoc("3")
_ <- fRepo.readDoc("4")
_ <- fRepo.readDoc("5")
_ <- fRepo.readDoc("6")
doc <- fRepo.readDoc("7")
} yield doc
println(Await.result(x.run(1, 1 second), Duration.Inf))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment