In Pull[F, O, R]
, R
is the return type. Pull
represents a computation that emits some values on the Stream (of type O) and returns a new thing (R). In order to convert a Pull to a Stream, R must be Unit. This is because an FS2 Stream does not have the ability to terminate with a return value.
See here for the conversation
Stream[F, O]
is monadic over O which are the output values emittedPull[F, O, R]
is monadic over R which is used for stateful transformations
If you take a closer look at what we have been doing when writing our own version of myTake, we have been using that R value and emitting elements on the stream (O value)
Let's take a closer look at our myTake combinator
def myTake[F[_], I](nrOfElements: Long): Pipe[F, I, I] = { inputStream: Stream[F, I] =>
def go(stream: Stream[F, I], remaining: Long): Pull[F, I, Unit] =
if (remaining <= 0) Pull.done
else stream.pull.uncons1.flatMap {
case None => Pull.done
case Some((element, remStream)) => Pull.output1(element) >> go(remStream, remaining - 1)
}
go(inputStream, nrOfElements).stream
}
We use the monadic Pull[F, O, R]
API which is monadic over R
, this looks a bit deceiving here (with go
) because it appears that we aren't using that R
type because it's returning Unit
which is not the entire picture.
Let's take a closer look at uncons1
This is interesting, this is a Pull[F, Nothing, R=Option[(O, Stream[F, O])]]
, notice, everything is in the return type (R
). When we map
or flatMap
(which is what we have been doing), we get access to that R
value.
We use Pull.output1
to emit O
values on the Stream
AND we also need to transfer some state (in our case, the remaining stream and the number of elements left to pull).
We do that state transfer using the R
type. The Pull
datatype is monadic on that R
type so we use it to transfer state between Pull
s.
So the pattern here is to pull the stream, use unconsX
to get the remaining Stream
data (and any other state you need) in the R
type, do some stateful operations and decide whether to emit the data as O
values using Pull.outputX
and then use flatMap
(or >>
which is also flatMap
but does not care about the argument so flatMap(_ => …)
is just >>(...)
) to carry any updated state through to the next Pull
.
This process happens recursively. We call Pull.done
when we have no more work left to do which sets the R
value to Unit
indicating that there is no result type anymore and we have the ability to convert Pull[F, O, R=Unit]
into a FS2 Stream
of type Stream[F, O]
which will contain all the O
elements we have emitted during our Pull
transformations.
Note: the Pull.output1
combinator is used to emit elements of type O
on the FS2 Stream
.
Note: the Pull.done
combinator is to signal that that there is nothing more to pull from the upstream and inadvertently there is nothing more to emit on the FS2 Stream (since type O
= Nothing
) and
our Pull
has nothing to return (R
= Unit
) which also indicates that there is really no point of doing any further map
s or flatMap
s (so no more monadic transformations to be done on the Pull
)
Example usage: