Skip to content

Instantly share code, notes, and snippets.

@schrepfler
Forked from mpilquist/switchMap.scala
Created August 6, 2024 08:05
Show Gist options
  • Save schrepfler/a09223ae334d9c98138610d599cfa832 to your computer and use it in GitHub Desktop.
Save schrepfler/a09223ae334d9c98138610d599cfa832 to your computer and use it in GitHub Desktop.
FS2 version of switchMap
/**
 * Builds a pipe that, for each chunk pulled from the outer stream, starts the
 * inner stream `f(a)` for the LAST element `a` of that chunk and emits that
 * inner stream's output until the outer stream yields another chunk, at which
 * point it switches to a freshly started inner stream.
 *
 * Notes on behaviour visible in this implementation:
 *  - Only the last element of each outer chunk is passed to `f`; earlier
 *    elements of the same chunk never produce an inner stream.
 *  - Once the outer stream is exhausted, the remainder of the current inner
 *    stream is echoed to the output.
 *  - When the current inner stream is exhausted, the pipe waits for the next
 *    outer chunk before starting another inner stream.
 *
 * @param f maps an outer element to the inner stream to switch to
 * @tparam F effect type; an `Async[F]` instance is required for `awaitAsync`
 * @tparam A element type of the outer stream
 * @tparam B element type of the inner streams and of the output
 * @return a pipe from the outer stream to the output of the most recently
 *         started inner stream
 */
def switchMap[F[_]: Async, A, B](f: A => Stream[F, B]): Pipe[F, A, B] = {

  // Starts the inner stream for the last element of `chunk`, then begins
  // asynchronous steps on both the rest of the outer stream and the new inner
  // stream (outer first, then inner) and hands both pending steps to `go`.
  def switchTo(chunk: NonEmptyChunk[A], outerHandle: Handle[F, A]): Pull[F, B, Nothing] =
    f(chunk(chunk.size - 1)).open.flatMap { innerHandle =>
      outerHandle.awaitAsync.flatMap { outerStep =>
        innerHandle.awaitAsync.flatMap { innerStep =>
          go(outerStep, innerStep)
        }
      }
    }

  // Races the pending outer step against the pending inner step and reacts to
  // whichever result is delivered.
  def go(
    outer: ScopedFuture[F, Pull[F, Nothing, (NonEmptyChunk[A], Handle[F, A])]],
    inner: ScopedFuture[F, Pull[F, Nothing, (NonEmptyChunk[B], Handle[F, B])]]
  ): Pull[F, B, Nothing] =
    (outer race inner).pull.flatMap {
      case Left(outerResult) =>
        outerResult.optional.flatMap {
          case None =>
            // No further outer chunk: wait for the pending inner step, emit
            // its chunk, and echo whatever remains of the inner stream.
            inner.pull.flatMap(identity).flatMap { case (hd, tl) => Pull.output(hd) >> tl.echo }
          case Some((chunk, outerHandle)) =>
            // TODO: We're never going to look at the current `inner` again so we need to eagerly
            // release any resources acquired by it here. How?
            switchTo(chunk, outerHandle)
        }
      case Right(innerResult) =>
        innerResult.optional.flatMap {
          case None =>
            // No further inner chunk: wait for the pending outer step and
            // switch to the inner stream derived from the chunk it yields.
            outer.pull.flatMap(identity).flatMap { case (chunk, outerHandle) =>
              switchTo(chunk, outerHandle)
            }
          case Some((chunk, innerHandle)) =>
            // Emit the inner chunk and keep racing the SAME pending outer step
            // against the next inner step.
            Pull.output(chunk) >> innerHandle.awaitAsync.flatMap { innerStep => go(outer, innerStep) }
        }
    }

  // Entry point: receive the first outer chunk, start its inner stream, and
  // enter the race loop.
  in => in.open.flatMap { outerHandle =>
    outerHandle.receive { (chunk, rest) => switchTo(chunk, rest) }
  }.close
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment