The cats-effect project defines a purely functional effect type (IO[A]), and associated typeclasses defining its behaviour.
The ones we care about for this example are:
trait Sync[F[_]] extends MonadError[F, Throwable] {
  def delay[A](a: => A): F[A]
  ...
}
trait Async[F[_]] extends Sync[F] {
  def async[A](cb: (Either[Throwable, A] => Unit) => Unit): F[A]
  ...
}
which describe the ability of effect types like IO to embed and suspend synchronous and asynchronous side effects.
The design of cats-effect is oriented towards non-blocking functionality, and includes its own green-threading and "semantic blocking" primitives that don't actually block the underlying JVM thread. This allows users to run an arbitrary number of IOs on top of a CPU-bound pool like global, and even on a single thread (as in Scala.js). However, when interacting with legacy Java APIs, one often deals with code that blocks JVM threads. The most common pattern to deal with this situation, after wrapping the blocking code in F[_]: Sync, is to run it on a custom thread pool that's specific to blocking code, and then run the rest of the computation back on the "normal" pool.
This pattern is informally called "shift-and-shift-back", and relies on a function called shift, which can be defined for any F[_]: Async:
def shift[F[_]](ec: ExecutionContext)(implicit F: Async[F]): F[Unit] =
  F.async { cb => ... }
What shift does is move whatever F[A] is flatMapped afterwards to ec, until a subsequent call to shift moves it somewhere else.
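To make the behaviour of shift a bit more concrete, here is a minimal, self-contained sketch of one way it could be written. Everything in it is an assumption made for illustration: the Async trait is a cut-down stand-in rather than the cats-effect one, Task is a toy callback-based effect, and the "blocking-pool" executor and the demo/runSync helpers are hypothetical. The only idea it tries to show is that resuming the callback from a task submitted to ec makes whatever is flatMapped next run on ec.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.{Await, ExecutionContext, Promise}
import scala.concurrent.duration._
import scala.language.higherKinds
import scala.util.Try

object ShiftSketch {
  // Cut-down stand-in for an Async typeclass (NOT the cats-effect definition).
  trait Async[F[_]] {
    def delay[A](a: => A): F[A]
    def async[A](k: (Either[Throwable, A] => Unit) => Unit): F[A]
    def flatMap[A, B](fa: F[A])(f: A => F[B]): F[B]
  }

  // Resume the continuation from a task submitted to `ec`, so that whatever
  // is flatMapped after this value runs on one of ec's threads.
  def shift[F[_]](ec: ExecutionContext)(implicit F: Async[F]): F[Unit] =
    F.async[Unit] { cb =>
      ec.execute(new Runnable { def run(): Unit = cb(Right(())) })
    }

  // Toy callback-based effect, just enough to run the example.
  final case class Task[A](run: (Either[Throwable, A] => Unit) => Unit)

  implicit val taskAsync: Async[Task] = new Async[Task] {
    def delay[A](a: => A): Task[A] = Task[A](cb => cb(Try(a).toEither))
    def async[A](k: (Either[Throwable, A] => Unit) => Unit): Task[A] = Task[A](k)
    def flatMap[A, B](fa: Task[A])(f: A => Task[B]): Task[B] =
      Task[B](cb => fa.run {
        case Right(a) => f(a).run(cb)
        case Left(e)  => cb(Left(e))
      })
  }

  // Hypothetical single-threaded pool with a recognisable thread name.
  private val pool = Executors.newSingleThreadExecutor(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "blocking-pool")
      t.setDaemon(true)
      t
    }
  })
  val blockingEC: ExecutionContext = ExecutionContext.fromExecutor(pool)

  // After the shift, the delayed block reports the thread it is running on.
  val prog: Task[String] =
    taskAsync.flatMap(shift[Task](blockingEC))(_ =>
      taskAsync.delay(Thread.currentThread().getName))

  // Block the calling thread until the task completes (demo helper only).
  def runSync[A](t: Task[A]): A = {
    val p = Promise[A]()
    t.run(e => p.complete(e.toTry))
    Await.result(p.future, 5.seconds)
  }

  def demo(): String = runSync(prog)
}
```

Calling ShiftSketch.demo() from the main thread should report the name of the pool's thread rather than the caller's, since the delay only runs once the shift has handed control to the executor.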
For example, say you have some code that makes a JDBC call (notoriously blocking):
def dbCall[F[_]: Sync]: F[MyType] = Sync[F].delay { ... query that blocks ... }
which is then followed by normal async or CPU-bound code, like an http4s HTTP call, or some data transformation:
def post[F[_]: Async](a: MyType): F[HttpResponse] = ....
and let's say that we have an EC for blocking, called blockingEC, and we run everything else on global.
An example of "shift-and-shift-back" would be:
def prog[F[_]: Async]: F[HttpResponse] =
  for {
    _      <- Async.shift[F](blockingEC)
    myType <- dbCall[F]
    _      <- Async.shift[F](global)
    res    <- post[F](myType)
  } yield res
The above pattern works well enough, but it's a bit tedious and error-prone for the end user to have to remember to do it by hand each time, so let's write a combinator for it (for those familiar with cats-effect, I'm omitting Timer and bracket for simplicity):
def blocking[F[_]: Async, A](p: F[A], blockingEC: ExecutionContext)(implicit ec: ExecutionContext): F[A] =
  for {
    _   <- Async.shift[F](blockingEC) // run p on the blocking pool
    res <- p
    _   <- Async.shift[F](ec)         // shift back to the implicit "normal" pool
  } yield res
The interesting part here is that we need an Async constraint on F in order to call shift.
However there's a problem: p is of type F[A] where F is Async, so there's no guarantee that p itself doesn't contain shifting calls already, which might make blocking useless (since p might end up not running on blockingEC as desired, given a call to shift inside p that moves it somewhere else).
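The problem can be seen in a small toy model. This is only a sketch under loud assumptions: Eff is not a real effect type, pools are plain strings instead of ExecutionContexts, and delay simply logs the name of the pool it "ran" on. The names Eff, wellBehaved and sneaky are all made up for the illustration.

```scala
object ShiftLeakSketch {
  // Toy model: an effect threads the current pool name through the program
  // and logs the pool on which each `delay` is executed.
  final case class Eff[A](run: (String, Vector[String]) => (String, Vector[String], A)) {
    def flatMap[B](f: A => Eff[B]): Eff[B] = Eff[B] { (pool, log) =>
      val (pool1, log1, a) = run(pool, log)
      f(a).run(pool1, log1)
    }
    def map[B](f: A => B): Eff[B] = flatMap(a => Eff.pure(f(a)))
  }

  object Eff {
    def pure[A](a: A): Eff[A] = Eff[A]((pool, log) => (pool, log, a))
    // "Runs" the thunk and records which pool was current at that point.
    def delay[A](a: => A): Eff[A] = Eff[A]((pool, log) => (pool, log :+ pool, a))
    // Changes the current pool for everything that follows.
    def shift(pool: String): Eff[Unit] = Eff[Unit]((_, log) => (pool, log, ()))
  }

  // Same shape as the combinator in the text, specialised to the toy effect.
  def blocking[A](p: Eff[A]): Eff[A] =
    for {
      _   <- Eff.shift("blocking")
      res <- p
      _   <- Eff.shift("global")
    } yield res

  // Does its work wherever it is told to run.
  val wellBehaved: Eff[Int] = Eff.delay(1)
  // Shifts away on its own before doing the work.
  val sneaky: Eff[Int] = Eff.shift("global").flatMap(_ => Eff.delay(1))

  val (_, goodLog, _) = blocking(wellBehaved).run("global", Vector.empty)
  val (_, badLog, _)  = blocking(sneaky).run("global", Vector.empty)
  // goodLog == Vector("blocking"), badLog == Vector("global")
}
```

Both values have the same type, so nothing in the signature of blocking can tell them apart, yet only the first one actually does its work on the blocking pool.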
Suppose we change the signature of blocking to (note the Sync constraint):
def blocking[F[_]: Sync, A](p: F[A], blockingEC: ExecutionContext)(implicit ec: ExecutionContext): F[A]
Apart from the fact that we can't call Async.shift anymore, so we can't implement it, we could still pass IO (which is Async and can contain calls to shift) and the F[_]: Sync constraint would be satisfied, so that's not enough to ensure only Sync datatypes are accepted.
This doesn't work either:
def blocking[F[_]: Async, G[_]: Sync, A](p: G[A], blockingEC: ExecutionContext)(implicit ec: ExecutionContext): F[A]
because we would need to flatMap F and G in the same for comprehension, and there's no way to unify those. It also suffers from the same problem: we can pass any G1: Async as G and the Sync constraint will be satisfied, so we aren't preventing shift calls in G.
What we need is a RankN type: we want p to be passed as a polymorphic value, so that it's the callee (blocking) that decides which F p is instantiated to, and not the caller (which is in control with normal parametric polymorphism).
The type we need in Haskell can be written like so:
blocking :: Async f => (forall g. Sync g => g a) -> BlockingEC -> EC -> f a
The caller of blocking can't pass a potentially async value like IO[Int] as the g a to blocking, because blocking requires the value to be fully polymorphic in g, but it can pass a value defined over any G[_]: Sync.
However, inside the body of blocking, g a can be instantiated to whatever g has a Sync instance, and since Async extends Sync, f is a valid type. So basically we get what we want: blocking can call shift, and is assured that g a is a synchronous value that doesn't do any shifting itself.
Apologies for the lack of Scala in this last paragraph, but Scala doesn't support RankN types or polymorphic values, and I think this is a compelling example (there are ways to simulate this specific scenario I think).
There are many other use cases though, ranging from Free.foldMap to the ST monad, to safe resource regions that are enforced at compile time, to many more.
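On the remark above that this specific scenario can be simulated: one common encoding is to wrap the polymorphic value in a trait whose apply method is itself polymorphic, which is roughly what the forall g in the Haskell signature expresses. The sketch below is only an illustration under assumptions. Sync and Async are cut-down stand-ins rather than the cats-effect typeclasses, pools are plain strings, Eff is a toy effect that logs the pool each delay runs on, and SyncProgram is a hypothetical name.

```scala
import scala.language.higherKinds

object RankNSketch {
  // Cut-down stand-ins for the typeclasses (NOT the cats-effect definitions).
  trait Sync[F[_]] {
    def pure[A](a: A): F[A]
    def delay[A](a: => A): F[A]
    def flatMap[A, B](fa: F[A])(f: A => F[B]): F[B]
  }
  trait Async[F[_]] extends Sync[F] {
    def shift(pool: String): F[Unit] // toy: a pool is just a name
  }

  // The rank-2 argument: a value that must work for every G with a Sync instance.
  trait SyncProgram[A] {
    def apply[G[_]](implicit G: Sync[G]): G[A]
  }

  // The callee picks G = F. Since p only knows about Sync, it cannot shift.
  def blocking[F[_], A](p: SyncProgram[A], blockingPool: String, mainPool: String)(
      implicit F: Async[F]): F[A] =
    F.flatMap(F.shift(blockingPool)) { _ =>
      F.flatMap(p.apply[F]) { res =>
        F.flatMap(F.shift(mainPool))(_ => F.pure(res))
      }
    }

  // Toy effect: threads the current pool name and logs where each `delay` ran.
  final case class Eff[A](run: (String, Vector[String]) => (String, Vector[String], A))

  implicit val effAsync: Async[Eff] = new Async[Eff] {
    def pure[A](a: A): Eff[A] = Eff[A]((pool, log) => (pool, log, a))
    def delay[A](a: => A): Eff[A] = Eff[A]((pool, log) => (pool, log :+ pool, a))
    def flatMap[A, B](fa: Eff[A])(f: A => Eff[B]): Eff[B] = Eff[B] { (pool, log) =>
      val (pool1, log1, a) = fa.run(pool, log)
      f(a).run(pool1, log1)
    }
    def shift(pool: String): Eff[Unit] = Eff[Unit]((_, log) => (pool, log, ()))
  }

  // Callers can only supply code written against Sync alone.
  val dbCall: SyncProgram[Int] = new SyncProgram[Int] {
    def apply[G[_]](implicit G: Sync[G]): G[Int] = G.delay(42) // stand-in for a blocking query
  }

  val (finalPool, log, result) =
    blocking[Eff, Int](dbCall, "blocking", "global").run("global", Vector.empty)
  // log == Vector("blocking"), finalPool == "global", result == 42
}
```

The cost of this encoding is that callers have to build a SyncProgram by hand instead of passing a plain value, and that a value already fixed to a concrete type such as Eff[Int] cannot be turned into one, which is exactly the restriction we were after.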