class Stream[+F[_], +O] extends AnyVal
- A stream producing output of type `O`
- May evaluate `F` effects.
class Pull[+F[_], +O, +R] extends AnyVal
`p: Pull[F,O,R]`
- Reads values from one or more streams
- Returns a result of type `R`
- Produces a `Stream[F,O]` when calling `p.stream`.
To understand these types, we will compare them to the types described in Chapter 15 of the Red Book (https://github.com/fpinscala/fpinscala/blob/master/exercises/src/main/scala/fpinscala/streamingio/StreamingIO.scala):
case class Await[I,O](
recv: Option[I] => Process[I,O])
extends Process[I,O]
This is the first, simple version (no effect involved, i.e. no `F[_]`).
`Process[I,O]` can be thought of as a `Stream[I] => Stream[O]`.
From the book:
`Await(recv)` requests a value from the input stream. The driver should pass the next available value to the `recv` function, or `None` if the input has no more elements.
It is basically pulling on the stream: it requests an element of type `I`, and `recv` then uses this `I` to produce the rest of the `Stream[I] => Stream[O]` transformation.
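To make the pulling behaviour concrete, here is a minimal sketch of the simple, effect-free `Process` together with a driver. The `Halt` case, the `run` driver, the `doubled` example and the enclosing object name are assumptions added for illustration; only `Await` and `Emit` come from the text.

```scala
object SimpleProcessSketch {
  // The states of the simple transducer: emit a value, await an input, or stop.
  sealed trait Process[I, O]
  case class Emit[I, O](head: O, tail: Process[I, O]) extends Process[I, O]
  case class Await[I, O](recv: Option[I] => Process[I, O]) extends Process[I, O]
  case class Halt[I, O]() extends Process[I, O] // assumed terminal state

  // Hypothetical driver: feeds the input list to the process and collects its output.
  def run[I, O](p: Process[I, O], input: List[I]): List[O] = {
    @annotation.tailrec
    def go(cur: Process[I, O], in: List[I], acc: List[O]): List[O] = cur match {
      case Halt()     => acc.reverse
      case Emit(h, t) => go(t, in, h :: acc)
      case Await(recv) =>
        in match {
          case x :: rest => go(recv(Some(x)), rest, acc) // next value available
          case Nil       => go(recv(None), Nil, acc)     // input exhausted
        }
    }
    go(p, input, Nil)
  }

  // Example process: await an Int, emit its double, repeat until the input ends.
  def doubled: Process[Int, Int] =
    Await[Int, Int] {
      case Some(i) => Emit[Int, Int](i * 2, doubled)
      case None    => Halt[Int, Int]()
    }
}
```

Calling `SimpleProcessSketch.run(SimpleProcessSketch.doubled, List(1, 2, 3))` drives the process over a three-element input; the driver is the only part that touches the input, which is what lets the process be read as a `Stream[I] => Stream[O]`.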
Later on, `Await` is generalized and you get:
trait Process[F[_],O] {
import Process._
}
object Process {
case class Await[F[_],A,O](
req: F[A],
recv: Either[Throwable,A] => Process[F,O]) extends Process[F,O]
}
Definition: `Process[F,O]` represents a stream of `O` values (`O` for output) produced by (possibly) making external requests using the protocol `F` via `Await`.
Let's get rid of the `Either` and go back to a simpler version based on `Option`:
case class Await[F[_],A,O](
req: F[A],
recv: Option[A] => Process[F,O]) extends Process[F,O]
case class Emit[F[_],O](
head: O,
tail: Process[F,O]) extends Process[F,O]
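As a rough sketch of how a driver could interpret this effectful `Process`, the example below fixes `F` to a plain thunk. Everything apart from `Await` and `Emit` is an assumption made for illustration: the `Thunk` alias, the `Halt` case, the `runLog` driver, the `fromIterator` source, and the choice to translate a failed request into `None` (mirroring how `Either[Throwable,A]` was simplified to `Option[A]`).

```scala
import scala.util.Try

object EffectfulProcessSketch {
  // Stand-in effect type for this sketch: a suspended computation.
  type Thunk[A] = () => A

  sealed trait Process[F[_], O]
  case class Await[F[_], A, O](req: F[A], recv: Option[A] => Process[F, O]) extends Process[F, O]
  case class Emit[F[_], O](head: O, tail: Process[F, O]) extends Process[F, O]
  case class Halt[F[_], O]() extends Process[F, O] // assumed terminal state

  // Hypothetical driver: runs each request and passes the result to recv.
  // Assumption: a request that throws is reported to recv as None.
  def runLog[O](p: Process[Thunk, O]): List[O] = {
    @annotation.tailrec
    def go(cur: Process[Thunk, O], acc: List[O]): List[O] = cur match {
      case Halt()           => acc.reverse
      case Emit(h, t)       => go(t, h :: acc)
      case Await(req, recv) => go(recv(Try(req()).toOption), acc)
    }
    go(p, Nil)
  }

  // Example source: each request reads the next element of an iterator;
  // an exhausted iterator throws, which the driver turns into None.
  def fromIterator(it: Iterator[String]): Process[Thunk, String] =
    Await[Thunk, String, String](
      () => it.next(),
      {
        case Some(line) => Emit[Thunk, String](line, fromIterator(it))
        case None       => Halt[Thunk, String]()
      }
    )
}
```

The process itself never runs an effect: it only describes the request in `req`, and the driver decides when to evaluate it.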
From the FS2 guide (https://functional-streams-for-scala.github.io/fs2/guide.html):
A `Stream[F,O]` (formerly `Process`) represents a:
- Discrete stream of `O` values
- Which may request evaluation of `F` effects.
Let's complete the sentence using the definition of `Await`:
Which may request evaluation of `F` effects via `Pull`.
Let's rename the types to the ones we like:
case class Pull[F[_],A,O](
req: F[A],
recv: Option[A] => Stream[F,O]) extends Stream[F,O]
case class Emit[F[_],O](
head: O,
tail: Stream[F,O]) extends Stream[F,O]
To request an element in FS2, the `uncons1` function is used:
def uncons1: Pull[F, Nothing, Option[(O, Stream[F, O])]]
Waits for a single element to be available in the source stream. The element along with a new stream are provided as the resource of the returned pull. The new stream can be used for subsequent operations, like awaiting again. A None is returned as the resource of the pull upon reaching the end of the stream.
To emit an element:
def output1[F[x] >: Pure[x], O](o: O): Pull[F, O, Unit]
Outputs a single value.
`Pull` can both request elements via `uncons1` and output elements via `output1`!
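The following sketch shows both operations working together in one `Pull`. It assumes `fs2-core` (1.0 or later) is on the classpath; the `doubled` pipe, the `go` helper and the enclosing object are hypothetical names, and the only FS2 calls used are `pull.uncons1`, `Pull.output1`, `Pull.done`, `>>`, `.stream`, `through` and `toList`.

```scala
import fs2.{Pipe, Pull, Stream}

object Fs2PullSketch {
  // Hypothetical pipe that doubles every element, one element at a time.
  def doubled[F[_]]: Pipe[F, Int, Int] = {
    def go(s: Stream[F, Int]): Pull[F, Int, Unit] =
      s.pull.uncons1.flatMap {
        // Request succeeded: the result carries the head and the rest of the stream.
        // Emit the doubled head, then keep pulling from the tail.
        case Some((hd, tl)) => Pull.output1(hd * 2) >> go(tl)
        // End of the source stream: nothing more to request or emit.
        case None => Pull.done
      }
    in => go(in).stream
  }

  // Pure stream, so the result can be collected without running any effect.
  val result: List[Int] = Stream(1, 2, 3).through(doubled).toList
}
```

Here `result` is `List(2, 4, 6)`. The request (`uncons1`) and the output (`output1`) are sequenced inside the same `Pull`, and `.stream` turns the finished `Pull` back into a `Stream`.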
In terms of the types from the Red Book, it means that `Pull` merges `Process.Emit` and `Process.Await` into one type.
How do you know which state it is in?
uncons1: Pull[F, O = Nothing, R = Option[(O, Stream[F, O])]]
- `O` is `Nothing` because it is in requesting mode: no output is allowed
- `R` is the `Process.Emit` `head` and `tail` zipped together, so when you `flatMap` on the `Pull` you can access the last emitted value
- `F` is the effect that is evaluated when you `flatMap` on `R`
output1[F, O](o: O): Pull[F, O, R = Unit]
- `O` is the type of `o`: we are emitting the input to the stream
- `R` is `Unit` because we cannot request (`Await`) in the `Emit` state
Process[F,O] | Stream[F,O] |
---|---|
Await(req: F[A], recv: Option[A] => Stream[F,O]) | Pull[F, O = Nothing, R = Option[(O, Stream[F, O])]] |
Emit(head: O, tail: Stream[F,O]) | Pull[F, O, R = Unit] |