Skip to content

Instantly share code, notes, and snippets.

@frosforever
Last active August 23, 2019 15:38
Show Gist options
  • Save frosforever/821127007752883dda2390b2859536c1 to your computer and use it in GitHub Desktop.
Save frosforever/821127007752883dda2390b2859536c1 to your computer and use it in GitHub Desktop.
Scope breaking fs2.Pipe
import fs2.{ Pipe, Pull } // Version 1.0.5
import cats.effect.{ ExitCode, IO, IOApp }
/**
 * Minimal reproduction of a finalizer-ordering surprise in fs2 1.0.5.
 *
 * The program concatenates three streams, the first of which registers a
 * finalizer and is wrapped in its own `scope`, and then sends the result
 * through [[pipeThatBreaksScope]]. The author expects the finalizer message
 * to be printed before the message emitted by the second stream, but
 * observes the opposite order.
 */
object BreakingPipe extends IOApp {

  /**
   * Wraps `innerPipe` so that it is only run when the input is non-empty.
   *
   * The input is inspected with `pull.peek`:
   *  - on `None` the pull completes immediately and `innerPipe` is never applied;
   *  - on `Some` the stream handed back by `peek` (which, per the fs2 docs,
   *    still contains the peeked chunk) is sent through `innerPipe` and its
   *    output is echoed.
   *
   * @param innerPipe the pipe to apply to a non-empty input
   * @return a pipe that emits nothing for empty input and otherwise whatever
   *         `innerPipe` emits
   */
  def onlyOnNonEmpty[F[_], A, B](innerPipe: fs2.Pipe[F, A, B]): fs2.Pipe[F, A, B] = { source =>
    val guarded = source.pull.peek.flatMap {
      // Empty input: finish without touching `innerPipe`.
      case None => Pull.pure(None)
      // Non-empty input: delegate to `innerPipe` and forward everything it emits.
      case Some((_, rest)) => rest.through(innerPipe).pull.echo
    }
    guarded.stream
  }

  /**
   * Emits a single element: the last element of the input, as an `Option`.
   *
   * The combination of the two ingredients below is what triggers the
   * unexpected ordering, according to the author's observations:
   *  - without the non-empty check (`onlyOnNonEmpty`) the messages print in the right order;
   *  - without the `compile.last` they also print in the right order.
   */
  def pipeThatBreaksScope[A]: Pipe[IO, A, Option[A]] =
    onlyOnNonEmpty[IO, A, Option[A]] { in =>
      // Compiles the whole input inside a single `eval` step.
      fs2.Stream.eval(in.compile.last)
    }

  /**
   * Builds the three-part stream, runs it through [[pipeThatBreaksScope]] and
   * drains it, always finishing with `ExitCode.Success`.
   */
  override def run(args: List[String]): IO[ExitCode] = {
    // Expected to print first: the finalizer of the explicitly scoped leading stream.
    val scopedOnes =
      fs2.Stream
        .constant(1)
        .take(10)
        .onFinalize(IO(println("This should be first")))
        .scope

    // Expected to print only after the finalizer above has run.
    val announceNext = fs2.Stream.eval_(IO(println("This should be next")))

    val twos = fs2.Stream.constant(2).take(10)

    val program = (scopedOnes ++ announceNext ++ twos).through(pipeThatBreaksScope)

    program.compile.drain.map(_ => ExitCode.Success)
  }
}
/*
I would expect:
This should be first
This should be next
But instead I get:
This should be next
This should be first
*/
@frosforever
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment