import cats.implicits._
import monix.execution.atomic.Atomic
import scala.concurrent.{ExecutionContext, Future, Promise}

class Cache[A] private (ref: Atomic[Map[String, Promise[A]]]) {
  def cache(key: String)(task: => Future[A])
    (implicit ec: ExecutionContext): Future[A] = {

    // Atomically either register a fresh Promise (Left) or find the existing one (Right)
    val pullFromCache = ref.transformAndExtract { current =>
      current.get(key) match {
        case None =>
          val inst = Promise[A]()
          (Left(inst), current.updated(key, inst))
        case Some(value) =>
          (Right(value), current)
      }
    }

    pullFromCache match {
      case Left(fa) =>
        // actual side-effect
        task.attempt.flatMap { ea =>
          // publish the outcome (success or failure) to everyone waiting on this key
          fa.complete(ea.toTry)
          Future.fromTry(ea.toTry)
        }
      case Right(fa) =>
        fa.future
    }
  }
}

import cats.implicits._
import cats.effect.Concurrent
import cats.effect.concurrent.{Deferred, Ref}

class Cache[F[_] : Concurrent, A] private (ref: Ref[F, Map[String, F[Deferred[F, Either[Throwable, A]]]]]) {
  def cache(key: String)(task: F[A]): F[A] = {
    val pullFromCache = ref.modify { current =>
      current.get(key) match {
        case None =>
          val inst = Deferred.apply[F, A]
          (current.updated(key, inst), Left(inst))
        case Some(value) =>
          (current, Right(value))
      }
    }

    // Given the concurrent nature of this caching,
    // the only reasonable choice to deal with cancelation is to
    // make it uninterruptible
    F.uninterruptible {
      pullFromCache.flatMap {
        case Left(fa) =>
          for {
            df <- fa
            ea <- task.attempt // <- actual side-effect
            _ <- df.complete(ea)
            a <- F.rethrow(ea)
          } yield a
        case Right(fa) =>
          fa.flatMap(_.get.flatMap(F.rethrow))
      }
    }
  }
}

It seems you need to handle the case when the task fails with an exception. In that case the Deferred is never completed.
What is wrong with Coeval.evalOnce?
https://gist.github.com/cultureofone/66810c1609fa02cb5144599476873284
@kubukoz no, but given the concurrent nature of this thing, the only solution is to make it uninterruptible
@danilbykov indeed, we need to handle exceptions, that's left as an exercise for the reader
@cultureofone sure, you can work with Coeval.evalOnce in this case as well, but note that it has different behavior in terms of multi-threading: more than one thread can access that instance at the same time, in which case one of them will block. This might not be a problem of course, depending on your use-case.
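To make the multi-threading point concrete, here is a minimal sketch of the Promise-based approach using only the Scala standard library. It swaps monix's Atomic for scala.collection.concurrent.TrieMap, and the names PromiseCache and PromiseCacheDemo are made up for illustration. Under these assumptions, concurrent callers for the same key share one in-flight Future instead of blocking a thread.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.control.NonFatal

// Hypothetical stdlib-only variant of the Promise-based cache (not the gist's code).
final class PromiseCache[A] {
  private val entries = TrieMap.empty[String, Promise[A]]

  def cache(key: String)(task: => Future[A])
    (implicit ec: ExecutionContext): Future[A] = {
    val fresh = Promise[A]()
    entries.putIfAbsent(key, fresh) match {
      case None =>
        // We registered the Promise first, so we are the only caller that runs the task.
        val running =
          try task
          catch { case NonFatal(e) => Future.failed[A](e) }
        // Publish success or failure to everyone waiting on this key.
        fresh.completeWith(running)
        fresh.future
      case Some(existing) =>
        // Another caller got there first: share its result without blocking.
        existing.future
    }
  }
}

object PromiseCacheDemo {
  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val runs  = new AtomicInteger(0)
    val cache = new PromiseCache[Int]
    // Eight concurrent requests for the same key.
    val calls = List.fill(8)(cache.cache("answer")(Future { runs.incrementAndGet(); 42 }))
    val results = Await.result(Future.sequence(calls), 5.seconds)
    println(s"distinct results: ${results.distinct}, task runs: ${runs.get}")
  }
}
```

Note that a failed task stays cached as a failure in this sketch; whether that is desirable depends on the use-case.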
I added error handling and made that task uninterruptible.
@alexandru As I understood from the original request author (on Twitter), there is no need for multi-thread support.
@alexandru I added a multi-threaded version which uses Task.evalOnce; it is still shorter than your Promise+Future version.
Plus a demo which shows that it actually works in parallel:
https://gist.github.com/cultureofone/66810c1609fa02cb5144599476873284
@alexandru I see that in the "pure" version the Promise+Future logic is replaced with Deferred logic.
Although you made things needlessly more complicated for this kind of objective, this is a great example of Cats Effect usage, and now I finally understand the practical need for the Cats (Effect) library: it is composable as... something very composable. The only question left is about debuggability.
Thank you for that ;)
It seems to me, though, that the pure version cannot work the way it was written. Apart from several tweaks you need to make to have it compile, the second access for the same key will always fail, as the value stored in the Map is not a Deferred but an F[Deferred] that always creates a new Deferred when run. Therefore line 32 will never complete, as it tries to call .get on a new Deferred created by the fa effect, which is not the df completed in line 27. I confirmed this with a little test that hangs indefinitely as soon as you access any key a second time.
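One way to avoid the problem described here is to allocate the Deferred before calling Ref.modify and to store the Deferred itself in the Map. The following is only a sketch under assumptions: it targets the cats-effect 2.x API (cats.effect.concurrent), the names DeferredCache and create are made up for illustration, and it keeps the whole operation uncancelable as the original intends, which also means waiting callers cannot be canceled.

```scala
import cats.effect.Concurrent
import cats.effect.concurrent.{Deferred, Ref}
import cats.implicits._

// Hypothetical sketch against cats-effect 2.x; not the gist author's code.
final class DeferredCache[F[_], A] private (
  ref: Ref[F, Map[String, Deferred[F, Either[Throwable, A]]]]
)(implicit F: Concurrent[F]) {

  private type Slot = Deferred[F, Either[Throwable, A]]

  def cache(key: String)(task: F[A]): F[A] =
    F.uncancelable {
      for {
        // Allocate the Deferred up front, so the Map stores the instance itself
        // and not an effect that would build a new one on every run.
        fresh <- Deferred[F, Either[Throwable, A]]
        slot  <- ref.modify[Either[Slot, Slot]] { current =>
                   current.get(key) match {
                     case None           => (current.updated(key, fresh), Left(fresh))
                     case Some(existing) => (current, Right(existing))
                   }
                 }
        ea    <- slot match {
                   // We registered the slot: run the task and publish its outcome.
                   case Left(df)  => task.attempt.flatMap(ea => df.complete(ea).as(ea))
                   // Someone else registered it: wait for their outcome.
                   case Right(df) => df.get
                 }
        a     <- F.fromEither(ea)
      } yield a
    }
}

object DeferredCache {
  def create[F[_], A](implicit F: Concurrent[F]): F[DeferredCache[F, A]] =
    Ref.of[F, Map[String, Deferred[F, Either[Throwable, A]]]](Map.empty)
      .map(new DeferredCache[F, A](_))
}
```

The unused Deferred allocated on a cache hit is simply discarded, which trades a small allocation for keeping the modify function pure.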
I created an alternative (and somewhat bigger) example for the pure cache example above with the following differences:
- Keys are optional and may be of any type
- Optionally allows for setting an expiry
- Adds a little test program that runs the sample
- To avoid the issue of the sample above, it does not create new Deferred instances inside Ref.modify.
https://gist.github.com/jenshalm/702122b0a9ddadf9e8b1f37638817489
If you neither use the optional keys nor the optional expiry, it is essentially equivalent to Async.memoize.
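For comparison, a minimal sketch of what Async.memoize does for a single effect, assuming the cats-effect 2.x API and using IO as the effect type. The object name MemoizeSketch and the runs counter are made up for illustration.

```scala
import cats.effect.{Async, IO}

// Hypothetical demo object; assumes cats-effect 2.x.
object MemoizeSketch {
  // Counts how often the underlying effect actually runs.
  var runs = 0

  val program: IO[(Int, Int, Int)] =
    for {
      // memoize returns an IO that, once bound, yields the memoized IO.
      memo <- Async.memoize(IO { runs += 1; 42 })
      a    <- memo // the first bind runs the effect
      b    <- memo // later binds reuse the stored result
    } yield (a, b, runs)
}
```

Unlike the keyed cache discussed in this thread, this memoizes exactly one effect, with no keys and no expiry.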
Is pullFromCache.flatMap fiber safe? I imagine cancelation could hit in the meantime and you'd never complete the Deferred. Same if it hits before df.complete in the for comprehension.