There is a great purely functional streaming/IO library in Scala called scalaz-stream. It is itself based upon a Haskell library called machines. They provide powerful abstractions to express compositional effectful computations. These libraries rely on certain type system features in both Scala and Haskell which are unavailable in F# - namely existential types and higher kinds. Higher kinds allow the monad representing the side-effects to be abstracted over. If however we specialize this to a specific monad, we can get around the lack of existentials as well.
Tomas Petricek created a type called asynchronous sequence which provides similar capabilities. The F# AsyncSeq
type is declared as follows:
type AsyncSeq<'a> = Async<AsyncSeqInner<'a>>
and AsyncSeqInner<'a> =
| Nil
| Cons of 'a * AsyncSeq<'a>
This is a recursive structure much like List
but composed with Async
.
Scalaz-stream has a similar but more general concept of a Process
. It allows you to plug different monads in place of Async
(or its counterpart in Scalaz called Task
). If however we specialize it to Async
, which seems to be the primary use case anyway, its approximate representation in F# would be:
type Process<'a> =
| Halt
| Emit of head:'a * tail:Process<'a>
| Await of req:Async<'I> * rcv:('I -> Process<'a>)
A Process
represents a state machine which can either be stopped, emiting values of type 'a
or awaiting values of type 'I
from the Async
monad and continuing the computation (in scalaz-stream the monad can be abstracted over). The reason this is approximate is because this isn't valid F# - the type 'I
isn't declared. The F# type system doesn't allow this type of declaration - namely in places where there is a type, but you don't care to declare it upfront so that it can vary. This is called an existential type. The Emit
case looks very similar to the Cons
case of AsyncSeq
except that the tail in Cons
is wrapped in Async
. Moreover, there is no Await
case in AsyncSeq
. Fortunately the Await
case can be "collapsed".
First, Await
can be transformed to:
type Process<'a> =
| Halt
| Emit of head:'a * tail:Process<'a>
| Await of Async<Process<'a>>
Intuitively, the transformation is acceptable because Async
can be mapped over:
val req : Async<'I>
val rcv : 'I -> Process<'a>
val map : ('a -> 'b) -> Async<'a> -> Async<'b>
let mapped : Async<Process<'a>> = req |> map rcv
With this transformation, it is evident how a Process
can represent AsyncSeq
- the tail in Emit
can point to an Await
corresponding to Cons
in `AsyncSeq:
type Process<'a> =
| Halt
| Emit of 'a * Process<'a>
| Await of Async<Process<'a>>
module Process =
// Proc<'a> -> AsyncSeq<'a>
let rec toAsyncSeq = function
| Halt -> AsyncSeq.empty
| Emit (a,tail) -> asyncSeq {
yield a
yield! toAsyncSeq tail
}
| Await r -> asyncSeq {
let! r = r
yield! toAsyncSeq r
}
// AsyncSeq<'a> -> Proc<'a>
let rec ofAsyncSeq (s:AsyncSeq<'a>) =
Await <| async {
let! s = s
match s with
| Nil -> return Halt
| Cons (a,tail) -> return Emit (a, ofAsyncSeq tail)
}
F# computation workflows make this easy. What sets them apart from monadic do notation alone is that within the asyncSeq
workflow it is possible to also bind to computations of type Async
as is done in the Await
clause in toAsyncSeq
.
-- removed algebra, needs correction, thanks to Vesa Karvonen for explaining --