I'm a long-time user of reactive streams in Scala, originally with Akka Streams and then with Monix Observable.
I found the transition from Akka Streams to Monix Observable pretty straightforward. I was motivated by wanting to use a lazy effect system rather than working with `scala.concurrent.Future`.
More recently I've been considering fs2 and ZIO Streams as alternatives to Monix Observable. This is largely motivated by the fact that Monix doesn't have much ongoing development and is stuck on cats-effect 2.
I haven't found the transition from Monix Observable to ZIO Streams as easy as my earlier one (from Akka Streams to Monix). While the ergonomics of the APIs are similar, their performance characteristics differ hugely.
Both fs2 and ZIO Streams document that, for efficient usage, the underlying stream needs to be chunked. In my experience this is a big difference from Monix Observable and Akka Streams, both of which perform extremely well without chunking.
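To make "chunked" concrete: a `ZStream` offers an element-level API, but underneath it moves `Chunk`s of elements. The sketch below shows the same ten elements through both views. It assumes ZIO 2 (the `zio-streams` 2.0.21 pin and the scala-cli `//> using` header are my assumptions), and the `run` helper is a demo-only convenience, not something you'd use in an application.

```scala
//> using scala "2.13"
//> using dep "dev.zio::zio-streams:2.0.21"

import zio._
import zio.stream._

object ChunkedView {
  // Demo-only helper: run an effect synchronously on the default runtime.
  def run[A](effect: UIO[A]): A =
    Unsafe.unsafe { implicit unsafe =>
      Runtime.default.unsafe.run(effect).getOrThrowFiberFailure()
    }

  // Ten elements, regrouped into chunks of at most 4 elements each.
  val stream: ZStream[Any, Nothing, Int] = ZStream.range(0, 10).rechunk(4)

  // Element-level view: what operators such as map and tap see.
  val elements: Chunk[Int] = run(stream.runCollect)

  // Chunk-level view: the same stream as the chunks it is actually made of.
  val chunkSizes: Chunk[Int] = run(stream.chunks.map(_.size).runCollect)

  def main(args: Array[String]): Unit = {
    println(s"elements:    $elements")
    println(s"chunk sizes: $chunkSizes")
  }
}
```

Operators that work per element pay their per-step cost once per element; operators that work per chunk pay it once per chunk, which is why chunk size matters so much for throughput.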
Most of the work I do centers on using a reactive stream to process a stream of events (from Kafka or a database). For the purposes of this post, though, I've just built an arbitrary test stream with ZIO.
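```scala
import zio._
import zio.stream._

val source = ZStream
  .range(0, 10000)
  // batch up to 5000 elements, or whatever arrives within 5ms
  .groupedWithin(5000, 5.milliseconds)
  .mapZIO { messages =>
    ZIO.succeed(messages.sum)
  }
  .runSum
  .timed
  .flatMap { case (duration, result) =>
    Console.printLine(s"took ${duration.toMillis}ms to calculate $result")
  }
```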
All this does is use a few `ZStream` operations to sum the elements. It's contrived, and there are simpler solutions.
Now let's say I have a new requirement to record a metric for each element that arrives on the stream. I look at the `ZStream` API and see that it offers a `tap` method to perform a side effect for each element of the stream.
So I change the solution above to:
```scala
// For simplicity, let's just assume our metrics implementation records to this var
var counter = 0

val source = ZStream
  .range(0, 10000)
  // record a "metric" for every element
  .tap(message => ZIO.succeed {
    counter = counter + 1
  })
  .groupedWithin(5000, 5.milliseconds)
  // as before
```
This works, but it adds about 700ms of processing time. There's nothing scientific about how I recorded the timing; it's just how long it took on my MacBook Pro.
700ms is a huge amount of overhead for 10,000 elements: on its own it caps throughput at roughly 14,000 elements per second. I currently have Kafka pipelines built on Akka Streams and Monix Observable handling 20,000 messages per second. I haven't even introduced network IO yet, and this version can't keep up.
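A rough harness for reproducing the `tap` measurement: it runs the pipeline above with and without a per-element `tap`, and reports elapsed time alongside the sum and the number of elements counted. Assumptions: ZIO 2.0.21 via scala-cli, a `Ref` standing in for the `var` counter, and the hypothetical helper names `pipeline` and `timedRun`. Each variant runs once and cold (the baseline runs first, so it absorbs most of the JVM warm-up), so treat the numbers as indicative only; JMH would be the tool for real measurements.

```scala
//> using scala "2.13"
//> using dep "dev.zio::zio-streams:2.0.21"

import zio._
import zio.stream._

object TapOverhead {
  // Demo-only helper: run an effect synchronously on the default runtime.
  def run[A](effect: UIO[A]): A =
    Unsafe.unsafe { implicit unsafe =>
      Runtime.default.unsafe.run(effect).getOrThrowFiberFailure()
    }

  // The post's pipeline, optionally recording a per-element "metric" with tap.
  def pipeline(counter: Ref[Int], useTap: Boolean): UIO[Int] = {
    val source = ZStream.range(0, 10000)
    val stream = if (useTap) source.tap(_ => counter.update(_ + 1)) else source
    stream
      .groupedWithin(5000, 5.milliseconds)
      .mapZIO(messages => ZIO.succeed(messages.sum))
      .runSum
  }

  // Returns (elapsed time, (sum of elements, number of elements counted)).
  def timedRun(useTap: Boolean): UIO[(Duration, (Int, Int))] =
    (for {
      counter <- Ref.make(0)
      sum     <- pipeline(counter, useTap)
      count   <- counter.get
    } yield (sum, count)).timed

  val withoutTap: (Duration, (Int, Int)) = run(timedRun(useTap = false))
  val withTap: (Duration, (Int, Int))    = run(timedRun(useTap = true))

  def main(args: Array[String]): Unit = {
    println(s"without tap: ${withoutTap._1.toMillis}ms, (sum, count) = ${withoutTap._2}")
    println(s"with tap:    ${withTap._1.toMillis}ms, (sum, count) = ${withTap._2}")
  }
}
```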
Instead of using `tap`, I can use `mapChunksZIO`, the effectful variant of `mapChunks`. That is:
```scala
val source = ZStream
  .range(0, 10000)
  // replaced tap here with mapChunksZIO
  .mapChunksZIO { chunk =>
    ZIO.succeed {
      counter = counter + chunk.size
      chunk
    }
  }
  .groupedWithin(5000, 5.milliseconds)
```
With this change, recording the "metrics" has no noticeable impact on performance. Note that `ZStream.range` creates a stream with 4096-element chunks, so `mapChunksZIO` is called only 3 times for 10,000 elements.
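A quick way to check how often `mapChunksZIO` runs and what it receives is to record each chunk's size as it passes. With the 4096-element default described above, 10,000 elements should split as 4096 + 4096 + 1808, i.e. three calls. This is a sketch assuming ZIO 2.0.21 via scala-cli; the object and helper names are illustrative.

```scala
//> using scala "2.13"
//> using dep "dev.zio::zio-streams:2.0.21"

import zio._
import zio.stream._

object RangeChunking {
  // Demo-only helper: run an effect synchronously on the default runtime.
  def run[A](effect: UIO[A]): A =
    Unsafe.unsafe { implicit unsafe =>
      Runtime.default.unsafe.run(effect).getOrThrowFiberFailure()
    }

  // Sizes of the chunks handed to mapChunksZIO, in the order they arrive.
  val chunkSizes: List[Int] = run(
    for {
      seen  <- Ref.make(List.empty[Int])
      _     <- ZStream
                 .range(0, 10000)
                 .mapChunksZIO(chunk => seen.update(chunk.size :: _).as(chunk))
                 .runDrain
      sizes <- seen.get
    } yield sizes.reverse
  )

  def main(args: Array[String]): Unit =
    println(s"mapChunksZIO called ${chunkSizes.size} times, chunk sizes: $chunkSizes")
}
```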
Also note that in the previous implementation, after calling `tap` the `ZStream` is backed by chunks of size 1, so every subsequent operation is slow as well.
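The chunk structure before and after `tap` can be inspected directly with `.chunks`. This sketch assumes ZIO 2.0.21 via scala-cli and uses a no-op `tap` so that only the chunking effect is visible. The post reports that the "after" sizes are all 1; since chunking behavior can change between ZIO releases, treat this as a check to run against your own version rather than a guaranteed result.

```scala
//> using scala "2.13"
//> using dep "dev.zio::zio-streams:2.0.21"

import zio._
import zio.stream._

object ChunksAfterTap {
  // Demo-only helper: run an effect synchronously on the default runtime.
  def run[A](effect: UIO[A]): A =
    Unsafe.unsafe { implicit unsafe =>
      Runtime.default.unsafe.run(effect).getOrThrowFiberFailure()
    }

  val source: ZStream[Any, Nothing, Int] = ZStream.range(0, 10000)

  // Chunk sizes straight out of range.
  val before: Chunk[Int] = run(source.chunks.map(_.size).runCollect)

  // Chunk sizes after a tap whose effect does nothing.
  val after: Chunk[Int] = run(source.tap(_ => ZIO.unit).chunks.map(_.size).runCollect)

  def main(args: Array[String]): Unit = {
    println(s"before tap: ${before.size} chunks, first sizes ${before.take(5)}")
    println(s"after tap:  ${after.size} chunks, first sizes ${after.take(5)}")
  }
}
```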
I question whether the ZStream API is fit for purpose when a large portion of it can severely degrade performance if used. Should all of the operations that can degrade performance be named accordingly? Should `tap` be renamed to `slowTap`? Or should all the slow methods be moved into an extension, say `ZStreamSlowAPI`, so that it is clear when you are using a method that may impact performance?
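In the meantime, one way to keep a per-element `tap` signature without changing the chunk structure is to run the effect for each element inside `mapChunksZIO`. `tapPreservingChunks` below is a hypothetical extension, not part of ZIO, sketched against ZIO 2.0.21 via scala-cli. It still evaluates one effect per element, so it won't be as cheap as the chunk-level counter update above; what it avoids is handing singleton chunks to every downstream operator.

```scala
//> using scala "2.13"
//> using dep "dev.zio::zio-streams:2.0.21"

import zio._
import zio.stream._

object ChunkAwareTap {
  // Hypothetical extension (not a ZIO method): runs `f` for each element in
  // order, then emits the original chunk unchanged.
  implicit class TapPreservingChunks[R, E, A](private val stream: ZStream[R, E, A]) extends AnyVal {
    def tapPreservingChunks[R1 <: R, E1 >: E](f: A => ZIO[R1, E1, Any]): ZStream[R1, E1, A] =
      stream.mapChunksZIO(chunk => ZIO.foreachDiscard(chunk)(f).as(chunk))
  }

  // Demo-only helper: run an effect synchronously on the default runtime.
  def run[A](effect: UIO[A]): A =
    Unsafe.unsafe { implicit unsafe =>
      Runtime.default.unsafe.run(effect).getOrThrowFiberFailure()
    }

  val source: ZStream[Any, Nothing, Int] = ZStream.range(0, 10000)

  // (elements counted, chunk sizes of the source, chunk sizes after the tap)
  val result: (Int, Chunk[Int], Chunk[Int]) = run(
    for {
      counter <- Ref.make(0)
      before  <- source.chunks.map(_.size).runCollect
      after   <- source
                   .tapPreservingChunks(_ => counter.update(_ + 1))
                   .chunks
                   .map(_.size)
                   .runCollect
      count   <- counter.get
    } yield (count, before, after)
  )

  def main(args: Array[String]): Unit =
    println(s"counted ${result._1} elements; chunk sizes unchanged: ${result._2 == result._3}")
}
```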
With Monix Observable and Akka Streams, if an operator is exposed, I assume I can use it without degrading performance. The same assumption doesn't hold for fs2 and ZIO Streams. Given all the hype around ZIO's performance, I came in with high expectations; so far, I still prefer working with Monix Observable.