@eulerfx
Last active March 4, 2016 21:48
Informal proof that F# AsyncSeq is equal in expressive power to scalaz-stream Process when specialized to Task

There is a great purely functional streaming/IO library in Scala called scalaz-stream. It is itself based upon a Haskell library called machines. They provide powerful abstractions to express compositional effectful computations. These libraries rely on certain type system features in both Scala and Haskell which are unavailable in F# - namely existential types and higher kinds. Higher kinds allow the monad representing the side-effects to be abstracted over. If however we specialize this to a specific monad, we can get around the lack of existentials as well.

Tomas Petricek created a type called asynchronous sequence which provides similar capabilities. The F# AsyncSeq type is declared as follows:

type AsyncSeq<'a> = Async<AsyncSeqInner<'a>>

and AsyncSeqInner<'a> = 
    | Nil
    | Cons of 'a * AsyncSeq<'a>

This is a recursive structure much like List but composed with Async.
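
To make the shape concrete, here is a minimal sketch that repeats the declaration above locally (so it does not depend on any library) and adds two helpers, `ofList` and `toList`. Both helper names are assumptions made for illustration and are not part of the AsyncSeq API.

```fsharp
// Local copy of the declaration above, so the sketch is self-contained.
type AsyncSeq<'a> = Async<AsyncSeqInner<'a>>

and AsyncSeqInner<'a> =
    | Nil
    | Cons of 'a * AsyncSeq<'a>

// Hypothetical helper: lift a list into an AsyncSeq, one Async step per element.
let rec ofList (xs: 'a list) : AsyncSeq<'a> =
    async {
        match xs with
        | [] -> return Nil
        | x :: rest -> return Cons (x, ofList rest)
    }

// Hypothetical helper: run every Async step in order and collect the elements.
let rec toList (s: AsyncSeq<'a>) : Async<'a list> =
    async {
        let! step = s
        match step with
        | Nil -> return []
        | Cons (x, tail) ->
            let! rest = toList tail
            return x :: rest
    }

let collected = ofList [1; 2; 3] |> toList |> Async.RunSynchronously
```

Each element is only reachable by running the Async that wraps it, which is the sense in which this is a List composed with Async.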

Scalaz-stream has a similar but more general concept of a Process. It allows you to plug different monads in place of Async (or its counterpart in Scalaz called Task). If however we specialize it to Async, which seems to be the primary use case anyway, its approximate representation in F# would be:

type Process<'a> =
    | Halt
    | Emit of head:'a * tail:Process<'a>
    | Await of req:Async<'I> * rcv:('I -> Process<'a>)

A Process represents a state machine which is in one of three states: halted, emitting a value of type 'a, or awaiting a value of type 'I from the Async monad and then continuing the computation (in scalaz-stream the monad can be abstracted over). The representation is approximate because it isn't valid F# - the type 'I isn't declared. The F# type system doesn't allow a declaration like this, where a type appears inside a case without being declared as a parameter up front, so that it can vary from one Await to the next. This is called an existential type. The Emit case looks very similar to the Cons case of AsyncSeq except that the tail in Cons is wrapped in Async. Moreover, there is no Await case in AsyncSeq. Fortunately the Await case can be "collapsed".

First, Await can be transformed to:

type Process<'a> =
    | Halt
    | Emit of head:'a * tail:Process<'a>
    | Await of Async<Process<'a>>

Intuitively, the transformation is acceptable because Async can be mapped over:

val req : Async<'I>

val rcv : 'I -> Process<'a>

val map : ('a -> 'b) -> Async<'a> -> Async<'b>

let mapped : Async<Process<'a>> = req |> map rcv
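
The same step can be sketched as runnable F#. FSharp.Core does not ship an `Async.map`, so `map` is written out by hand here, and `awaitWith` and `step` are hypothetical helpers introduced only for this illustration. The awaited type `'i` is generic on the function, so it never appears in the resulting `Process<'a>`.

```fsharp
type Process<'a> =
    | Halt
    | Emit of 'a * Process<'a>
    | Await of Async<Process<'a>>

// map for Async, defined by hand for this sketch.
let map (f: 'a -> 'b) (x: Async<'a>) : Async<'b> =
    async {
        let! a = x
        return f a
    }

// Hypothetical smart constructor: accepts the two-field form (req, rcv)
// and produces the collapsed one-field Await.
let awaitWith (req: Async<'i>) (rcv: 'i -> Process<'a>) : Process<'a> =
    Await (req |> map rcv)

// Await a string, then emit its length. The string type is hidden from Process<int>.
let p : Process<int> =
    awaitWith (async { return "hello" }) (fun (s: string) -> Emit (s.Length, Halt))

// Hypothetical helper: run a single Await step, leaving other states unchanged.
let step (proc: Process<'a>) : Process<'a> =
    match proc with
    | Await next -> Async.RunSynchronously next
    | other -> other

let firstLength =
    match step p with
    | Emit (n, _) -> Some n
    | _ -> None
```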

With this transformation, it is evident how a Process can represent AsyncSeq - an Emit whose tail is an Await corresponds to a Cons in AsyncSeq, whose tail is wrapped in Async:

type Process<'a> =
  | Halt
  | Emit of 'a * Process<'a>
  | Await of Async<Process<'a>>


module Process =

  // Process<'a> -> AsyncSeq<'a>
  let rec toAsyncSeq = function
    | Halt -> AsyncSeq.empty
    | Emit (a,tail) -> asyncSeq { 
        yield a
        yield! toAsyncSeq tail
      }
    | Await r -> asyncSeq { 
        let! r = r
        yield! toAsyncSeq r
      }

  // AsyncSeq<'a> -> Process<'a>
  let rec ofAsyncSeq (s:AsyncSeq<'a>) =
    Await <| async {
      let! s = s
      match s with
      | Nil -> return Halt
      | Cons (a,tail) -> return Emit (a, ofAsyncSeq tail)        
    }      

F# computation expressions make this easy. What sets them apart from monadic do notation alone is that within the asyncSeq workflow it is also possible to bind to computations of type Async, as is done in the Await clause in toAsyncSeq.
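
As a sanity check, the two conversions can be exercised end to end. The sketch below is self-contained under one assumption: it redeclares both types locally and writes `toAsyncSeq` with plain `async` instead of the `asyncSeq` builder, so it is a variant of the code above rather than a copy of it. `toList` is a hypothetical helper used only to observe the elements.

```fsharp
type AsyncSeq<'a> = Async<AsyncSeqInner<'a>>

and AsyncSeqInner<'a> =
    | Nil
    | Cons of 'a * AsyncSeq<'a>

type Process<'a> =
    | Halt
    | Emit of 'a * Process<'a>
    | Await of Async<Process<'a>>

// Process<'a> -> AsyncSeq<'a>, written with plain async for self-containment.
let rec toAsyncSeq (p: Process<'a>) : AsyncSeq<'a> =
    async {
        match p with
        | Halt -> return Nil
        | Emit (a, tail) -> return Cons (a, toAsyncSeq tail)
        | Await next ->
            let! p' = next
            return! toAsyncSeq p'
    }

// AsyncSeq<'a> -> Process<'a>: every Async step becomes an Await.
let rec ofAsyncSeq (s: AsyncSeq<'a>) : Process<'a> =
    let next =
        async {
            let! step = s
            match step with
            | Nil -> return Halt
            | Cons (a, tail) -> return Emit (a, ofAsyncSeq tail)
        }
    Await next

// Hypothetical helper: collect all elements of an AsyncSeq.
let rec toList (s: AsyncSeq<'a>) : Async<'a list> =
    async {
        let! step = s
        match step with
        | Nil -> return []
        | Cons (x, tail) ->
            let! rest = toList tail
            return x :: rest
    }

let source : Process<int> =
    Emit (1, Await (async { return Emit (2, Emit (3, Halt)) }))

let direct = source |> toAsyncSeq |> toList |> Async.RunSynchronously

let roundTrip =
    source |> toAsyncSeq |> ofAsyncSeq |> toAsyncSeq |> toList |> Async.RunSynchronously
```

The round trip is not the identity on the Process structure, since `ofAsyncSeq` inserts an Await in front of every Emit, but both paths yield the same elements in the same order.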

-- removed algebra, needs correction, thanks to Vesa Karvonen for explaining --
