Skip to content

Instantly share code, notes, and snippets.

@ChristopherDavenport
Last active September 17, 2021 23:15
Show Gist options
  • Save ChristopherDavenport/32b03c2533daad59e637b4fb7dbbebbc to your computer and use it in GitHub Desktop.
Save ChristopherDavenport/32b03c2533daad59e637b4fb7dbbebbc to your computer and use it in GitHub Desktop.
Single Fibered Computation
import cats._
import cats.syntax.all._
import cats.data.Kleisli
import cats.effect._
import cats.effect.syntax._
import cats.effect.kernel.Outcome.Canceled
import cats.effect.kernel.Outcome.Errored
import cats.effect.kernel.Outcome.Succeeded
import java.util.concurrent.CancellationException
/** Helpers for de-duplicating concurrent work per key: while a computation for a key is
  * running, other callers asking for the same key wait for that run's outcome instead of
  * starting their own.
  */
object SingleFibered {

  /** Allocates the per-key bookkeeping and returns a single-fibered version of `f`.
    *
    * The bookkeeping is a key-addressed `Ref` whose slot holds `Some(handle)` while a
    * computation for that key is running and `None` otherwise.
    *
    * NOTE(review): `cats.effect.std.MapRef` only exists in newer cats-effect 3 releases
    * (believed to be 3.4+) — confirm the project's cats-effect version.
    *
    * @param f the computation to run at most once at a time per key
    * @return an effect that allocates the state and yields the wrapped function
    */
  def prepare[F[_]: Concurrent, K, V](f: K => F[V]): F[K => F[V]] =
    // A MapRef is itself a `K => Ref[F, Option[...]]`, so it can be handed to
    // `singleFibered` directly. Setting a slot to `None` is how `singleFibered` signals
    // that the key is idle again.
    cats.effect.std.MapRef
      .ofSingleImmutableMap[F, K, F[Outcome[F, Throwable, V]]]()
      .map(inFlight => singleFibered[F, K, V](inFlight, f))

  /** Wraps `f` so that, per key, callers arriving while a run is in flight wait on that run.
    *
    * For a key `k` the returned function atomically inspects `state(k)`:
    *  - `Some(out)`: a run is already registered; the caller waits on `out` (cancelably)
    *    and converts the observed outcome to a value or raised error via [[embedError]].
    *  - `None`: the caller registers a handle (`d.get`) and runs `f(k)` itself; when that
    *    run finishes with any outcome, the slot is reset to `None` and the handle is
    *    completed with the outcome.
    *
    * @param state key-addressed slot holding the handle of the in-flight run, if any
    * @param f the computation to run
    * @return the single-fibered function
    */
  def singleFibered[F[_]: Concurrent, K, V](
      state: K => Ref[F, Option[F[Outcome[F, Throwable, V]]]],
      f: K => F[V]
  ): K => F[V] = { (k: K) =>
    // The Deferred is allocated up front because `modify` takes a pure function; it is
    // only used when this caller ends up being the one that runs `f(k)`.
    Deferred[F, Outcome[F, Throwable, V]].flatMap { d =>
      // Registration happens inside `uncancelable`; only the waiting / running parts are
      // re-exposed to cancelation through `poll`.
      Concurrent[F].uncancelable { poll =>
        state(k).modify {
          case s @ Some(out) =>
            s ->
              poll(out)
                .flatMap(embedError(_))
          case None =>
            Some(d.get) ->
              Concurrent[F].guaranteeCase(poll(f(k))) { o =>
                // Clear the slot first, then publish the outcome to the waiters.
                state(k).set(None) >> d.complete(o).void
              }
        }.flatten
      }
    }
  }

  /** Turns an observed [[Outcome]] back into an effect.
    *
    * @param outcome the outcome of another run
    * @return the successful effect as-is, the original error re-raised, or a
    *         `CancellationException` raised when the run was canceled
    */
  def embedError[F[_]: ApplicativeThrow, V](outcome: Outcome[F, Throwable, V]): F[V] =
    outcome match {
      case Canceled()    => new CancellationException("Canceled via").raiseError[F, V]
      case Errored(e)    => e.raiseError[F, V]
      case Succeeded(fa) => fa
    }

  /** Minimal key/value cache interface used by [[cachedOr]]. */
  trait Cache[F[_], K, V] {

    /** Looks up `k`, yielding `None` when there is no entry. */
    def lookup(k: K): F[Option[V]]

    /** Stores `v` under `k`. */
    def set(k: K, v: V): F[Unit]

    /** Deletes the entry for `k`.
      *
      * NOTE(review): the result type is `F[V]`; presumably the removed value — confirm
      * with implementations, including what happens when the key is absent.
      */
    def delete(k: K): F[V]
  }

  /** Builds a function that consults `cache` first and only falls back to the
    * single-fibered `f` on a miss, storing the freshly computed value in the cache.
    *
    * The cache lookup is deliberately kept outside the prepared single-fibered operation
    * so that a cache hit does not go through it at all.
    *
    * @param cache the cache consulted before, and populated after, running `f`
    * @param f the computation to run on a cache miss
    * @return an effect that allocates the single-fibered state and yields the function
    */
  def cachedOr[F[_]: Concurrent, K, V](cache: Cache[F, K, V], f: K => F[V]): F[K => F[V]] =
    prepare[F, K, V](f).map { long => (k: K) =>
      cats.data
        .OptionT(cache.lookup(k))
        .getOrElseF(long(k).flatTap(cache.set(k, _)))
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment