cats-effect Resource
is extremely handy for managing the lifecycle of stateful resources, for example database or queue connections. It gives a main interface of:
trait Resource[F[_], A] {
/** - Acquire resource
* - Run f
* - guarantee that if acquire ran, release will run, even if `use` is cancelled or `f` fails
*/
def use[B](f: A => F[B]): F[B]
}
It goes beyond even what try
-finally
gives, in that it guarantees concurrency safety and is aware of cancellation.
There are use cases where use
is very awkwardly shaped; for example, the case where we want to use a resource repeatedly for the lifetime of the whole application, but auto-reallocate on certain errors, or auto-reallocate on a timer. Two cases there might be:
- A rabbit channel. We want one connection open for the whole application, but we don't want to stop/restart the whole app if there's a connection error, we only want to fix the channel.
- External API clients with a TTL that we want to acquire, reuse, and invalidate at a point in time.
To solve this, I wrote this small interface:
trait CachedResource[F[_], A] {
def run[B](f: A => F[B]): F[B]
def invalidate: F[Unit]
def invalidateIfNeeded(shouldInvalidate: A => Boolean): F[Unit]
}
It's similar to resource, except that on run
completing, instead of releasing immediately, release only when invalidate
is called.
I added multiple implementations for different use cases, with differing guarantees.
Each is implemented using a similar strategy. They model an internal state machine with a cats-effect Ref
.
Ref
is a functional interface to atomically updated concurrent-safe mutable state. In these classes, I primarily use two methods of its api
trait Ref[F, A] {
def set(a: A): F[Unit]
def modify[B](f: A => (A, B)): F[B]
}
modify
uses a compare-and-set strategy internally to guarantee lock-free concurrent modification. When you chose an f
that returns (A, F[B])
, you can think of it as semantically modeling one step of a state machine; (next state, next action)
. ref.modify(a => (newA, fAction)).flatten
will then update the state and perform the action.
I label this inside the classes as:
def transition[A](f: State => (State, F[A])): F[A] =
cache.modify(f).flatten
Fortunately, cats-effect Resource
provides us an advanced api in addition to use
:
trait Resource[F[_], A] {
// ... other methods ...
/** (resource, release-function). */
def allocated: F[(A, F[Unit])]
}
This api is advanced/unsafe because it loses the guaranteed cleanup that Resource
provides. It does allow us the flexibility we need to build much more advanced constructs on top of it, however.
For a very simple implementation using that "Ref as state machine" concept, see SyncCachedResource
. It uses SyncIO
, which is a wrapped IO
that is not allowed to perform concurrency or asynchronous execution.
In this case, we say
// Either empty, or allocated
type State = Option[(R, SyncIO[Unit])]
Here we can use the transition
we defined, plus PartialFunction
syntax sugar (omitting the argument of a function and using case
instead) to get a succinct state machine expression.
// (Some details omitted)
def invalidate: SyncIO[Unit] = transition[Unit] {
case None =>
None -> F.unit
case Some((resource, release)) =>
None -> release
}
def run[A](f: R => SyncIO[A]): SyncIO[A] = transition[A] {
case None =>
// Like flatMap, but also give us a hook to run on Complete|Canceled|Failure
None -> resource.allocated.bracketCase {
case res @ (resource, release) =>
cache.set(Some(res)) >> f(resource)
} {
case ((resource, release), ExitCase.Canceled) => release
case _ => F.unit
}
case s @ Some((resource, release)) =>
// Keep the same state, just perform the action
s -> f(r)
}
ConcurrentCachedObject
is a concurrent implementation that wraps acquire: F[A]
instead of Resource
- something that doesn't need cleanup. This allows simpler logic, but we still have a state machine to work with.
// Given Resource[F, R]
type Gate = Deferred[F, Unit] // cats-effect Promise equivalent
sealed trait RState
case object Empty extends RState
case class Ready(r: R) extends RState
case class Pending(gate: Gate) extends RState
One transition in particular to highlight:
// Impurity as a convenience to avoid "just in case" allocations - we can't *run* any `F[A]` inside of `transition`, only return them.
// This is the equivalent of running with scissors, but I promise to be careful
def newGate(): Gate = Deferred.unsafe[F, A]
def run[A](f: R => F[A]): F[A] = transition[A] {
case Empty =>
val gate = newGate()
Pending(gate) -> (runAcquire(gate) >> run(f))
case s @ Pending(gate) =>
s -> (gate.get >> run(f)) // wait for gate completion then retry
case s @ Ready(r) => /* ... */
}
Pending
here is used because the shape of modify
is State => (State, data)
.
The guarantee we get from Ref
is that the state update will happen atomically and data
is returned. We return an action (F[A]
) as our data, but the execution of it could be delayed - perhaps another thread gets scheduled before us.
To handle that case, we atomically update the state with our Gate
(Deferred)
/** A pure Promise. State is either "complete with `A`" or "empty" */
trait Deferred[F[_], A] {
/** If empty, semantically block until complete, then return `A`
* If complete, return `A` immediately
*/
def get: F[A]
/** Complete this, and allow any blocked `get` to return with `a`
* May only be called once ever
*/
def complete(a: A): F[Unit]
}
What we get from this combination is:
- When we are
Empty
and werun
, atomically set the state toPending
- Any future
run
must wait for Pending'sgate
to return before they are allowed to progress runAcquire
is allowed to take as long as it wants. It must handle failure, but that's relatively straightforward.- Once the
gate
is complete, all pending operations will proceed. - All those
gate.get
calls are semantically blocking but not thread blocking. Cats-effect gives us a "green thread" or "M-to-N" thread model, where computations (calledFiber
s) are run inside JVM (OS) threads, and semantically blocking a Fiber is a asynchronous operation that is cheap and safe.
ConcurrentCachedResource
combines all of the above strategies to get a CachedResource
which
- Guarantees no resources will be leaked - if it's allocated, it's released (as long as someone calls
invalidate
) - Guarantees that at most one resource is allocated at a time
- Allows any number of
run
calls concurrently - Allows concurrent calls of
run
andinvalidate
-run
will never observe a closed resource - When
invalidate
is called, semantically blocksinvalidate
until existingrun
calls are complete. - When
invalidate
is called, semantically blocks futurerun
calls until invalidate is complete
Special thanks to @SystemFW for his review, advice, and slide deck on Ref+Deferred, which was crucial to my ability to write this.
Here is the adapted code to support key-value cached object.
I am not satisfied with the signatures, any recommendation ?