@calvinlfer
Last active August 13, 2018 03:08
A simple implementation of take for FS2 streams to familiarize myself with the Pull API
import fs2._

def myTake[F[_], A](nrOfElements: Int): Pipe[F, A, A] = inputStream => {
  // Emit one element per step until the demand is met or the stream ends.
  def go(s: Stream[F, A], remaining: Int): Pull[F, A, Unit] =
    if (remaining <= 0) Pull.done // <= 0 also covers a negative nrOfElements
    else s.pull.uncons1.flatMap {
      case Some((element, remStream)) => Pull.output1(element) >> go(remStream, remaining - 1)
      case None => Pull.done
    }

  go(inputStream, nrOfElements).stream
}

// Usage
Stream.emits(Seq(1, 2, 3)).through(myTake(2)).toList
// res3: List[Int] = List(1, 2)
@calvinlfer
Author

This pulls a single element at a time from the Stream. If the Stream is finite and we ask for more elements than it contains, we simply get every element it has; otherwise we get exactly the number of elements we requested.

@calvinlfer
Author

calvinlfer commented Aug 12, 2018

Here's a more advanced implementation that uses FS2's Segments:

  def advancedMyTake[F[_], A](nrOfElements: Int): Pipe[F, A, A] =
    inputStream => {
      def go(s: Stream[F, A], rem: Int): Pull[F, A, Unit] =
        if (rem == 0) Pull.done
        else s.pull.uncons.flatMap {
          case None => Pull.done
          case Some((segment: Segment[A, Unit], remStream: Stream[F, A])) =>
            Pull.segment(segment.take(rem)).flatMap {
              // Left: the segment ran out before the demand was met. It carries the
              // segment's result and the number of elements still left to take.
              case Left((_, remaining)) => go(remStream, remaining.toInt)
              // Right: the demand was met. It carries the remainder of the source
              // segment after `n` elements are taken, which we discard.
              case Right(_) => Pull.done
            }
        }

      go(inputStream, nrOfElements).stream
    }

Instead of uncons1, this version uses uncons, which hands us FS2 Segments rather than single elements. Calling take on a segment yields an Either.

A Left means the segment was fully consumed before the demand was met, and it carries the amount of demand still outstanding. For example, if I ask for 10 elements and the segment contains 3, I get back a Left saying that 7 elements are still wanted, which we pass straight on when consuming the next segment.

A Right means the demand was met, and it carries the remainder of the segment after the requested elements were taken. For example, if I have a Segment containing [1, 2, 3, 4, 5] and apply a take of 2, I get back a Right of Segment(3, 4, 5), which is the remainder of the segment after 2 elements have been taken.
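To make the Left/Right bookkeeping concrete, here is a small plain-Scala model of it. This is only a sketch and not FS2's API: `SegmentTakeModel` and `takeFrom` are hypothetical names, a `Vector` stands in for a segment, and the taken elements are returned directly instead of being emitted. Returning `Left(0)` when the demand exactly equals the segment size is an assumption of this model.

```scala
object SegmentTakeModel {
  // Hypothetical stand-in for taking from a segment (not FS2's Segment#take).
  // Returns the taken elements together with:
  //   Left(stillWanted) - the segment ran out; stillWanted elements are still in demand
  //   Right(rest)       - the demand was met; rest is the untouched remainder
  def takeFrom[A](segment: Vector[A], n: Int): (Vector[A], Either[Int, Vector[A]]) =
    if (n >= segment.size) (segment, Left(n - segment.size)) // assumption: exact fit is Left(0)
    else (segment.take(n), Right(segment.drop(n)))
}
```

With this model, `SegmentTakeModel.takeFrom(Vector(1, 2, 3), 10)` leaves a demand of 7 in the `Left`, and `SegmentTakeModel.takeFrom(Vector(1, 2, 3, 4, 5), 2)` leaves `Vector(3, 4, 5)` in the `Right`, mirroring the two examples above.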

@calvinlfer
Author

After watching the FS2 internals talk, I found out that I have access to a higher-level layer (Chunk), so here's an implementation using Chunk:

def highLevelTake[F[_], O](nrOfElements: Int): Pipe[F, O, O] = {
  def go(remaining: Int, remStream: Stream[F, O]): Pull[F, O, Unit] =
    if (remaining <= 0) Pull.done
    else remStream.pull.unconsChunk.flatMap {
      case None => Pull.done
      case Some((chunk, nextRemStream)) =>
        if (chunk.size < remaining) Pull.outputChunk(chunk) >> go(remaining - chunk.size, nextRemStream)
        else Pull.outputChunk(chunk.take(remaining))
    }

  inputStream => go(nrOfElements, inputStream).stream
}
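The chunk arithmetic in highLevelTake can be traced without FS2 at all. The sketch below is a plain-Scala model, not the library's API: `ChunkTakeModel` and `takeChunks` are hypothetical names, a `List[Vector[A]]` stands in for a stream of chunks, and the output is collected into a `Vector` of chunks instead of being emitted through a Pull.

```scala
object ChunkTakeModel {
  // Hypothetical model of the recursion in highLevelTake.
  // Whole chunks are passed through while they fit inside the remaining demand;
  // the chunk that meets or exceeds the demand is truncated and ends the recursion.
  @annotation.tailrec
  def takeChunks[A](
      chunks: List[Vector[A]],
      remaining: Int,
      out: Vector[Vector[A]] = Vector.empty
  ): Vector[Vector[A]] =
    if (remaining <= 0) out
    else chunks match {
      case Nil => out // input exhausted before the demand was met
      case chunk :: rest =>
        if (chunk.size < remaining) takeChunks(rest, remaining - chunk.size, out :+ chunk)
        else out :+ chunk.take(remaining)
    }
}
```

For instance, taking 5 elements from the chunks `[1, 2, 3]`, `[4, 5, 6]`, `[7]` passes the first chunk through whole and truncates the second to `[4, 5]`; asking for more elements than exist returns every chunk unchanged.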
