Created
September 28, 2021 10:04
-
-
Save kryptt/e20e861df414a643503112b408abde38 to your computer and use it in GitHub Desktop.
Caching pipes
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package kobo | |
package assist | |
import scalacache.{Cache, Mode} | |
import fs2.{INothing, Pipe, Stream} | |
import fs2.concurrent._ | |
import cats.syntax.all._ | |
import cats.data.{EitherT, OptionT} | |
import scala.concurrent.duration._ | |
import cats.effect.{Concurrent, ExitCase} | |
/** Error signalling that a fallback cache lookup found no entry.
  *
  * `Pipes.backingCache` raises it when the upstream fetch fails and the fallback cache holds
  * nothing for the requested key.
  *
  * @param keyParts the key parts that were looked up in the fallback cache
  */
final case class EmptyFallbackCache(keyParts: Seq[Any])
    extends RuntimeException(s"Fallback cache was empty for $keyParts")
object Pipes {

  /** Dual caching strategy where the fallback (long-term) cache is used in case of errors.
    *
    * The short-term entry lives under `keyParts :+ "shortTerm"`, the fallback entry under
    * `keyParts :+ "longTerm"`.
    *
    *  - Short-term hit: the cached vector is emitted and `fetch` is never run.
    *  - Short-term miss: `fetch` is run and its elements are emitted as they arrive. If it
    *    completes successfully, the collected elements are written to both caches.
    *  - If `fetch` fails, the elements emitted so far stay emitted and the stream continues
    *    with the contents of the fallback cache. If the fallback cache is empty as well, the
    *    stream fails with [[EmptyFallbackCache]], whose cause is the original upstream error.
    *
    * @param keyParts          base key parts shared by both caches
    * @param shortTermCache    cache consulted before running `fetch`
    * @param shortTermDuration time-to-live passed to the short-term `put`
    * @param fallbackCache     cache consulted only when `fetch` fails
    * @param fallbackDuration  time-to-live passed to the fallback `put`
    */
  def backingCache[F[_]: Mode: Concurrent, T](keyParts: Any*)(
      shortTermCache: Cache[Vector[T]],
      shortTermDuration: Option[Duration],
      fallbackCache: Cache[Vector[T]],
      fallbackDuration: Option[Duration],
  ): Pipe[F, T, T] = {
    val cacheKey = keyParts :+ "shortTerm"
    val fallbackKey = keyParts :+ "longTerm"
    fetch =>
      Stream.force(
        OptionT(shortTermCache.get(cacheKey: _*)).cataF(
          Queue.noneTerminated[F, T].map { queue =>
            broadcastPut[F, T](
              fetch,
              queue,
              put(cacheKey: _*)(shortTermCache, shortTermDuration),
              put(fallbackKey: _*)(fallbackCache, fallbackDuration),
            )
              .handleErrorWith(cause =>
                Stream.evalSeq(
                  EitherT
                    .fromOptionF(
                      fallbackCache.get(fallbackKey: _*),
                      // Keep the upstream failure as the cause so it is not lost when the
                      // fallback cache turns out to be empty too.
                      EmptyFallbackCache(fallbackKey).initCause(cause),
                    )
                    .rethrowT
                )
              )
          },
          Stream.emits(_).covary[F].pure,
        )
      )
  }

  /** Similar to `Cache.cachingF`, except that the stream is emitted uninterrupted: elements
    * are passed downstream as they arrive instead of after the cache write. This can lead to
    * competing writes and racing between values.
    *
    * On a cache hit the cached vector is emitted and `fetch` is never run. On a miss `fetch`
    * is run and, if it completes successfully, its elements are stored under `keyParts`.
    *
    * @param keyParts key parts identifying the cache entry
    * @param cache    cache holding the collected output
    * @param duration time-to-live passed to the cache `put`
    */
  def caching[F[_]: Mode: Concurrent, T](
      keyParts: Any*
  )(cache: Cache[Vector[T]], duration: Option[Duration]): Pipe[F, T, T] =
    fetch =>
      Stream.force(
        OptionT(cache.get(keyParts: _*)).cataF(
          Queue
            .noneTerminated[F, T]
            .map(broadcastPut(fetch, _, put(keyParts: _*)(cache, duration))),
          Stream.emits(_).covary[F].pure,
        )
      )

  /** Emits `fetch` unchanged while recording every element in `queue`. Once `fetch` has
    * completed successfully, the recorded elements are replayed through `pipes`.
    *
    * The replay runs as part of the stream's finalization and only for `ExitCase.Completed`;
    * for any other exit case the recorded elements are never sent to `pipes`.
    *
    * @param fetch the source stream
    * @param queue buffer for the elements of `fetch`; it is terminated when `fetch` finalizes
    * @param pipes consumers of the recorded elements; their output is discarded
    */
  def broadcastPut[F[_]: Concurrent, T](
      fetch: Stream[F, T],
      queue: NoneTerminatedQueue[F, T],
      pipes: Pipe[F, T, T]*
  ): Stream[F, T] =
    fetch
      .evalTap(t => queue.enqueue1(Some(t)))
      // Registered before the replay finalizer below, so the queue is already terminated
      // when it gets dequeued.
      .onFinalize(queue.enqueue1(None))
      .onFinalizeCase {
        case ExitCase.Completed =>
          val recorded = queue.dequeue
          val replayed = pipes match {
            // A single consumer does not need the broadcast machinery.
            case Seq(single) => recorded.through(single)
            case _           => recorded.broadcastThrough(pipes: _*)
          }
          replayed.compile.drain
        case _ => Concurrent[F].unit
      }

  /** Pipe all output to the supplied cache as a `Vector[T]`.
    *
    * The input is collected completely before the single cache write happens; the resulting
    * stream emits no elements.
    *
    * @param keyParts key parts identifying the cache entry
    * @param cache    cache receiving the collected vector
    * @param duration time-to-live passed to the cache `put`
    */
  def put[F[_]: Mode: Concurrent, T](
      keyParts: Any*
  )(cache: Cache[Vector[T]], duration: Option[Duration]): Pipe[F, T, INothing] =
    s =>
      Stream.force(
        s.compile
          .toVector
          .flatMap(cache.put(keyParts: _*)(_, duration))
          .as(Stream.empty[F])
      )
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package kobo | |
package assist | |
import org.specs2.Specification | |
import cats.effect.testing.specs2.CatsIO | |
import scalacache.caffeine.CaffeineCache | |
import scalacache.CatsEffect.modes._ | |
import cats.syntax.all._ | |
import cats.effect.IO | |
import fs2.Stream | |
/** Specification for [[Pipes]]: `put`, `caching` and `backingCache`.
  *
  * All examples share one Caffeine cache and stay independent by using distinct keys.
  */
class PipesSpeq extends Specification with CatsIO {
  override def is =
    s2"""
  puts to cache $putsCache
  caching values $caching
  caching skips calls $cachingSkips
  caching does not store partial $cachingImpartial
  backingCache skips calls $backingSkips
  backingCache uses fallback $backingUsesFallback
  backingCache sends intermediate output $backingEmits
  backingCache writes to both caches on success $backingWritesBoth
  """

  val cache = CaffeineCache[Vector[Int]]

  /** `put` stores the whole stream as one vector. */
  def putsCache =
    Stream(1, 2, 3)
      .covary[IO]
      .through(Pipes.put("putsTest")(cache, None))
      .compile
      .drain
      .productR(cache.get[IO]("putsTest"))
      .map(_ must beSome(beEqualTo(Vector(1, 2, 3))))

  /** After a successful run the cache holds exactly what was emitted. */
  def caching =
    Stream(6, 7)
      .covary[IO]
      .through(Pipes.caching("cachingTest")(cache, None))
      .compile
      .toVector
      .product(cache.get[IO]("cachingTest"))
      .map { case (s, c) => c must beSome(beEqualTo(s)) }

  /** On a cache hit the (failing) source is never run. */
  def cachingSkips =
    cache
      .put[IO]("cachingSkips")(Vector(1, 2), None)
      .productR(
        Stream
          .raiseError[IO](new RuntimeException())
          .through(Pipes.caching("cachingSkips")(cache, None))
          .compile
          .toVector
      )
      .map(_ must beEqualTo(Vector(1, 2)))

  /** A source that fails midway must not leave a partial entry behind. */
  def cachingImpartial =
    Stream(1, 2, 3)
      .append(Stream.raiseError[IO](new RuntimeException()))
      .through(Pipes.caching("cachingImpartial")(cache, None))
      .compile
      .toVector
      .attempt
      .product(cache.get[IO]("cachingImpartial"))
      .map { case (s, c) => (s must beLeft).and(c must beNone) }

  /** On a short-term hit the (failing) source is never run. */
  def backingSkips =
    cache
      .put[IO]("backingSkips", "shortTerm")(Vector(1, 2, 4), None)
      .productR(
        Stream
          .raiseError[IO](new RuntimeException())
          .through(Pipes.backingCache("backingSkips")(cache, None, cache, None))
          .compile
          .toVector
      )
      .map(_ must beEqualTo(Vector(1, 2, 4)))

  /** A successful run populates the short-term and the long-term entry. */
  def backingWritesBoth =
    Stream(1, 3)
      .covary[IO]
      .through(Pipes.backingCache("backingWritesBoth")(cache, None, cache, None))
      .compile
      .toVector
      .product(cache.get[IO]("backingWritesBoth", "shortTerm"))
      .product(cache.get[IO]("backingWritesBoth", "longTerm"))
      .map {
        case ((s, cS), cL) =>
          (cS must beSome(beEqualTo(s))).and(cL must beSome(beEqualTo(s)))
      }

  /** A failing source is replaced by the long-term entry; the short-term entry stays empty. */
  def backingUsesFallback =
    cache
      .put[IO]("backingUsesFallback", "longTerm")(Vector(5, 6), None)
      .productR(
        Stream
          .raiseError[IO](new RuntimeException())
          .through(Pipes.backingCache("backingUsesFallback")(cache, None, cache, None))
          .compile
          .toVector
          .product(cache.get[IO]("backingUsesFallback", "shortTerm"))
          .product(cache.get[IO]("backingUsesFallback", "longTerm"))
          .map {
            case ((s, cS), cL) =>
              (cL must beSome(beEqualTo(s))).and(cS must beNone)
          }
      )

  /** Elements produced before a failure are still emitted by `backingCache`. With no
    * fallback entry present the stream then fails with `EmptyFallbackCache`, and neither
    * cache is written.
    */
  def backingEmits = {
    val err = new RuntimeException()
    Stream(1, 2)
      .append(Stream.raiseError[IO](err))
      .append(Stream(3, 4))
      .through(Pipes.backingCache("backingEmits")(cache, None, cache, None))
      .attempt
      .compile
      .toVector
      .product(cache.get[IO]("backingEmits", "shortTerm"))
      .product(cache.get[IO]("backingEmits", "longTerm"))
      .map {
        case ((s, cS), cL) =>
          (s must beLike {
            case Vector(Right(1), Right(2), Left(_: EmptyFallbackCache)) => ok
          }).and(cS must beNone).and(cL must beNone)
      }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment