Created
November 30, 2023 18:01
-
-
Save kamilkloch/aee71dc16855058ce79e3d7ee699b199 to your computer and use it in GitHub Desktop.
fs2 Stream merge performance vs raw IOs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.effect.std.Queue | |
import cats.effect.{IO, IOApp} | |
import fs2._ | |
object StreamMergePerformanceTests extends IOApp.Simple { | |
def run: IO[Unit] = { | |
case class Payload(x: Integer) | |
Queue.bounded[IO, Payload](2).flatMap { q => | |
val payload = Payload(123) | |
val n = 5000 | |
val m = 50 | |
val consumer = Stream.fromQueueUnterminated(q).drain | |
//val consumer = Stream.repeatEval(q.take) | |
val producer = Stream.exec(q.offer(payload)).repeatN(n) | |
//val producer = Stream.repeatEval(q.offer(payload)).take(n) | |
val testStreams = producer.concurrently(consumer).compile.drain | |
val testStreamsCompiled = consumer.compile.drain.background.use(_ => producer.compile.drain) | |
val testQueues = q.take.foreverM.background.use(_ => q.offer(payload).replicateA_(n)) | |
IO.print("Concurrent drained streams: ") >> | |
testStreams.replicateA_(m) >> | |
testStreams.timed.map(_._1.toMillis).replicateA(m).map(x => x.sum.toDouble / x.size).flatMap(IO.println) >> | |
IO.print("Drained streams, compiled, concurrently as IOs: ") >> | |
testStreamsCompiled.replicateA_(m) >> | |
testStreamsCompiled.timed.map(_._1.toMillis).replicateA(m).map(x => x.sum.toDouble / x.size).flatMap(IO.println) >> | |
IO.print("Concurrent Queue#offer and Queue#take: ") >> | |
testQueues.replicateA_(m) >> | |
testQueues.timed.map(_._1.toMillis).replicateA(m).map(x => x.sum.toDouble / x.size).flatMap(IO.println) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Concurrent drained streams: 64.54
Drained streams, compiled, concurrently as IOs: 6.24
Concurrent Queue#offer and Queue#take: 1.42