Skip to content

Instantly share code, notes, and snippets.

@mpilquist
Last active June 2, 2021 21:25
Show Gist options
  • Save mpilquist/d91cde13e1249a34e06729368212f4f6 to your computer and use it in GitHub Desktop.
Save mpilquist/d91cde13e1249a34e06729368212f4f6 to your computer and use it in GitHub Desktop.
FS2 example of merging two streams and outputting the last received B value with each A value
@ def lastB[F[_]: Async, A, B]: Pipe2[F, A, B, (A, B)] = {
def go(
lastB: B,
l: ScopedFuture[F, Pull[F, Nothing, (Option[A], Handle[F,A])]],
r: ScopedFuture[F, Pull[F, Nothing, (Option[B], Handle[F,B])]]
): Pull[F,(A, B),Nothing] =
(l race r).pull flatMap {
case Left(l) => l.optional flatMap {
case None => Pull.done
case Some((None, l)) => Pull.done
case Some((Some(hd), l)) => Pull.output1((hd, lastB)) >> l.await1Async.flatMap(go(lastB, _, r))
}
case Right(r) => r.optional flatMap {
case (None | Some((None, _))) => l.pull.flatMap(identity).flatMap {
case (None, tl) => Pull.done
case (Some(hd), tl) => Pull.output1((hd, lastB)) >> tl.map((_, lastB)).echo
}
case Some((Some(hd), r)) => r.await1Async.flatMap(go(hd, l, _))
}
}
(a, b) => b.open.flatMap {
_.receive1 { (firstB, b) =>
a.open.flatMap { a =>
a.await1Async.flatMap { a =>
b.await1Async.flatMap { b =>
go(firstB, a, b)
}}
}
}
}.close
}
defined function lastB
@ val a = Stream.range(0, 10).zipWith(time.awakeEvery[Task](500.millis))((a, _) => a)
a: Stream[Task, Int] = evalScope(<scope>).flatMap(<function1>)
@ val b = Stream.range(0, 10).map(i => ('a' + i).toChar).zipWith(time.awakeEvery[Task](300.millis))((b, _) => b)
b: Stream[Task, Char] = evalScope(<scope>).flatMap(<function1>)
@ a.through2(b)(lastB).evalMap(out => Task.delay(println(s"${System.currentTimeMillis} > $out"))).run.unsafeRun
1474981642567 > (0,b)
1474981643068 > (1,d)
1474981643565 > (2,f)
1474981644070 > (3,g)
1474981644566 > (4,i)
1474981645066 > (5,j)
1474981645570 > (6,j)
1474981646068 > (7,j)
1474981646566 > (8,j)
1474981647068 > (9,j)
def lastB[F[_]: Async, A, B](init: B): Pipe2[F, A, B, (A, B)] = (as, bs) =>
bs.uncons1.flatMap {
case None => Stream.empty
case Some((init, bs)) =>
as.either(bs).
mapAccumulate(init) {
case (b, Left(a)) => (b, Some(a))
case (_, Right(b)) => (b, None)
}.collect { case (b, Some(a)) => (a, b) }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment