Last active
September 17, 2021 23:15
-
-
Save ChristopherDavenport/32b03c2533daad59e637b4fb7dbbebbc to your computer and use it in GitHub Desktop.
Single Fibered Computation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats._ | |
import cats.syntax.all._ | |
import cats.data.Kleisli | |
import cats.effect._ | |
import cats.effect.syntax._ | |
import cats.effect.kernel.Outcome.Canceled | |
import cats.effect.kernel.Outcome.Errored | |
import cats.effect.kernel.Outcome.Succeeded | |
import java.util.concurrent.CancellationException | |
object SingleFibered { | |
def prepare[F[_]: Concurrent, K, V](f: K => F[V]): F[K => F[V]] = { | |
val keypool: F[K => Ref[F, Option[F[Outcome[F, Throwable, V]]]]] = ??? | |
keypool.map{ r => | |
singleFibered[F, K, V](r, f) | |
} | |
} | |
// state: MapRef - but I could subsume that for something integrated to cats-effect | |
// F Action | |
def singleFibered[F[_]: Concurrent, K, V]( | |
state: K => Ref[F, Option[F[Outcome[F, Throwable, V]]]], | |
f: K => F[V] | |
) = { | |
{(k: K) => | |
Deferred[F, Outcome[F, Throwable, V]].flatMap{d => | |
Concurrent[F].uncancelable{poll => | |
state(k) | |
.modify{ | |
case s@Some(out) => s -> | |
poll(out) | |
.flatMap(embedError(_)) | |
case None => | |
Some(d.get) -> | |
Concurrent[F].guaranteeCase(poll(f(k))){ | |
o => state(k).set(None) >> d.complete(o).void | |
} | |
}.flatten | |
} | |
} | |
} | |
} | |
def embedError[F[_]: ApplicativeThrow, V](outcome: Outcome[F, Throwable, V]): F[V] = outcome match { | |
case Canceled() => new CancellationException("Canceled via").raiseError[F, V] | |
case Errored(e) => e.raiseError[F, V] | |
case Succeeded(fa) => fa | |
} | |
trait Cache[F[_], K, V]{ | |
def lookup(k: K): F[Option[V]] | |
def set(k: K, v: V): F[Unit] | |
def delete(k: K): F[V] | |
} | |
// Could prepare full op, but more efficient to do cache lookup without the fiber suspension | |
def cachedOr[F[_]: Concurrent, K, V](cache: Cache[F, K, V], f: K => F[V]): F[K => F[V]] = { | |
prepare(f).map(long => | |
{(k: K) => cats.data.OptionT(cache.lookup(k)).getOrElseF(long(k).flatTap(cache.set(k, _))) } | |
) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment