Last active
August 6, 2017 16:02
-
-
Save yannick-cw/cba2fe8b1e0c6962855dcb8601ca6d8b to your computer and use it in GitHub Desktop.
Monad to throttle execution
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.{FlatMap, Monad} | |
import cats.data.StateT | |
import scala.concurrent.{Await, Future} | |
import scala.concurrent.duration._ | |
import scala.concurrent.ExecutionContext.Implicits.global | |
import scala.concurrent.duration.Duration | |
import cats.implicits._ | |
object ThrottleExp extends App { | |
/** Document repository whose reads are expressed in an arbitrary effect `M`.
  *
  * @tparam M effect type constructor; a `Monad[M]` instance must be implicitly
  *           available wherever a concrete repository is instantiated
  */
abstract class DbRepo[M[_]](implicit monadEvidence: Monad[M]) {

  /** Reads the document for the given `id`, yielding its content inside `M`. */
  def readDoc(id: String): M[String]
}
/** State carried through a throttled computation.
  *
  * @param elements     number of steps allowed within one tick
  * @param per          length of one tick
  * @param tickStart    start of the current tick, expressed on the `System.nanoTime` clock
  * @param sendThisTick number of steps already counted in the current tick
  */
case class Throttling(
  elements: Int,
  per: Duration,
  tickStart: Duration,
  sendThisTick: Int
)
/** A computation in `F` that threads [[Throttling]] state through every step.
  *
  * @param th the underlying state transformer
  * @tparam A result type
  * @tparam F underlying effect
  */
case class ThrottledF[A, F[_]](th: StateT[F, Throttling, A]) {

  /** Runs the computation with a fresh throttling state and discards the final state.
    *
    * @param elements number of steps allowed per tick
    * @param per      length of one tick
    * @return the computed value in `F`
    */
  def run(elements: Int, per: Duration)(implicit F: FlatMap[F]): F[A] = {
    // The first tick starts "now", measured on the monotonic nanoTime clock.
    val startedAt = FiniteDuration(System.nanoTime(), NANOSECONDS)
    val initialState =
      Throttling(elements = elements, per = per, tickStart = startedAt, sendThisTick = 0)
    th.runA(initialState)
  }
}
object ThrottledF {

  /** Lifts a `Future` into a [[ThrottledF]] via `StateT.lift`.
    *
    * @param fa the future to wrap
    * @param ev `Monad` instance for `Future`
    */
  def fromFuture[A](fa: Future[A])(implicit ev: Monad[Future]): ThrottledF[A, Future] = {
    val lifted: StateT[Future, Throttling, A] = StateT.lift[Future, Throttling, A](fa)
    ThrottledF(lifted)
  }
}
/** `Monad` instance for [[ThrottledF]] that rate-limits sequencing.
  *
  * Every `flatMap` (and every further iteration of `tailRecM`) first advances the
  * [[Throttling]] state; once the allowance for the current tick is used up, the calling
  * thread sleeps until the next tick starts before the continuation is evaluated.
  *
  * `map` is a pure transformation and does not consume a throttle slot, so the trailing
  * `yield` of a for-comprehension no longer causes an extra wait.
  *
  * NOTE(review): the wait is a blocking `Thread.sleep` performed while the state function
  * is evaluated, so it occupies whichever thread runs the underlying `F` at that point.
  * NOTE(review): because binds are counted in the state, `pure(a).flatMap(f)` and `f(a)`
  * differ in the throttling state they leave behind.
  *
  * @tparam F underlying effect; must itself be a `Monad`
  */
implicit def throttledF[F[_]: Monad]: Monad[ThrottledF[?, F]] = new Monad[ThrottledF[?, F]] {

  override def pure[A](x: A): ThrottledF[A, F] = ThrottledF(StateT.pure(x))

  // Overridden so that a plain value transformation is not routed through `flatMap`
  // and therefore is not counted (or delayed) as a throttled step.
  override def map[A, B](fa: ThrottledF[A, F])(f: A => B): ThrottledF[B, F] =
    ThrottledF(fa.th.map(f))

  override def flatMap[A, B](fa: ThrottledF[A, F])(f: (A) => ThrottledF[B, F]): ThrottledF[B, F] =
    ThrottledF(fa.th.flatMap { a =>
      for {
        // Throttle first: `f(a)` is only evaluated after the (possible) wait.
        _      <- StateT.modify[F, Throttling](updateState)
        result <- f(a).th
      } yield result
    })

  /** Delegates to the `StateT` instance for the recursion and throttles between iterations.
    *
    * A `Left` means another iteration follows, so it is counted as a step; a `Right`
    * ends the loop and is returned without consuming a slot.
    */
  override def tailRecM[A, B](a: A)(f: (A) => ThrottledF[Either[A, B], F]): ThrottledF[B, F] =
    ThrottledF(Monad[StateT[F, Throttling, ?]].tailRecM[A, B](a) { current =>
      f(current).th.flatMap { step =>
        if (step.isLeft) StateT.modify[F, Throttling](updateState).map(_ => step)
        else StateT.pure[F, Throttling, Either[A, B]](step)
      }
    })

  /** Current time on the monotonic `System.nanoTime` clock. */
  private def now(): Duration = Duration.create(System.nanoTime, NANOSECONDS)

  /** Counts one step; when the tick's allowance is exhausted, waits for the next tick. */
  private def updateState(throttling: Throttling): Throttling = {
    val sentThisTick = throttling.sendThisTick + 1
    if (throttling.elements > sentThisTick) throttling.copy(sendThisTick = sentThisTick)
    else {
      val nextTick        = throttling.tickStart + throttling.per
      val remainingMillis = (nextTick - now()).toMillis
      // `blocking` lets the execution context compensate for the parked thread.
      if (remainingMillis > 0) scala.concurrent.blocking(Thread.sleep(remainingMillis))
      // If the deadline had already passed, start the new tick now rather than in the
      // past; otherwise later ticks would be skipped without waiting and the configured
      // rate would be exceeded in a burst.
      throttling.copy(tickStart = nextTick.max(now()), sendThisTick = 0)
    }
  }
}
/** Demo repository: each read prints the requested id and yields a fixed document. */
val fRepo: DbRepo[ThrottledF[?, Future]] = new DbRepo[ThrottledF[?, Future]] {
  override def readDoc(id: String): ThrottledF[String, Future] = {
    // The Future is created when readDoc is called, i.e. when the bind reaches this step.
    val lookup: Future[String] = Future {
      println(s"called $id")
      "test doc"
    }
    ThrottledF.fromFuture(lookup)
  }
}
/** Seven sequential reads; the result of the last one is the value of the program.
  *
  * Each bind goes through the `ThrottledF` monad instance, so the throttling state is
  * advanced between consecutive reads.
  */
val x: ThrottledF[String, Future] =
  for {
    _       <- fRepo.readDoc("1")
    _       <- fRepo.readDoc("2")
    _       <- fRepo.readDoc("3")
    _       <- fRepo.readDoc("4")
    _       <- fRepo.readDoc("5")
    _       <- fRepo.readDoc("6")
    lastDoc <- fRepo.readDoc("7")
  } yield lastDoc
// Run with an allowance of one step per second and block until the result is available.
println(Await.result(x.run(elements = 1, per = 1.second), Duration.Inf))
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment