import fs2._

def myTake[F[_], A](nrOfElements: Int): Pipe[F, A, A] = inputStream => {
  def go(s: Stream[F, A], remaining: Int): Pull[F, A, Unit] =
    if (remaining == 0) Pull.done
    else s.pull.uncons1.flatMap {
      case Some((element: A, remStream: Stream[F, A])) => Pull.output1(element) >> go(remStream, remaining - 1)
      case None => Pull.done
    }
  go(inputStream, nrOfElements).stream
}

// Usage
Stream.emits(Seq(1, 2, 3)).through(myTake(2)).toList
// res3: List[Int] = List(1, 2)

Here's a more advanced implementation that uses FS2's Segments:

def advancedMyTake[F[_], A](nrOfElements: Int): Pipe[F, A, A] =
  inputStream => {
    def go(s: Stream[F, A], rem: Int): Pull[F, A, Unit] =
      if (rem == 0) Pull.done
      else s.pull.uncons.flatMap {
        case None => Pull.done
        case Some((segment: Segment[A, Unit], remStream: Stream[F, A])) =>
          Pull.segment(segment.take(rem)).flatMap {
            // The result of the returned segment is either a Left containing
            // the result of the original segment and the number of elements
            // remaining to take when the end of the source segment was reached
            case Left((_, remaining)) => go(remStream, remaining.toInt)
            // or a Right containing the remainder of the source segment
            // after `n` elements are taken.
            case Right(_) => Pull.done
          }
      }
    go(inputStream, nrOfElements).stream
  }

Instead of using uncons1, we use uncons, which gives us FS2 segments. Calling take on a segment yields an Either.

A Left means we have fully consumed the segment, and it carries the demand that is still outstanding. For example, if I ask for 10 elements and my segment contains 3, I get back a Left telling me that 7 elements of demand are left. This is useful because we pass that number straight into consuming the next segment.

A Right means the demand was met, and it carries the remainder of the segment after the elements were taken. For example, if I have a Segment containing [1, 2, 3, 4, 5] and I apply a take of 2, I get back a Right of Segment(3, 4, 5), which is what is left of the segment after taking 2 elements.

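To make the Left/Right behaviour concrete, here is a rough plain-Scala model of it. This is only a sketch and not the FS2 API: the segment is modelled as a plain Vector, and the names SegmentTakeModel and takeModel are hypothetical. Treating the boundary case (asking for exactly as many elements as the segment holds) as a Left with 0 demand left is a choice made for this sketch, not a claim about how FS2 classifies it.

```scala
object SegmentTakeModel {
  // Hypothetical stand-in for taking `n` elements from a segment (modelled as a Vector).
  // Returns the elements that were emitted, together with:
  //   Left(demandLeft)  if the whole segment was consumed; demandLeft elements are still wanted
  //   Right(remainder)  if `n` elements were taken and part of the segment is left over
  def takeModel[A](segment: Vector[A], n: Int): (Vector[A], Either[Int, Vector[A]]) =
    if (n < segment.size) (segment.take(n), Right(segment.drop(n)))
    else (segment, Left(n - segment.size))
}

// Asking for 10 from a segment of 3: everything is emitted, 7 elements of demand remain
val exhausted = SegmentTakeModel.takeModel(Vector(1, 2, 3), 10)

// Asking for 2 from [1, 2, 3, 4, 5]: [1, 2] is emitted, [3, 4, 5] is the remainder
val satisfied = SegmentTakeModel.takeModel(Vector(1, 2, 3, 4, 5), 2)
```

In the real pipe the Left value feeds the recursive call to go, and the Right value is simply dropped because nothing more is wanted.
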
After watching the FS2 internals talk, I found out that I have access to a higher-level layer (Chunk), so here's an implementation using Chunk:

def highLevelTake[F[_], O](nrOfElements: Int): Pipe[F, O, O] = {
  def go(remaining: Int, remStream: Stream[F, O]): Pull[F, O, Unit] =
    if (remaining <= 0) Pull.done
    else remStream.pull.unconsChunk.flatMap {
      case None => Pull.done
      case Some((chunk, nextRemStream)) =>
        if (chunk.size < remaining) Pull.outputChunk(chunk) >> go(remaining - chunk.size, nextRemStream)
        else Pull.outputChunk(chunk.take(remaining))
    }
  inputStream => go(nrOfElements, inputStream).stream
}

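The bookkeeping in the Chunk version can be followed without FS2 by modelling a stream as a list of chunks. The sketch below is only an illustration in plain Scala: ChunkTakeModel and takeChunks are hypothetical names, and a List of Vectors stands in for a stream of chunks. It follows the same two branches: emit a whole chunk and lower the demand, or emit a prefix of the chunk and stop.

```scala
import scala.annotation.tailrec

object ChunkTakeModel {
  // Hypothetical model: the "stream" is a List of chunks, each chunk a Vector.
  def takeChunks[A](chunks: List[Vector[A]], n: Int): List[Vector[A]] = {
    @tailrec
    def go(remaining: Int, rest: List[Vector[A]], emitted: List[Vector[A]]): List[Vector[A]] =
      if (remaining <= 0) emitted.reverse
      else rest match {
        case Nil => emitted.reverse // source ran out before the demand was met
        case chunk :: next =>
          // Chunk is smaller than the demand: emit all of it and keep going
          if (chunk.size < remaining) go(remaining - chunk.size, next, chunk :: emitted)
          // Chunk covers the demand: emit just the prefix we need and stop
          else (chunk.take(remaining) :: emitted).reverse
      }
    go(n, chunks, Nil)
  }
}

// Taking 4 from chunks [1, 2, 3] and [4, 5, 6] emits the first chunk whole
// and only the first element of the second one.
val taken = ChunkTakeModel.takeChunks(List(Vector(1, 2, 3), Vector(4, 5, 6)), 4)
```

Working a chunk at a time means the demand counter is updated once per chunk instead of once per element.
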
The original myTake (the uncons1 version) pulls a single element at a time from the Stream. If the Stream is finite and we ask for more elements than it contains, we get back only the elements that are present, so the result may be shorter than the number requested.
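
As a rough illustration of pulling one element at a time, the sketch below uses a standard-library Iterator in place of an FS2 Stream. ElementTakeModel and takeOneAtATime are hypothetical names and nothing here is FS2 API. It stops as soon as either the demand is met or the source runs dry.

```scala
import scala.annotation.tailrec

object ElementTakeModel {
  // Hypothetical model: pull one element per step from an Iterator,
  // stopping when the demand reaches zero or the source has no more elements.
  def takeOneAtATime[A](source: Iterator[A], n: Int): List[A] = {
    @tailrec
    def go(remaining: Int, acc: List[A]): List[A] =
      if (remaining <= 0 || !source.hasNext) acc.reverse
      else go(remaining - 1, source.next() :: acc)
    go(n, Nil)
  }
}

// Asking for 5 elements from a 3-element source yields just those 3 elements
val short = ElementTakeModel.takeOneAtATime(Iterator(1, 2, 3), 5)

// An unbounded source is only pulled as many times as requested
val bounded = ElementTakeModel.takeOneAtATime(Iterator.from(1), 2)
```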