@Daenyth · Last active March 26, 2024
CachedResource for cats-effect

Concurrent resource caching for cats

Motivation

cats-effect Resource is extremely handy for managing the lifecycle of stateful resources, for example database or queue connections. Its main interface is:

trait Resource[F[_], A] {
  /** - Acquire resource
    * - Run f
    * - guarantee that if acquire ran, release will run, even if `use` is cancelled or `f` fails
    */
  def use[B](f: A => F[B]): F[B]
}

It goes beyond even what try-finally gives, in that it guarantees concurrency safety and is aware of cancellation.
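
For reference, a minimal sketch of that lifecycle (the Connection trait and connect effect here are made up for illustration):

import cats.effect.{IO, Resource}

trait Connection {
  def query(sql: String): IO[String]
  def close: IO[Unit]
}

// Hypothetical acquisition effect standing in for a real driver call
def connect: IO[Connection] =
  IO.pure(new Connection {
    def query(sql: String): IO[String] = IO.pure("ok")
    def close: IO[Unit] = IO.unit
  })

val connection: Resource[IO, Connection] =
  Resource.make(connect)(_.close) // how to acquire, and how to release

val program: IO[String] =
  connection.use(_.query("select 1")) // close runs afterward, even on error or cancellation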

There are use cases where use is awkwardly shaped: for example, when we want to use a resource repeatedly for the lifetime of the whole application, but automatically reallocate it on certain errors, or reallocate it on a timer. Two such cases:

  • A RabbitMQ channel. We want one connection open for the whole application, but if there's a connection error we don't want to stop and restart the whole app; we only want to fix the channel.
  • External API clients with a TTL that we want to acquire, reuse, and invalidate at a point in time.

CachedResource

To solve this, I wrote this small interface:

trait CachedResource[F[_], A] {
  def run[B](f: A => F[B]): F[B]
  def invalidate: F[Unit]
  def invalidateIfNeeded(shouldInvalidate: A => Boolean): F[Unit]
}

It's similar to Resource, except that when run completes, the resource is not released immediately; it is released only when invalidate is called.
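
To sketch how call sites can look (Channel, ApiClient, and their members are hypothetical names, not part of the gist):

import java.time.Instant
import cats.effect.IO
import cats.implicits._

trait Channel { def publish(event: String): IO[Unit] }
trait ApiClient { def expiresAt: Instant }

// On failure, drop the (possibly broken) channel; the next run will reallocate
def publishEvent(cr: CachedResource[IO, Channel], event: String): IO[Unit] =
  cr.run(_.publish(event)).handleErrorWith { error =>
    cr.invalidate >> IO.raiseError(error)
  }

// Proactively drop an expired client, covering the TTL use case above
def refreshIfExpired(cr: CachedResource[IO, ApiClient], now: Instant): IO[Unit] =
  cr.invalidateIfNeeded(_.expiresAt.isBefore(now))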

Implementation concept

Ref State machine

I added multiple implementations for different use cases, with differing guarantees, each implemented using a similar strategy: they model an internal state machine with a cats-effect Ref.

Ref is a functional interface to atomically updated, concurrency-safe mutable state. In these classes, I primarily use two methods of its API:

trait Ref[F, A] {
  def set(a: A): F[Unit]
  def modify[B](f: A => (A, B)): F[B]
}

modify uses a compare-and-set strategy internally to guarantee lock-free concurrent modification. When you choose an f that returns (A, F[B]), you can think of it as semantically modeling one step of a state machine: (next state, next action). ref.modify(a => (newA, fAction)).flatten will then update the state and perform the action.

I label this inside the classes as:

def transition[A](f: State => (State, F[A])): F[A] =
  cache.modify(f).flatten
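
As a self-contained toy example of the same idiom (the states and actions are invented for illustration), a Ref-backed machine that acts differently on its first step:

import cats.effect.IO
import cats.effect.concurrent.Ref
import cats.implicits._

sealed trait CounterState
case object NeverCalled extends CounterState
final case class Called(n: Int) extends CounterState

// One transition: atomically choose the next state, and return the action to run
def step(ref: Ref[IO, CounterState]): IO[Unit] =
  ref.modify {
    case NeverCalled => Called(1)     -> IO(println("first call"))
    case Called(n)   => Called(n + 1) -> IO(println(s"call number ${n + 1}"))
  }.flatten // modify only chooses the action; flatten runs it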

Advanced Resource API

Fortunately, cats-effect Resource provides us an advanced api in addition to use:

trait Resource[F[_], A] {
 // ... other methods ...

 /** (resource, release-function). */
 def allocated: F[(A, F[Unit])]
}

This API is advanced/unsafe because it loses the guaranteed cleanup that Resource provides. However, it gives us the flexibility we need to build much more advanced constructs on top of it.
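
For instance, use can be approximated by hand from allocated - a sketch only, to show what safety is lost:

import cats.effect.{IO, Resource}
import cats.implicits._

// Roughly `use`, rebuilt from `allocated`. Note the hole: if the fiber is
// cancelled after `allocated` completes but before `guarantee` is installed,
// `release` never runs and the resource leaks. The implementations below
// close that gap with `bracketCase`.
def manualUse[A, B](r: Resource[IO, A])(f: A => IO[B]): IO[B] =
  r.allocated.flatMap { case (a, release) => f(a).guarantee(release) }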

Non-concurrent CachedResource

For a very simple implementation using that "Ref as state machine" concept, see SyncCachedResource. It uses SyncIO, which is a wrapped IO that is not allowed to perform concurrency or asynchronous execution.

In this case, we say

// Either empty, or allocated
type State = Option[(R, SyncIO[Unit])]

Here we can use the transition we defined, plus PartialFunction syntax sugar (omitting the argument of the function and using case instead), to get a succinct state machine expression.

// (Some details omitted)

def invalidate: SyncIO[Unit] = transition[Unit] {
  case None =>
    None -> F.unit
  case Some((resource, release)) =>
    None -> release
}

def run[A](f: R => SyncIO[A]): SyncIO[A] = transition[A] {
  case None =>
    // Like flatMap, but also gives us a hook to run on Completed | Canceled | Error
    None -> resource.allocated.bracketCase {
        case res @ (r, _) =>
          cache.set(Some(res)) >> f(r)
      } {
        case ((_, release), ExitCase.Canceled) => release
        case _                                 => F.unit
      }
  case s @ Some((r, _)) =>
    // Keep the same state, just perform the action
    s -> f(r)
}

Concurrent implementation

ConcurrentCachedObject is a concurrent implementation that wraps acquire: F[R] instead of a Resource - something that doesn't need cleanup. This allows simpler logic, but we still have a state machine to work with.

// Given acquire: F[R]
type Gate = Deferred[F, Unit] // cats-effect Promise equivalent

sealed trait RState
case object Empty extends RState
case class Ready(r: R) extends RState
case class Pending(gate: Gate) extends RState

One transition in particular to highlight:

// Impurity as a convenience to avoid "just in case" allocations - we can't *run* any `F[A]` inside of `transition`, only return them.
// This is the equivalent of running with scissors, but I promise to be careful
def newGate(): Gate = Deferred.unsafe[F, Unit]

def run[A](f: R => F[A]): F[A] = transition[A] {
  case Empty =>
    val gate = newGate()
    Pending(gate) -> (runAcquire(gate) >> run(f))

  case s @ Pending(gate) =>
    s -> (gate.get >> run(f)) // wait for gate completion then retry

  case s @ Ready(r) =>
    s -> f(r) // resource available - keep the state, just use it
}
}

Pending here is used because the shape of modify is State => (State, data). The guarantee we get from Ref is that the state update happens atomically and the data is returned. We return an action (F[A]) as our data, but its execution could be delayed - perhaps another thread gets scheduled before us.

To handle that case, we atomically update the state with our Gate (a Deferred):

/** A pure Promise. State is either "complete with `A`" or "empty" */
trait Deferred[F[_], A] {
  /** If empty, semantically block until complete, then return `A`
    * If complete, return `A` immediately
    */
  def get: F[A]
  
  /** Complete this, and allow any blocked `get` to return with `a`
    * May only be called once ever
    */
  def complete(a: A): F[Unit]
}
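
A small sketch of the gate pattern in isolation (the global ContextShift is just for the example):

import cats.effect.concurrent.Deferred
import cats.effect.{ContextShift, IO}
import cats.implicits._
import scala.concurrent.ExecutionContext

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

// Two fibers semantically block on get; a single complete releases both.
val gateExample: IO[Unit] =
  for {
    gate <- Deferred[IO, Unit]
    waiter = gate.get >> IO(println("released"))
    f1 <- waiter.start // blocked, but no JVM thread is held
    f2 <- waiter.start
    _ <- gate.complete(()) // both waiters may now proceed
    _ <- f1.join
    _ <- f2.join
  } yield ()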

What we get from this combination is:

  • When we are Empty and we run, we atomically set the state to Pending
  • Any future run must wait for Pending's gate to complete before it is allowed to progress
  • runAcquire is allowed to take as long as it wants. It must handle failure, but that's relatively straightforward.
  • Once the gate is complete, all pending operations will proceed.
  • All those gate.get calls are semantically blocking, but not thread-blocking. Cats-effect gives us a "green thread" or "M-to-N" threading model, where computations (called Fibers) run inside JVM (OS) threads, and semantically blocking a Fiber is an asynchronous operation that is cheap and safe.

The big one

ConcurrentCachedResource combines all of the above strategies to get a CachedResource which

  • Guarantees no resources will be leaked - if it's allocated, it's released (as long as someone calls invalidate)
  • Guarantees that at most one resource is allocated at a time
  • Allows any number of run calls concurrently
  • Allows concurrent calls of run and invalidate - run will never observe a closed resource
  • When invalidate is called, it semantically blocks until in-flight run calls complete
  • Once invalidate has been called, future run calls semantically block until the invalidation completes
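
Putting it together, a minimal wiring sketch (the Channel and its Resource are stand-ins; assumes cats-effect 2):

import cats.effect.{ContextShift, IO, Resource}
import cats.implicits._
import scala.concurrent.ExecutionContext

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

trait Channel { def publish(event: String): IO[Unit] }

// Stand-in for a real connection Resource
def rabbitChannel: Resource[IO, Channel] =
  Resource.make(IO(new Channel {
    def publish(event: String): IO[Unit] = IO(println(s"published $event"))
  }))(_ => IO(println("channel closed")))

val app: IO[Unit] =
  for {
    cr <- ConcurrentCachedResource(rabbitChannel)
    _ <- cr.run(_.publish("hello")) // first run allocates the channel
    _ <- cr.run(_.publish("again")) // reuses the same channel
    _ <- cr.invalidate // releases it; the next run would reallocate
  } yield ()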

Thanks

Special thanks to @SystemFW for his review, advice, and slide deck on Ref+Deferred, which was crucial to my ability to write this.

package teikametrics.effect

import cats.effect.concurrent.{Deferred, Ref}
import cats.effect.implicits._
import cats.effect.{Bracket, Concurrent, ExitCase, Resource, Sync, SyncIO}
import cats.implicits._

import scala.util.control.NonFatal

trait CachedResource[F[_], R] extends CachedResource.Runner[F, R] {

  /** Invalidates any current instance of `R`, guaranteeing that ??? */
  def invalidate: F[Unit]

  /** Run `f` with an instance of `R`, possibly allocating a new one, or possibly reusing an existing one.
    * Guarantees that `R` will not be invalidated until `f` returns */
  def run[A](f: R => F[A]): F[A]

  /** Invalidate if `shouldInvalidate` returns true, otherwise do nothing */
  def invalidateIfNeeded(shouldInvalidate: R => Boolean): F[Unit]
}

object CachedResource {

  /** Run `f`, and if `f` fails and `shouldInvalidate` returns `true`, invalidate `cr`
    *
    * @param shouldInvalidate If true, invalidate. If false or not defined, do not invalidate.
    *                         Default: always invalidate (assuming NonFatal)
    */
  def runAndInvalidateOnError[F[_], R, A](cr: CachedResource[F, R])(
      f: R => F[A],
      shouldInvalidate: PartialFunction[Throwable, Boolean] = {
        case NonFatal(_) => true
      }
  )(implicit F: Bracket[F, Throwable]): F[A] =
    cr.run(f).guaranteeCase {
      case ExitCase.Completed | ExitCase.Canceled =>
        F.unit
      case ExitCase.Error(e) =>
        val willInvalidate = shouldInvalidate.lift(e).getOrElse(false)
        F.whenA(willInvalidate)(cr.invalidate)
    }

  /** Runner that checks if refresh is needed before each `run` call, and additionally can invalidate on errors */
  def runner[F[_], R, A](cr: CachedResource[F, R])(
      shouldRefresh: R => Boolean,
      shouldInvalidate: PartialFunction[Throwable, Boolean] = {
        case NonFatal(_) => true
      }
  )(
      implicit F: Bracket[F, Throwable]
  ): Runner[F, R] = new Runner[F, R] {
    override def run[B](f: R => F[B]): F[B] =
      for {
        _ <- cr.invalidateIfNeeded(shouldRefresh)
        b <- cr.run(f).guaranteeCase {
          case ExitCase.Completed | ExitCase.Canceled =>
            F.unit
          case ExitCase.Error(e) =>
            val willInvalidate = shouldInvalidate.lift(e).getOrElse(false)
            F.whenA(willInvalidate)(cr.invalidate)
        }
      } yield b
  }

  // NB: Runner is exactly the `Codensity` typeclass from Haskell
  trait Runner[F[_], A] {
    def run[B](f: A => F[B]): F[B]
  }
}
object SyncCachedResource {
  def apply[R](resource: Resource[SyncIO, R]): SyncIO[SyncCachedResource[R]] =
    SyncIO(new SyncCachedResource(resource))
}

/** Non-concurrent-safe but simple CachedResource implementation */
class SyncCachedResource[R] private (resource: Resource[SyncIO, R])
    extends CachedResource[SyncIO, R] {
  // Either empty, or allocated
  private type State = Option[(R, SyncIO[Unit])]

  override def invalidate: SyncIO[Unit] = transition[Unit] {
    case None => None -> unit
    case Some((_, release)) =>
      None -> release
  }

  override def run[A](f: R => SyncIO[A]): SyncIO[A] = transition[A] {
    case None =>
      empty -> resource.allocated.bracketCase {
        case res @ (r, _) =>
          cache.set(Some(res)) >> f(r)
      } {
        case ((_, release), ExitCase.Canceled) => release
        case _                                 => unit
      }
    case s @ Some((r, _)) => s -> f(r)
  }

  override def invalidateIfNeeded(
      shouldInvalidate: R => Boolean): SyncIO[Unit] = transition[Unit] {
    case s @ Some((r, release)) =>
      if (shouldInvalidate(r)) empty -> release
      else s -> unit
    case None => empty -> unit
  }

  private def transition[A](f: State => (State, SyncIO[A])): SyncIO[A] =
    cache.modify(f).flatten

  private val empty = Option.empty[(R, SyncIO[Unit])]
  private val unit = SyncIO.unit

  private val cache: Ref[SyncIO, Option[(R, SyncIO[Unit])]] =
    Ref.unsafe[SyncIO, Option[(R, SyncIO[Unit])]](None)
}
object ConcurrentCachedResource {
  def apply[F[_]: Concurrent, R](
      resource: Resource[F, R]): F[ConcurrentCachedResource[F, R]] =
    Sync[F].delay(new ConcurrentCachedResource(resource))
}

// private ctor because of Ref.unsafe in class body, `new` needs `F.delay` around it
class ConcurrentCachedResource[F[_], R] private (resource: Resource[F, R])(
    implicit F: Concurrent[F]
) extends CachedResource[F, R] {

  /** Resource state */
  private sealed trait RState
  private type Gate = Deferred[F, Unit]
  private case object Empty extends RState
  private case class Ready(r: R,
                           release: F[Unit],
                           running: Int,
                           pendingInvalidation: Option[Gate])
      extends RState
  private case class Allocating(gate: Gate) extends RState
  private case class Invalidating(gate: Gate) extends RState

  override def invalidate: F[Unit] = transition[Unit] {
    case s @ Ready(_, release, running, pendingInvalidation) =>
      running match {
        case 0 =>
          val gate = pendingInvalidation.getOrElse(newGate())
          Invalidating(gate) -> runRelease(release, gate)
        case _ =>
          // Jobs in flight - they need to clean up
          pendingInvalidation match {
            case Some(_) =>
              s -> F.unit // could getOrElse for shorter code but prefer to avoid the allocation
            case None =>
              s.copy(pendingInvalidation = Some(newGate())) -> F.unit
          }
      }
    case s @ Invalidating(gate) =>
      // We only enter this state when jobs are in-flight - just wait on them to finish us
      s -> gate.get
    case Empty =>
      Empty -> F.unit
    case s @ Allocating(gate) =>
      // Preserve invariant that `run >> invalidate >> run` acquires resource twice
      s -> (gate.get *> invalidate)
  }

  override def run[A](f: R => F[A]): F[A] = transition[A] {
    case s @ Ready(r, _, running, None) =>
      s.copy(running = running + 1) -> f(r).guarantee(runCompleted)
    case s @ Ready(_, _, _, Some(gate)) =>
      s -> (gate.get >> run(f))
    case Empty =>
      val gate = newGate()
      Allocating(gate) -> (runAllocate(gate) >> run(f))
    case s @ Allocating(gate) =>
      s -> (gate.get >> run(f))
    case s @ Invalidating(gate) =>
      s -> (gate.get >> run(f))
  }

  private def runCompleted: F[Unit] = transition[Unit] {
    case s @ Ready(_, _, running, pendingInvalidation) =>
      val stillRunning = running - 1
      val action = F.whenA(stillRunning == 0) {
        // Our job to trigger the cleanup - `uncancelable` because if that final invalidation gets cancelled, then
        // nothing will ever `complete` the `pendingInvalidation` gate, and the whole thing deadlocks
        pendingInvalidation.traverse_(_ => invalidate.uncancelable)
      }
      s.copy(running = stillRunning) -> action
    case other =>
      other -> F.raiseError(new IllegalStateException(
        s"Tried to complete run when state was $other. This means there is an implementation error in ${this.getClass.getCanonicalName}"))
  }

  // Must only be called at the right time, otherwise we could close a resource currently in-use in a `run` call
  private def runRelease(release: F[Unit], gate: Gate): F[Unit] =
    (release >> cache.set(Empty) >> gate.complete(())).uncancelable

  override def invalidateIfNeeded(shouldInvalidate: R => Boolean): F[Unit] =
    transition[Unit] {
      case s @ Ready(r, _, _, None) =>
        s -> F.whenA(shouldInvalidate(r))(invalidate)
      case other => other -> F.unit
    }

  private def runAllocate(gate: Gate): F[Unit] =
    // bracketCase is needed here; allocated.flatMap isn't safe with cancellation
    resource.allocated
      .bracketCase {
        case (r, release) =>
          (cache.set(Ready(r, release, 0, None)) *> gate.complete(())).uncancelable
      } {
        case ((_, release), ExitCase.Canceled) =>
          // Cancelled between `allocated` and `bracketCase(use)`
          runRelease(release, gate)
        case _ => F.unit
      }
      .onError {
        case NonFatal(e) =>
          // On allocation error, reset the cache and notify anyone that started waiting while we were in `Allocating`
          runRelease(F.unit, gate)
      }

  // Using `unsafe` just so that I can have RState be an inner type, to avoid useless type parameters on RState
  private val cache = Ref.unsafe[F, RState](Empty)

  // empty parens to disambiguate the overload
  private def transition[A](f: RState => (RState, F[A])): F[A] =
    cache.modify(f).flatten

  // `unsafe` because the effect being run is just a `new AtomicRef...`, and most cases where we might need it,
  // we don't need it, so don't force `flatMap` to get it ready "just in case"
  private def newGate(): Gate = Deferred.unsafe[F, Unit]
}
object ConcurrentCachedObject {
  def apply[F[_]: Concurrent, R](acquire: F[R]): F[CachedResource[F, R]] =
    Sync[F].delay(new ConcurrentCachedObject[F, R](acquire))
}

// private ctor because of Ref.unsafe in class body, `new` needs `F.delay` around it
class ConcurrentCachedObject[F[_], R] private (acquire: F[R])(
    implicit F: Concurrent[F]
) extends CachedResource[F, R] {

  /** Resource state */
  private sealed trait RState
  private type Gate = Deferred[F, Unit]
  private case object Empty extends RState
  private case class Ready(r: R) extends RState
  private case class Pending(gate: Gate) extends RState

  override def invalidate: F[Unit] =
    transition[Unit](_ => Empty -> F.unit)

  override def run[A](f: R => F[A]): F[A] = transition[A] {
    case s @ Ready(r) =>
      s -> f(r)
    case Empty =>
      val gate = newGate()
      Pending(gate) -> (runAcquire(gate) >> run(f))
    case s @ Pending(gate) =>
      s -> (gate.get >> run(f))
  }

  override def invalidateIfNeeded(shouldInvalidate: R => Boolean): F[Unit] =
    transition[Unit] {
      case s @ Ready(r) =>
        s -> F.whenA(shouldInvalidate(r))(invalidate)
      case other =>
        other -> F.unit
    }

  private def runAcquire(gate: Gate): F[Unit] =
    acquire
      .flatMap { r =>
        (cache.set(Ready(r)) *> gate.complete(())).uncancelable
      }
      .onError {
        case NonFatal(_) =>
          // On allocation error, reset the cache and notify anyone that started waiting while we were in `Pending`
          setEmpty(gate)
      }

  private def setEmpty(gate: Gate): F[Unit] =
    (cache.set(Empty) >> gate.complete(())).uncancelable

  // Using `unsafe` just so that I can have RState be an inner type, to avoid useless type parameters on RState
  private val cache = Ref.unsafe[F, RState](Empty)

  // empty parens to disambiguate the overload
  private def transition[A](f: RState => (RState, F[A])): F[A] =
    cache.modify(f).flatten

  // `unsafe` because the effect being run is just a `new AtomicRef...`, and most cases where we might need it,
  // we don't need it, so don't force `flatMap` to get it ready "just in case"
  private def newGate(): Gate = Deferred.unsafe[F, Unit]
}
package teikametrics.effect

import cats.effect.concurrent.{Deferred, Ref}
import cats.effect.laws.util.TestContext
import cats.effect.{ContextShift, IO, Resource, Sync, SyncIO, Timer}
import cats.implicits._
import cats.{Applicative, ApplicativeError, FlatMap}
import fs2.Stream
import org.scalactic.source.Position
import org.scalatest.words.ResultOfStringPassedToVerb
import org.scalatest.{Assertion, AsyncFlatSpec, Inspectors, Matchers}
import teikametrics.RefLogger

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Random
import scala.util.control.NoStackTrace

class ConcurrentCachedResourceSpec
    extends AsyncFlatSpec with ConcurrentCachedResourceBehavior {

  "ConcurrentCachedResource" should behave like cachedResource(create)
  "ConcurrentCachedResource" should behave like concurrentCachedResource(create)
  "ConcurrentCachedResource (slow acquire)" should behave like concurrentCachedResource(
    slowResource(time, 0.milli))
  "ConcurrentCachedResource (slow release)" should behave like concurrentCachedResource(
    slowResource(0.milli, time))
  "ConcurrentCachedResource (slow acquire & release)" should behave like concurrentCachedResource(
    slowResource(time, time))

  "failing to acquire" should "not deadlock runs" inIO {
    // TODO copy this test for behave like cachedResource
    for {
      (_, res, allocOk, _) <- unreliableResource
      cr <- ConcurrentCachedResource(res)
      _ <- allocOk.set(false)
      run1Result <- cr.run(_ => IO.unit).attempt
      _ <- allocOk.set(true)
      _ <- cr.run(_.assertLive[IO]).timeout(1.milli)
    } yield run1Result shouldBe Left(FailAlloc)
  }

  "failing to acquire" should "not deadlock invalidation" inIO {
    // TODO copy this test for behave like cachedResource
    for {
      (_, res, allocOk, _) <- unreliableResource
      cr <- ConcurrentCachedResource(res)
      _ <- allocOk.set(false)
      run1Result <- cr.run(_ => IO.unit).attempt
      _ <- allocOk.set(true)
      _ <- cr.invalidate.timeout(1.milli)
    } yield run1Result shouldBe Left(FailAlloc)
  }

  "failing to acquire (slowly)" should "not deadlock runs" inIO {
    for {
      (_, res, allocOk, sleeper) <- unreliableResource
      cr <- ConcurrentCachedResource(res)
      _ <- allocOk.set(false)
      _ <- sleeper.set(Some(3.nano))
      run1 <- cr.run(_ => IO.unit).attempt.start
      _ <- timer.sleep(time) // less than alloc timeout, but long enough that we are sure `.start` has begun
      _ <- allocOk.set(true)
      _ <- sleeper.set(None)
      _ <- cr.run(_.assertLive[IO]).timeout(1.milli)
      run1Result <- run1.join
    } yield run1Result shouldBe Left(FailAlloc)
  }

  "failing to acquire (slowly)" should "not deadlock invalidate" inIO {
    for {
      (_, res, allocOk, sleeper) <- unreliableResource
      cr <- ConcurrentCachedResource(res)
      _ <- allocOk.set(false)
      _ <- sleeper.set(Some(3.nano))
      run1 <- cr.run(_ => IO.unit).attempt.start
      _ <- timer.sleep(time) // less than alloc timeout, but long enough that we are sure `.start` has begun
      _ <- cr.invalidate.timeout(1.milli)
      run1Result <- run1.join
    } yield run1Result shouldBe Left(FailAlloc)
  }

  def slowResource(
      acquireSleep: FiniteDuration,
      releaseSleep: FiniteDuration
  ): IO[(Pool, CachedResource[IO, Obj])] =
    for {
      pool <- Ref[IO].of(Map.empty[Int, Obj])
      ids <- Ref[IO].of(1)
      res = Resource.make(
        timer.sleep(acquireSleep).uncancelable *> Resources.alloc(ids, pool)) {
        obj =>
          (timer.sleep(releaseSleep).uncancelable *> Resources
            .basicRelease(pool)
            .apply(obj)).uncancelable
      }
      cr <- ConcurrentCachedResource(res)
    } yield pool -> cr

  case object FailAlloc
      extends Exception("Failed resource allocate") with NoStackTrace

  def unreliableResource: IO[(Pool,
                              Resource[IO, Obj],
                              Ref[IO, Boolean],
                              Ref[IO, Option[FiniteDuration]])] =
    for {
      pool <- Ref[IO].of(Map.empty[Int, Obj])
      ids <- Ref[IO].of(1)
      passer <- Ref[IO].of(true)
      sleeper <- Ref[IO].of(Option.empty[FiniteDuration])
    } yield {
      val alloc = for {
        pass <- passer.get
        sleep <- sleeper.get
        _ <- sleep.traverse_(timer.sleep)
        obj <- if (pass) Resources.alloc(ids, pool)
               else IO.raiseError(FailAlloc)
      } yield obj
      val resource = Resource.make(alloc) { obj =>
        Resources
          .basicRelease(pool)
          .apply(obj)
          .uncancelable
      }
      (pool, resource, passer, sleeper)
    }

  def create: IO[(Pool, CachedResource[IO, Obj])] =
    for {
      (pool, res) <- Resources.basic
      cr <- ConcurrentCachedResource(res)
    } yield (pool, cr)
}
class SyncCachedResourceSpec
    extends AsyncFlatSpec with CachedResourceBehavior[SyncIO] {
  // works with AsyncTestSuite serialExecutionContext
  "SyncCachedResource" should behave like cachedResource(create)

  def create =
    for {
      (pool, res) <- Resources.basic
      cr <- SyncCachedResource(res)
    } yield (pool, cr)

  override protected def toFuture(fa: SyncIO[Assertion])(
      implicit pos: Position): Future[Assertion] =
    fa.toIO.unsafeToFuture()
}

class ConcurrentCachedObjectSpec
    extends AsyncFlatSpec with ConcurrentCachedResourceBehavior {
  behavior of "ConcurrentCachedObject"

  it should behave like cachedResource(create, withCleanup = false)
  it should behave like concurrentCachedResource(create, withCleanup = false)

  def create: IO[(Pool, CachedResource[IO, Obj])] =
    for {
      pool <- Ref[IO].of(Map.empty[Int, Obj])
      ids <- Ref[IO].of(1)
      res = Resources.alloc(ids, pool)
      cr <- ConcurrentCachedObject(res)
    } yield (pool, cr)
}

class Obj(val id: Int) {
  private var _alive: Boolean = true
  def alive: Boolean = synchronized(_alive)
  def unsafeRelease(): Unit = synchronized(_alive = false)

  def assertLive[F[_]](implicit F: ApplicativeError[F, Throwable]): F[Unit] =
    Applicative[F].unlessA(alive)(
      F.raiseError(new Exception(s"Obj ${this.id} is dead")))

  override def equals(obj: Any): Boolean = obj match {
    case that: Obj => this eq that
    case _         => false
  }
  override def hashCode(): Int = id.hashCode()
  override def toString: String = s"Obj($id alive=$alive)"
}
trait BaseCachedResourceBehavior[F[_]] extends Matchers with Inspectors {
  this: AsyncFlatSpec =>

  protected val time = 1.nano

  protected type Pool = Ref[F, Map[Int, Obj]]

  protected object Resources {
    def alloc(ids: Ref[F, Int], pool: Pool)(implicit F: FlatMap[F]): F[Obj] =
      ids.modify(cur => (cur + 1, cur)).flatMap { id =>
        pool.modify { m =>
          val obj = new Obj(id)
          m.updated(obj.id, obj) -> obj
        }
      }

    def basicRelease(pool: Pool)(implicit F: Sync[F]): Obj => F[Unit] =
      obj => F.delay(obj.unsafeRelease())

    def basic(implicit F: Sync[F]): F[(Pool, Resource[F, Obj])] =
      for {
        pool <- Ref[F].of(Map.empty[Int, Obj])
        ids <- Ref[F].of(1)
      } yield pool -> Resource.make(alloc(ids, pool))(basicRelease(pool))
  }

  protected def toFuture(fa: F[Assertion])(
      implicit pos: Position): Future[Assertion]

  protected implicit class ItVerbStringOps(itVerbString: ItVerbString) {
    def inIO(testFun: => F[Assertion])(implicit pos: Position): Unit =
      itVerbString.in(toFuture(testFun))
  }

  protected implicit class ResultOfStringPassedToVerbOps(
      obj: ResultOfStringPassedToVerb) {
    def inIO(testFun: => F[Assertion])(implicit pos: Position): Unit =
      obj.in(toFuture(testFun))
  }
}
trait CachedResourceBehavior[F[_]] extends BaseCachedResourceBehavior[F] {
  this: AsyncFlatSpec =>

  /** should behave like cachedResource(create) */
  protected def cachedResource(
      create: F[(Pool, CachedResource[F, Obj])],
      withCleanup: Boolean = true // Whether or not this resource is expected to clean up Obj on invalidate
  )(implicit F: Sync[F]): Unit = {

    it should "run with no previous state" inIO {
      for {
        (_, cr) <- create
        _ <- cr.run(_.assertLive[F])
      } yield succeed
    }

    it should "invalidate with no previous state" inIO {
      for {
        (_, cr) <- create
        _ <- cr.invalidate
      } yield succeed
    }

    it should "run and then invalidate" inIO {
      for {
        (pool, cr) <- create
        id <- cr.run(r => r.assertLive[F].as(r.id))
        _ <- cr.invalidate
        obj <- pool.get.map(_.get(id))
      } yield {
        if (withCleanup)
          obj.get.alive shouldEqual false
        else succeed
      }
    }

    it should "reuse for multiple runs" inIO {
      for {
        (_, cr) <- create
        id1 <- cr.run(r => r.assertLive[F].as(r.id))
        id2 <- cr.run(r => r.assertLive[F].as(r.id))
      } yield id1 shouldEqual id2
    }

    it should "get a new resource after invalidating" inIO {
      for {
        (_, cr) <- create
        id1 <- cr.run(r => r.assertLive[F].as(r.id))
        _ <- cr.invalidate
        id2 <- cr.run(r => r.assertLive[F].as(r.id))
      } yield id1 should not equal id2
    }

    it should "allow run to fail and still work after" inIO {
      for {
        (_, cr) <- create
        oops = new Exception("oops")
        result <- cr.run(_ => F.raiseError[Int](oops)).attempt
        alive <- cr.run(_.alive.pure[F])
      } yield {
        result shouldEqual Left(oops)
        alive shouldBe true
      }
    }
  }
}
trait ConcurrentCachedResourceBehavior extends CachedResourceBehavior[IO] {
  this: AsyncFlatSpec =>

  implicit val ctx: TestContext = TestContext()
  // Explicitly pass Async[IO] because sometimes scalac wants to compile
  // to `ctx.contextShift(IO.ioConcurrentEffect(this.CS))`, which is recursive, and explodes.
  // And yes, "sometimes", because implicit resolution is slightly nondeterministic
  implicit val CS: ContextShift[IO] = ctx.contextShift[IO](IO.ioEffect)
  implicit val timer: Timer[IO] = ctx.timer[IO]

  def concurrentCachedResource(
      create: IO[(Pool, CachedResource[IO, Obj])],
      withCleanup: Boolean = true
  ): Unit = {
    // Alias here so I can move code between the traits easier
    type F[A] = IO[A]

    it should "get a new resource after invalidating (concurrently)" inIO {
      for {
        (_, cr) <- create
        id1 <- cr.run(r => timer.sleep(time) *> r.assertLive[F].as(r.id))
        _ <- cr.invalidate
        id2 <- cr.run(r => r.assertLive[F].as(r.id))
      } yield id1 should not equal id2
    }

    it should "reuse resource when starting a run while one run is in progress" inIO {
      for {
        (_, cr) <- create
        gate <- Deferred[F, Unit]
        // use gate so run can't complete until after another concurrent run starts
        run1 <- cr.run(r => gate.get.as(r.id)).start
        id2 <- cr.run(r => gate.complete(()).as(r.id))
        id1 <- run1.join
      } yield id1 shouldEqual id2
    }

    it should "defer releasing for invalidate until in flight run completes" inIO {
      for {
        (_, cr) <- create
        run <- cr.run(r => timer.sleep(time) *> r.assertLive[F]).start
        _ <- cr.invalidate
        _ <- run.join
      } yield succeed
    }

    it should "race run and invalidate without failing or leaking" inIO {
      for {
        (pool, cr) <- create
        _ <- cr.run(_ => IO.unit) // warmup allocate
        parLimit = 8 // arbitrary
        tasks = 100 // arbitrary
        results <- Stream(
          cr.run(r => timer.sleep(r.id.millis) *> r.assertLive[F]).attempt,
          cr.invalidate.attempt
        ).covary[F]
          .repeat
          .take(tasks.toLong)
          .mapAsyncUnordered(parLimit)(io => io)
          .compile
          .toList
        _ <- cr.invalidate // Make sure the last task is to invalidate
        objects <- pool.get
      } yield {
        all(results) shouldBe 'right
        if (withCleanup) {
          forAll(objects.values) { obj =>
            obj.alive shouldBe false
          }
        }
        val numAllocated = objects.keySet.max
        val maxAllocated = tasks / 2 // div by 2 because half are run, half invalidate
        numAllocated should be <= maxAllocated
      }
    }

    it should "not leak or deadlock under aggressive cancellation and concurrency" inIO {
      sealed abstract class Task {
        def id: Int
        def run(cr: CachedResource[F, Obj]): F[Unit]
      }
      case class Sleep(id: Int, dur: Int) extends Task {
        def run(cr: CachedResource[F, Obj]): F[Unit] =
          cr.run(r => timer.sleep(dur.nanos) *> r.assertLive[F])
      }
      case object Ex extends Exception("ok") with NoStackTrace
      case class Err(id: Int) extends Task {
        val err: F[Unit] = IO.raiseError(Ex)
        def run(cr: CachedResource[F, Obj]): F[Unit] =
          cr.run(_ => err).recoverWith {
            case Ex => IO.unit
          }
      }
      case class Invalidate(id: Int) extends Task {
        def run(cr: CachedResource[F, Obj]): F[Unit] =
          cr.invalidate
      }

      val taskCount = 1000
      for {
        log <- RefLogger.withTime[F]
        (pool, cr) <- create
        rand = IO(Random.nextInt(5)) // 0 to 4 inclusive
        bool = IO(Random.nextBoolean())
        ids = Stream.iterate(0)(_ + 1)
        tasks: Stream[F, (String, Either[Throwable, Unit])] = Stream[
          F,
          Int => F[Task]
        ](
          i => rand.map(dur => Sleep(i, dur)),
          i => (Err(i): Task).pure[F],
          i => (Invalidate(i): Task).pure[F],
        ).repeat
          .take(taskCount.toLong)
          .zipWith(ids) { case (mkTask, i) => mkTask(i) }
          .evalMap(identity)
          .mapAsyncUnordered(taskCount) { t: Task =>
            for {
              _ <- log.info(s"Start $t")
              f <- t.run(cr).start
            } yield (t, f)
          } // concurrent .start in non-deterministic order
          .mapAsyncUnordered(taskCount) {
            case (t, f) =>
              for {
                _ <- log.info(s"End $t")
                // Timeout will only fail if we deadlocked
                e <- Sync[F].ifM(bool)(f.cancel.attempt,
                                       f.join.timeout(1.hour).attempt)
                _ <- log.info(s" Ended $t: $e")
              } yield t.toString -> e
          } // Cancel/join in non-deterministic order
        results <- tasks.compile.toVector
        _ <- cr.invalidate
        objects <- pool.get
        logData <- log.history // unused but available for debugger exploration
      } yield {
        val logLines = logData.toVector // unused, but in scope so it's visible in the debugger
        if (withCleanup) { forAll(objects.values)(_.alive shouldBe false) }
        results.foreach {
          case (taskId, result) =>
            withClue(taskId) {
              result shouldBe Right(())
            }
        }
        succeed
      }
    }
  }

  final override protected def toFuture(fa: IO[Assertion])(
      implicit pos: Position): Future[Assertion] = {
    val year = 365.day
    val test = fa
      .timeoutTo(
        year,
        IO.raiseError(
          new Exception(
            "Test case did not complete within 1 year. Deadlock is likely"))
      )
      .unsafeToFuture() // Begin eager test execution async

    // Resolve `IO` concurrency inside `test` by advancing the clock
    ctx.tick(year)
    ctx.tick(1.minute) // Definitely past our `timeoutTo`
    val tasksAfterTick = ctx.state.tasks
    if (tasksAfterTick.isEmpty) {
      test // Now that `ctx` has no remaining `IO` to run, return the (completed) `Future[Assertion]`
    } else {
      // timeoutTo wasn't enough, maybe we deadlocked `uncancelable` IO?
      // `Future` has no ability to cancel, so hopefully it gets GC'd
      throw new IllegalStateException(
        s"""Test probably deadlocked.
           | tasksAfterTick=$tasksAfterTick
           | pos=$pos""".stripMargin
      )
    }
  }
}
@monadplus

Here is the adapted code to support a key-value cached object.
I am not satisfied with the signatures; any recommendations?

// Credit to Gavin Bisesi and his wonderful implementation at:
// https://gist.github.com/Daenyth/28243952f1fcfac6e8ef838040e8638e

// It's actually not a resource, just an effectful constructor
trait MultiCachedResource[F[_], K, R] {
  def run[A](k: K)(f: R => F[A]): F[Option[A]]
}

object ConcurrentMultiCachedObject {
  def apply[F[_]: Concurrent, K, R](acquire: K => F[Option[R]]): F[MultiCachedResource[F, K, R]] =
    Sync[F].delay(new ConcurrentMultiCachedObject[F, K, R](acquire))
}

class ConcurrentMultiCachedObject[F[_]: Concurrent, K, R](acquire: K => F[Option[R]]) extends MultiCachedResource[F, K, R] {

  private type MRState = Map[K, Option[RState]]
  private val cache = Ref.unsafe[F, MRState](Map.empty[K, Option[RState]])

  private def transition[A](f: MRState => (MRState, F[A])): F[A] =
    cache.modify(f).flatten

  private type Gate = Deferred[F, Unit]
  private def newGate(): Gate = Deferred.unsafe[F, Unit]

  private sealed trait RState
  private case class Ready(r: R) extends RState
  private case class Pending(gate: Gate) extends RState

  override def run[A](k: K)(f: R => F[A]): F[Option[A]] = transition[Option[A]](map => map.get(k) match {
    case Some(v) => v match {
      case Some(Ready(r)) =>
        map -> f(r).map(Some(_))

      case Some(Pending(gate: Gate)) =>
        map -> (gate.get >> run(k)(f))

      case None =>
        map -> none.pure[F]
    }

    case None =>
      val gate = newGate()
      map + (k -> Pending(gate).some) -> (runAcquire(k)(gate) >> run(k)(f))
  })

  private def runAcquire(k: K)(gate: Gate): F[Unit] =
    acquire(k).flatMap { maybeR =>
      (cache.update(_ + (k -> maybeR.map(Ready(_)))) *> gate.complete(())).uncancelable
    }.onError {
      case NonFatal(_) =>
        (cache.update(_ - k) >> gate.complete(())).uncancelable
    }
}

@Daenyth (Author)

Daenyth commented Nov 9, 2020

All code in my post is available free to use under the MIT open source license
