Skip to content

Instantly share code, notes, and snippets.

@SystemFw
Last active June 1, 2019 03:15
Show Gist options
  • Save SystemFw/256205f51bf0135e4c6fd95dee4590fc to your computer and use it in GitHub Desktop.
Save SystemFw/256205f51bf0135e4c6fd95dee4590fc to your computer and use it in GitHub Desktop.
Cats-effect, blocking, RankN-types.

cats-effect

The cats-effect project defines a purely functional effect type (IO[A]), and associated typeclasses defining its behaviour. The ones we care about for this example are:

trait Sync[F[_]] extends MonadError[F, Throwable] {
   def delay[A](a: => A): F[A]
   ...
}

trait Async[F[_] extends Sync[F] {
  def async[A](cb: (Either[Throwable,A] => Unit) => Unit): F[A]
  ...
}

which describe the ability of effects type like IO to embed and suspend synchronous and asynchronous side-effects

Blocking, shift, the shift-and-shift-back pattern

The design of cats-effect is oriented towards non-blocking functionality, and includes its own green-threading and "semantic blocking" primitives that don't actually block the underlying JVM thread. This allows users to run an arbitrary number of IOs on top of a CPU bound pool like global, and even on a single thread (like in scala.js). However, when interacting with legacy Java apis, one often deals with code that blocks JVM threads. The most common pattern to deal with this situation, after wrapping the blocking code in F[_]: Sync, is to run it on a custom thread pool that's specific to blocking code, and then run the rest of the computation back on the "normal" pool. This pattern is informally called "shift-and-shift-back", and relies on a function called shift, which can be defined for any F[_]: Async

def shift[F[_]](ec: ExecutionContext)(implicit F: Async[F]): F[Unit] =
    F.async { cb => ... }

What shift does is move whatever F[A] is flatMapped afterwards to ec, until a subsequent call to shift moves it somewhere else.

Example, say you have some code that makes a JDBC call (notoriously blocking).

def dbCall[F[_]: Sync]: F[MyType] = F.delay {... query that blocks... }

which is then followed my normal async or cpu-bound code, like an http4s http call, or some data transformation

def post[F[_]: Async](a: MyType): F[HttpResponse] = ....

and let's say that we have an EC for blocking, called blockingEC and we run everything else on global. An example of "shift-and-shift-back" would be:

 def prog[F[_]: Async]: F[HttpResponse] = 
   for {
     _ <- Async.shift[F](blockingEC)
     myType <- dbCall[F]
     _ <- Async.shift[F](global)
   res <- post[F](myType)
  } yield res

A blocking combinator

The above pattern works well enough, but it's a bit tedious and error-prone for the end user to have to remember to do it by hand each time, so let's write a combinator for it (for those familiar with cats-effect, I'm omitting Timer and bracket for simplicity):

def blocking[F[_]: Async, A](p: F[A], blockingEC: ExecutionContext)(implicit ec: ExecutionContext): F[A] = 
  for {
   _ <- Async.shift[F](blockingEC)
   res <- p
   _ <- Async.shift[F]
 } yield res

The interesting part here is that we need an Async constraint on F in order to call shift.

However there's a problem: p is of type F[A] where F is Async, so there's no guarantee that p itself doesn't contain shifting calls already, which might make blocking useless (since p might end up not running on blockingEc as desired, given a call to shift inside p that moves it somewhere else).

Note that if we change the signature of blocking to (note the Sync constraint):

def blocking[F[_]: Sync, A](p: F[A], blockingEC: ExecutionContext)(implicit ec: ExecutionContext): F[A]

Apart from the fact that we can't call Async.shift anymore so we can't implement it, we could still pass IO (which is Async and can contain calls to shift) and the F[_]: Sync constraint would be respected, so that's not enough to ensure only Sync datatypes are accepted.

This doesn't work either:

def blocking[F[_]: Async, G[_]: Sync, A](p: G[A], blockingEC: ExecutionContext)(implicit ec: ExecutionContext): F[A]

because we would need to flatMap F and G in the same for comprehension, and there's no way to unify those. It also suffers from the same problem, we can pass any G1: Async to G and the Sync constraint will be respected, so we aren't preventing shift calls in G

RankN types

What we need is a RankN type: we want p to be passed as a polymorphic value, so that it's the callee (blocking) that decides which F is p instantiated to, and not the caller (which is in control with normal parametric polymorphism).

The type we need in Haskell can be written like so:

blocking :: Async f => (forall g. Sync g => g a) => BlockingEC => EC => f a

the caller to blocking can't pass a potentially async value like IO[Int] as the g a to blocking, because blocking requires the value to be fully polymorphic in g, but it can pass a value defined over any G[_]: Sync. However, inside the body of blocking g a can be instantiated to whatever g has a Sync instance, and since Async extends Sync, f is a valid type. So basically we get what we want: blocking can call shift, and is assured that g a is a synchronous value that doesn't do any shifting itself.

Apologies for the lack of Scala in this last paragraph, but Scala doesn't support RankN types or polymorphic values, and I think this is a compelling example (there are ways to simulate this specific scenario I think). There are many other use cases though, ranging from Free.foldMap to the ST monad, to safe resource region that are enforced at compile time, to many more.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment