cats-effect Resource is extremely handy for managing the lifecycle of stateful resources, for example database or queue connections. Its main interface is:
```scala
trait Resource[F[_], A] {
  /** - Acquire the resource
    * - Run `f`
    * - Guarantee that if acquire ran, release will run, even if `use` is cancelled or `f` fails
    */
  def use[B](f: A => F[B]): F[B]
}
```

It goes beyond even what try-finally gives, in that it guarantees concurrency safety and is aware of cancellation.
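As a sketch of the guarantee `use` provides, here is a simplified, non-concurrent analogue in plain Scala using try-finally (this is an illustration only; the real Resource also handles cancellation and concurrency, which try-finally cannot model):

```scala
// Simplified analogue of Resource#use: acquire, run f, always release.
final case class SimpleResource[A](acquire: () => A, release: A => Unit) {
  def use[B](f: A => B): B = {
    val a = acquire()  // acquire the resource
    try f(a)           // run the user action
    finally release(a) // guaranteed to run even if f throws
  }
}

val log = scala.collection.mutable.Buffer.empty[String]
val res = SimpleResource[String](
  acquire = () => { log += "acquired"; "conn" },
  release = _ => log += "released"
)
val out = res.use(conn => s"used $conn")
// log now contains "acquired" then "released"
```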
There are use cases where `use` is very awkwardly shaped; for example, when we want to use a resource repeatedly for the lifetime of the whole application, but auto-reallocate on certain errors or on a timer. Two such cases might be:
- A RabbitMQ channel. We want one connection open for the whole application, but we don't want to stop and restart the whole app on a connection error; we only want to fix the channel.
- External API clients with a TTL that we want to acquire, reuse, and invalidate at a point in time.
To solve this, I wrote this small interface:

```scala
trait CachedResource[F[_], A] {
  def run[B](f: A => F[B]): F[B]
  def invalidate: F[Unit]
  def invalidateIfNeeded(shouldInvalidate: A => Boolean): F[Unit]
}
```

It's similar to Resource, except that when `run` completes, instead of releasing immediately, we release only when `invalidate` is called.
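To make the intended semantics concrete, here is a toy, single-threaded model of the idea, with effects as plain thunks (`() => A`) instead of `F[A]`. The names and implementation are illustrative only, not the article's actual code:

```scala
// Toy, single-threaded model of CachedResource: cache on first run,
// reuse until invalidate, release on invalidate.
final class ToyCachedResource[R](acquire: () => R, release: R => Unit) {
  private var state: Option[R] = None
  def run[B](f: R => B): B = state match {
    case Some(r) => f(r) // reuse the cached resource
    case None =>
      val r = acquire()  // first use: allocate and cache
      state = Some(r)
      f(r)
  }
  def invalidate(): Unit = { state.foreach(release); state = None }
}

val log = scala.collection.mutable.Buffer.empty[String]
val cached = new ToyCachedResource[Int](
  acquire = () => { log += "acquire"; 42 },
  release = _ => log += "release"
)
cached.run(_ + 1)   // acquires
cached.run(_ + 2)   // reuses; no new acquire
cached.invalidate() // releases
cached.run(_ + 3)   // acquires again
// log: acquire, release, acquire
```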
I added multiple implementations for different use cases, with differing guarantees.
Each is implemented using a similar strategy. They model an internal state machine with a cats-effect Ref.
Ref is a functional interface to atomically updated, concurrency-safe mutable state. In these classes, I primarily use two methods of its API:

```scala
trait Ref[F[_], A] {
  def set(a: A): F[Unit]
  def modify[B](f: A => (A, B)): F[B]
}
```

`modify` uses a compare-and-set strategy internally to guarantee lock-free concurrent modification. When you choose an `f` that returns `(A, F[B])`, you can think of it as semantically modeling one step of a state machine: (next state, next action). `ref.modify(a => (newA, fAction)).flatten` will then update the state and perform the action.
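To make the compare-and-set idea concrete, here is a minimal, hypothetical Ref built on `AtomicReference`, with actions modeled as thunks (`() => B`) instead of `F[B]` so it runs without cats-effect:

```scala
import java.util.concurrent.atomic.AtomicReference

// Minimal sketch of Ref#modify: retry a compare-and-set until it wins.
final class MiniRef[A](init: A) {
  private val cell = new AtomicReference[A](init)
  def set(a: A): Unit = cell.set(a)
  @annotation.tailrec
  def modify[B](f: A => (A, B)): B = {
    val current = cell.get()
    val (next, out) = f(current)
    if (cell.compareAndSet(current, next)) out // won the race: state updated
    else modify(f)                             // lost the race: retry with fresh state
  }
}

// "transition": atomically update state, return the next action, then run it.
type State = Option[String]
val cache = new MiniRef[State](None)
def transition[B](f: State => (State, () => B)): B = cache.modify(f)()

val msg = transition[String] {
  case None        => Some("conn") -> (() => "allocated a new connection")
  case s @ Some(c) => s -> (() => s"reused $c")
}
// msg == "allocated a new connection"; a second call would reuse it
```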
I label this inside the classes as:

```scala
def transition[A](f: State => (State, F[A])): F[A] =
  cache.modify(f).flatten
```

Fortunately, cats-effect Resource provides us an advanced API in addition to `use`:
```scala
trait Resource[F[_], A] {
  // ... other methods ...

  /** (resource, release-function). */
  def allocated: F[(A, F[Unit])]
}
```

This API is advanced/unsafe because it loses the guaranteed cleanup that Resource provides. It does allow us the flexibility we need to build much more advanced constructs on top of it, however.
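A sketch of why it is unsafe: `allocated` hands back the resource and its finalizer as plain values, so nothing forces release to run. Effects are again modeled as thunks here, and all names are illustrative:

```scala
val log = scala.collection.mutable.Buffer.empty[String]

// Analogue of `allocated`: returns (resource, release) with no use-scope.
def allocated: (String, () => Unit) = {
  log += "acquired"
  ("conn", () => { log += "released"; () })
}

val (conn, release) = allocated
// The caller can stash `release` (e.g. in a Ref) and call it much later,
// but if they forget, or an exception intervenes, the resource leaks.
release()
```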
For a very simple implementation using that "Ref as state machine" concept, see SyncCachedResource. It uses SyncIO, a variant of IO that is not allowed to perform concurrency or asynchronous execution.
In this case, we say:

```scala
// Either empty, or allocated
type State = Option[(R, SyncIO[Unit])]
```

Here we can use the `transition` we defined, plus PartialFunction syntax sugar (omitting the argument of a function and using `case` instead), to get a succinct state machine expression.
```scala
// (Some details omitted)
def invalidate: SyncIO[Unit] = transition[Unit] {
  case None =>
    None -> F.unit
  case Some((resource, release)) =>
    None -> release
}

def run[A](f: R => SyncIO[A]): SyncIO[A] = transition[A] {
  case None =>
    // Like flatMap, but also gives us a hook to run on Complete | Canceled | Failure
    None -> resource.allocated.bracketCase {
      case res @ (resource, release) =>
        cache.set(Some(res)) >> f(resource)
    } {
      case ((resource, release), ExitCase.Canceled) => release
      case _ => F.unit
    }
  case s @ Some((resource, release)) =>
    // Keep the same state, just perform the action
    s -> f(resource)
}
```

ConcurrentCachedObject is a concurrent implementation that wraps `acquire: F[A]` instead of Resource - something that doesn't need cleanup. This allows simpler logic, but we still have a state machine to work with.
```scala
// Given acquire: F[R]
type Gate = Deferred[F, Unit] // cats-effect Promise equivalent

sealed trait RState
case object Empty extends RState
case class Ready(r: R) extends RState
case class Pending(gate: Gate) extends RState
```

One transition in particular to highlight:
```scala
// Impurity as a convenience to avoid "just in case" allocations - we can't
// *run* any `F[A]` inside of `transition`, only return them.
// This is the equivalent of running with scissors, but I promise to be careful
def newGate(): Gate = Deferred.unsafe[F, Unit]

def run[A](f: R => F[A]): F[A] = transition[A] {
  case Empty =>
    val gate = newGate()
    Pending(gate) -> (runAcquire(gate) >> run(f))
  case s @ Pending(gate) =>
    s -> (gate.get >> run(f)) // wait for gate completion, then retry
  case s @ Ready(r) => /* ... */
}
```

`Pending` here is used because the shape of `modify` is `State => (State, data)`.
The guarantee we get from Ref is that the state update happens atomically and the data is returned. We return an action (`F[A]`) as our data, but its execution could be delayed - perhaps another thread gets scheduled before ours.
To handle that case, we atomically update the state with our Gate (a Deferred):
```scala
/** A pure Promise. State is either "complete with `A`" or "empty". */
trait Deferred[F[_], A] {

  /** If empty, semantically block until complete, then return `A`.
    * If complete, return `A` immediately.
    */
  def get: F[A]

  /** Complete this, and allow any blocked `get` to return with `a`.
    * May only be called once ever.
    */
  def complete(a: A): F[Unit]
}
```

What we get from this combination is:
- When we are `Empty` and we `run`, atomically set the state to `Pending`
- Any future `run` must wait for Pending's `gate` to complete before it is allowed to progress
- `runAcquire` is allowed to take as long as it wants. It must handle failure, but that's relatively straightforward.
- Once the `gate` is complete, all pending operations will proceed.
- All those `gate.get` calls are semantically blocking but not thread-blocking. Cats-effect gives us a "green thread" or "M-to-N" threading model, where computations (called `Fiber`s) run inside JVM (OS) threads, and semantically blocking a Fiber is an asynchronous operation that is cheap and safe.
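The gate pattern can be sketched with `scala.concurrent.Promise` standing in for Deferred. Note the `Await` here is genuinely thread-blocking, whereas cats-effect's `get` would be a cheap fiber suspension; the structure is the same:

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Promise as a stand-in for Deferred: `success` may be called once,
// and every pending `get` is released with the same value.
val gate = Promise[Unit]()
@volatile var proceeded = false

val waiter = new Thread(() => {
  Await.result(gate.future, 5.seconds) // like gate.get: wait for completion
  proceeded = true
})
waiter.start()

gate.success(()) // like gate.complete(()): releases all waiters at once
waiter.join()
// proceeded is now true
```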
ConcurrentCachedResource combines all of the above strategies to get a CachedResource which:

- Guarantees no resources will be leaked - if it's allocated, it's released (as long as someone calls `invalidate`)
- Guarantees that at most one resource is allocated at a time
- Allows any number of `run` calls concurrently
- Allows concurrent calls of `run` and `invalidate` - `run` will never observe a closed resource
- When `invalidate` is called, semantically blocks `invalidate` until existing `run` calls are complete
- When `invalidate` is called, semantically blocks future `run` calls until `invalidate` is complete
Special thanks to @SystemFW for his review, advice, and slide deck on Ref+Deferred, which was crucial to my ability to write this.
Here is the adapted code to support a key-value cached object.
I am not satisfied with the signatures - any recommendations?