Skip to content

Instantly share code, notes, and snippets.

@kamilkloch
Last active December 7, 2023 12:08
Show Gist options
  • Save kamilkloch/d7672d1eefe7e32748f689d81ae309f2 to your computer and use it in GitHub Desktop.
Save kamilkloch/d7672d1eefe7e32748f689d81ae309f2 to your computer and use it in GitHub Desktop.
Performance of merging fs2 streams
object StreamMergeOverhead2 extends IOApp.Simple {
def run: IO[Unit] = {
IO.println("=== Stream(1).covary[IO].repeatN(10_000) ===") >>
benchmark(Stream(1).covary[IO].repeatN(10_000), numStreams = 2, m = 10) >>
IO.println("=== Stream.emits(List.fill(10_000)(1)).covary[IO].repeatN(10) ===") >>
benchmark(Stream.emits(List.fill(10_000)(1)).covary[IO].repeatN(10), numStreams = 2, m = 10)
}
def benchmark[O](s: Stream[IO, O], numStreams: Int, m: Int): IO[Unit] = {
val warmup = List.fill(numStreams)(s).reduceLeft(_.mergeHaltBoth(_)).compile.drain.replicateA_(m)
val merge = List.fill(numStreams)(s).reduceLeft(_.merge(_)).compile.drain.timed.map(_._1.toMillis)
.replicateA(m).map(x => x.sum.toDouble / x.size).flatMap(x => IO.println(s"merge: $x"))
val parJoin = Stream(List.fill(numStreams)(s): _*).covary[IO].parJoinUnbounded.compile.drain.timed.map(_._1.toMillis)
.replicateA(m).map(x => x.sum.toDouble / x.size).flatMap(x => IO.println(s"parJoin: $x"))
val joinViaChannel = Channel.bounded[IO, O](65536).flatMap { c =>
val produce = List.fill(numStreams)(s).parTraverse_(_.foreach(x => c.send(x).void).compile.drain)
c.stream.concurrently(Stream.exec(produce) ++ Stream.exec(c.close.void)).compile.drain.timed.map(_._1.toMillis)
}.replicateA(m).map(x => x.sum.toDouble / x.size).flatMap(x => IO.println(s"joinViaChannel: $x"))
val joinChunksViaChannel = Channel.bounded[IO, Chunk[O]](65536).flatMap { c =>
val produce = List.fill(numStreams)(s).parTraverse_(_.chunks.foreach(x => c.send(x).void).compile.drain)
c.stream.unchunks.concurrently(Stream.exec(produce) ++ Stream.exec(c.close.void)).compile.drain.timed.map(_._1.toMillis)
}.replicateA(m).map(x => x.sum.toDouble / x.size).flatMap(x => IO.println(s"joinChunksViaChannel: $x"))
val joinViaQueue = Queue.bounded[IO, O](16384).flatMap { q =>
IO.deferred[Either[Throwable, Unit]].flatMap { latch =>
val produce = List.fill(numStreams)(s).parTraverse_(_.foreach(q.offer).compile.drain)
Stream
.fromQueueUnterminated(q, 65536)
.interruptWhen(latch)
.concurrently(Stream.exec(produce >> latch.complete(Right(())).void)).compile.drain.timed.map(_._1.toMillis)
}
}.replicateA(m).map(x => x.sum.toDouble / x.size).flatMap(x => IO.println(s"joinViaQueue: $x"))
val joinChunkViaQueue = Queue.bounded[IO, Chunk[O]](16384).flatMap { q =>
IO.deferred[Either[Throwable, Unit]].flatMap { latch =>
val produce = List.fill(numStreams)(s).parTraverse_(_.chunks.foreach(q.offer).compile.drain)
Stream
.fromQueueUnterminatedChunk(q)
.interruptWhen(latch)
.concurrently(Stream.exec(produce >> latch.complete(Right(())).void)).evalMap(_ => IO.unit).compile.drain.timed.map(_._1.toMillis)
}
}.replicateA(m).map(x => x.sum.toDouble / x.size).flatMap(x => IO.println(s"joinChunksViaQueue: $x"))
warmup >>
joinViaChannel >> joinChunksViaChannel >>
joinViaQueue >> joinChunkViaQueue >>
merge >>
parJoin
}
}
@kamilkloch
Copy link
Author

kamilkloch commented Dec 6, 2023

=== Stream(1).covary[IO].repeatN(10_000) ===
joinViaChannel: 26.2
joinChunksViaChannel: 21.8
joinViaQueue: 11.1
joinChunksViaQueue: 18.8
merge: 1275.7
parJoin: 307.4

=== Stream.emits(List.fill(10_000)(1)).covary[IO].repeatN(10) ===
joinViaChannel: 49.8
joinChunksViaChannel: 0.0
joinViaQueue: 36.3
joinChunksViaQueue: 0.0
merge: 0.0
parJoin: 0.2

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment