`class Stream[+F[_], +O] extends AnyVal`

- A stream producing output of type `O`
- May evaluate `F` effects.

`class Pull[+F[_], +O, +R] extends AnyVal`

`p: Pull[F,O,R]` reads values from one or more streams

- Returns a result of type `R`
- Produces a `Stream[F,O]` when calling `p.stream`.

To understand these types, we will compare them to the types described in the Red Book, Chapter 15 (https://github.com/fpinscala/fpinscala/blob/master/exercises/src/main/scala/fpinscala/streamingio/StreamingIO.scala):

    case class Await[I,O](
      recv: Option[I] => Process[I,O])
      extends Process[I,O]

This is the first, simple version (no effect involved, i.e. no `F[_]`).

`Process[I,O]` can be thought of as a `Stream[I] => Stream[O]`.

From the book:
Await(recv) requests a value from the input stream. The driver should pass
the next available value to the recv function, or None if the input has no more
elements.

In other words, `Await` pulls on the input stream to request one element of type `I`, and `recv` must then use that element to produce the next `Process[I,O]`, i.e. the rest of the `Stream[I] => Stream[O]` transformation.
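
To make the role of the driver concrete, here is a minimal sketch of the simple `Process[I,O]` together with a driver over a `List`. The `Emit` and `Halt` cases follow the Red Book's shape; the names `SimpleProcessSketch`, `run` and `take` are illustrative choices for this note, not part of any library.

```scala
object SimpleProcessSketch {
  sealed trait Process[I, O]
  // Request one input; the driver answers with Some(i) or None at end of input.
  case class Await[I, O](recv: Option[I] => Process[I, O]) extends Process[I, O]
  // Emit one output, then continue as `tail`.
  case class Emit[I, O](head: O, tail: Process[I, O]) extends Process[I, O]
  // Stop processing.
  case class Halt[I, O]() extends Process[I, O]

  // The driver: feeds the input list to the process and collects its outputs.
  def run[I, O](p: Process[I, O], in: List[I]): List[O] = p match {
    case Halt()      => Nil
    case Emit(h, t)  => h :: run(t, in)
    case Await(recv) => in match {
      case h :: t => run(recv(Some(h)), t)
      case Nil    => run(recv(None), Nil)
    }
  }

  // Example process: pass through at most n elements.
  def take[I](n: Int): Process[I, I] =
    if (n <= 0) Halt[I, I]()
    else Await[I, I] {
      case Some(i) => Emit[I, I](i, take(n - 1))
      case None    => Halt[I, I]()
    }
}
```

For instance, `SimpleProcessSketch.run(SimpleProcessSketch.take[Int](2), List(1, 2, 3))` requests two elements, emits each of them, and then halts without looking at the third.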

Later on, `Await` is generalized and you get:

    trait Process[F[_],O] {
      import Process._
    }
    object Process {
      case class Await[F[_],A,O](
        req: F[A],
        recv: Either[Throwable,A] => Process[F,O]) extends Process[F,O]
    }

Definition: `Process[F,O]` represents a stream of `O` values (`O` for output) produced
by (possibly) making external requests using the protocol `F` via `Await`.

Let's get rid of the `Either` and get back to a simpler version:

    case class Await[F[_],A,O](
      req: F[A],
      recv: Option[A] => Process[F,O]) extends Process[F,O]

    case class Emit[F[_],O](
      head: O,
      tail: Process[F,O]) extends Process[F,O]

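
As a rough illustration of how a driver might interpret this simplified `Await`, the sketch below fixes `F` to a hypothetical `Thunk` type (a deferred computation that may yield no value). `Thunk`, `Halt`, `runLog` and `fromIterator` are assumptions made for this example; the Red Book's own driver works with `Either[Throwable,A]` and a real effect type.

```scala
object EffectfulProcessSketch {
  sealed trait Process[F[_], O]
  case class Await[F[_], A, O](req: F[A], recv: Option[A] => Process[F, O]) extends Process[F, O]
  case class Emit[F[_], O](head: O, tail: Process[F, O]) extends Process[F, O]
  case class Halt[F[_], O]() extends Process[F, O]

  // Hypothetical effect for this sketch: running it yields Some(a) or None.
  type Thunk[A] = () => Option[A]

  // The driver runs each request and hands the answer to recv.
  def runLog[O](p: Process[Thunk, O]): List[O] = p match {
    case Halt()           => Nil
    case Emit(h, t)       => h :: runLog(t)
    case Await(req, recv) => runLog(recv(req()))
  }

  // Example source: each Await asks the iterator for its next element.
  def fromIterator[O](it: Iterator[O]): Process[Thunk, O] =
    Await[Thunk, O, O](
      () => if (it.hasNext) Some(it.next()) else None,
      {
        case Some(o) => Emit[Thunk, O](o, fromIterator(it))
        case None    => Halt[Thunk, O]()
      }
    )
}
```

The point of the sketch is that the process itself never runs the effect: it only describes the request in `req`, and the driver decides when to evaluate it.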
From the FS2 guide (https://functional-streams-for-scala.github.io/fs2/guide.html):

A `Stream[F,O]` (formerly `Process`) represents a:

- Discrete stream of `O` values
- Which may request evaluation of `F` effects.

Let's complete the sentence using the definition of Await:
Which may request evaluation of F effects via Pull
Let's rename the types to the ones we like:

    case class Pull[F[_],A,O](
      req: F[A],
      recv: Option[A] => Stream[F,O]) extends Stream[F,O]

    case class Emit[F[_],O](
      head: O,
      tail: Stream[F,O]) extends Stream[F,O]

To request an element in fs2, the `uncons1` function is used:

    def uncons1: Pull[F, Nothing, Option[(O, Stream[F, O])]]

Waits for a single element to be available in the source stream. The element along with a new stream are provided as the resource of the returned pull. The new stream can be used for subsequent operations, like awaiting again. A `None` is returned as the resource of the pull upon reaching the end of the stream.

To emit an element:

    def output1[F[x] >: Pure[x], O](o: O): Pull[F, O, Unit]

Outputs a single value.

`Pull` can both request elements via `uncons1` and output elements via `output1`!

In terms of the types from the Red Book, this means:

`Pull` merges `Process.Emit` and `Process.Await` into one type.

How do you know which state it is in?

`uncons1: Pull[F, O = Nothing, R = Option[(O, Stream[F, O])]]`

- `O` is `Nothing` because it is in requesting mode: no output is allowed
- `R` is the `Process.Emit` `head` and `tail` zipped together, so when you `flatMap` on the `Pull` you can access the last emitted value
- `F` is the effect that is evaluated when you `flatMap` on `R`

`output1[F, O](o: O): Pull[F, O, R = Unit]`

- `O` is the type of `o`: we are emitting the input to the stream
- `R` is `Unit` because we cannot request (`Await`) in the `Emit` state

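
The two states can be sketched with a toy, effect-free model of `Pull`. This is not fs2's implementation: `ToyPull` drops the `F` parameter, represents a stream as a plain `List`, and exists only to show how `O = Nothing` (requesting) and `R = Unit` (emitting) combine through `flatMap`. All names inside `ToyPull` are assumptions made for this example.

```scala
object ToyPull {
  // Toy stand-in for Pull[F, O, R] without F: the outputs emitted so far plus a result.
  final case class Pull[+O, +R](outputs: List[O], result: R) {
    // Sequence two pulls: keep earlier outputs, continue from the result.
    def flatMap[O2 >: O, R2](f: R => Pull[O2, R2]): Pull[O2, R2] = {
      val next = f(result)
      Pull[O2, R2](outputs ++ next.outputs, next.result)
    }
    // In this toy model the "stream" is just the list of emitted outputs.
    def stream: List[O] = outputs
  }

  val done: Pull[Nothing, Unit] = Pull[Nothing, Unit](Nil, ())

  // Emitting state: produces an O, result is Unit.
  def output1[O](o: O): Pull[O, Unit] = Pull[O, Unit](List(o), ())

  // Requesting state: emits nothing (O = Nothing), result is head and tail, or None.
  def uncons1[O](s: List[O]): Pull[Nothing, Option[(O, List[O])]] = s match {
    case h :: t => Pull[Nothing, Option[(O, List[O])]](Nil, Some((h, t)))
    case Nil    => Pull[Nothing, Option[(O, List[O])]](Nil, None)
  }

  // Request, then emit, then recurse: both states in one type.
  def take[O](s: List[O], n: Int): Pull[O, Unit] =
    if (n <= 0) done
    else uncons1(s).flatMap[O, Unit] {
      case Some((hd, tl)) => output1(hd).flatMap[O, Unit](_ => take(tl, n - 1))
      case None           => done
    }
}
```

Here `take` alternates between the two states: `uncons1` hands the head and tail to `flatMap` as the result, and `output1` turns the head into an output before the recursion requests the next element.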
| Process[F,O] | Stream[F,O] |
|---|---|
| `Await(req: F[A], recv: Option[A] => Stream[F,O])` | `Pull[F, O = Nothing, R = Option[(O, Stream[F, O])]]` |
| `Emit(head: O, tail: Stream[F,O])` | `Pull[F, O, R = Unit]` |