Disclaimer: Firstly, I'm not an expert in any of these libraries. I'm writing this from a position of sharing what I have learnt so far, but also hoping to be criticised and learn a lot more in the process.
Both fs2 and zio-streams document the need to make use of "Chunking". Here is what the zio-streams docs say:
Every time we are working with streams, we are always working with chunks. There are no streams with individual elements, these streams have always chunks in their underlying implementation. So every time we evaluate a stream, when we pull an element out of a stream, we are actually pulling out a chunk of elements.
So why streams are designed in this way? This is because of the efficiency and performance issues. Every I/O operation in the programming world works with batches. We never work with a single element.
While it is true that I/O operations work with batches, from a programmer's perspective it is sometimes easier to reason about elements individually. I'll go into an example shortly. Sure, when reading an HTTP request I don't want a stream of individual bytes, but when processing records from a Kafka topic I feel it is perfectly reasonable to want to consider records in isolation.
Also, as I've documented before, both fs2 and zio-streams expose operations on the stream which break chunking and can massively degrade performance.
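To illustrate the kind of operation I mean, here is a small sketch (the chunk-propagation behaviour described in the comments is my understanding of zio-streams, worth verifying against the version you use):

import zio._
import zio.stream._

val chunked: ZStream[Any, Nothing, Int] =
  ZStream.range(0, 1_000_000, chunkSize = 4096)

// Element-wise effectful mapping: the effect runs per element, so the
// nice 4096-element chunks are lost and downstream stages pay
// per-element overhead.
val perElement: ZStream[Any, Nothing, Int] =
  chunked.mapZIO(i => ZIO.succeed(i + 1))

// Chunk-wise mapping: the effect runs once per chunk and the chunk
// structure is preserved for downstream stages.
val perChunk: ZStream[Any, Nothing, Int] =
  chunked.mapChunksZIO(c => ZIO.succeed(c.map(_ + 1)))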
I loved Adam Fraser's excellent talk The Next Generation of Streaming and was really hopeful that it would address some of my performance concerns. Unfortunately, I didn't observe any improvements around small chunk sizes with synchronous transformations. (I'm working with Kafka, and many calls to produce don't result in network calls, just an addition to an internal buffer, so there isn't the overhead of lots of network calls to hide away the CPU overheads of the streaming implementation.)
For our example application, let's take a Kafka consumer. The consumer reads messages; we can imagine the Kafka topic looking like:
[DB('hello'),ForwardToKafka('world'),DB('blah'),ForwardToKafka('yeah'),.....]
Depending on the inbound message, we need to make a processing decision. If we were not concerned with chunking, we could just do something approximately like this (not real code, just conveying an idea):
myKafkaStream.mapZIOPar(100) { consumerRecord =>
  val processingLogic: Task[UpdateResult] = consumerRecord.value match {
    case DB(value)             => writeToDb(value)
    case ForwardToKafka(value) => zioKafkaProducer.produce(value)
  }
  processingLogic.map(updateResult => updateResult -> consumerRecord)
}
Note that we return some details about the processing together with the original consumerRecord, as downstream stages may need it for further processing (and the Kafka logic requires it for committing back offsets). The mapZIOPar(100) is there because the Kafka producer blocks on each record produced, while internally batching records together to send (i.e. many calls to zioKafkaProducer.produce result in one call to the brokers). Anyway, the focus of this post is not Kafka batching; I just want to highlight that this discussion is not about Kafka batching for efficiency, but rather the lack of efficiency when a zio-stream is used without chunking.
At this point I'd say the program is very easy to understand.
But zio-kafka is implemented with zio-streams, and as already stated, this will be slow unless chunking is used. So that call to zioKafkaProducer.produce(value) is not going to perform well. For zio-kafka to perform well, we need to use the zioKafkaProducer.produceChunk... variants.
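For reference, usage of the chunked variant looks roughly like this (a hedged sketch: produceChunk is from zio-kafka's Producer, but I've paraphrased the signature and it may differ between versions; producer and records are placeholders):

import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
import zio._
import zio.kafka.producer.Producer
import zio.kafka.serde.Serde

// One call submits the whole chunk; zio-kafka hands the records to the
// underlying Java producer, which batches them to the brokers.
def sendAll(
  producer: Producer,
  records: Chunk[ProducerRecord[String, String]]
): Task[Chunk[RecordMetadata]] =
  producer.produceChunk(records, Serde.string, Serde.string)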
So our program needs to be modified. Here is another attempt:
myKafkaStream.mapChunksZIO { consumerRecordChunk =>
  // Firstly we need to split the Chunk into the different processing units.
  // When splitting, we also need an index associated with each element so we
  // can reconstruct the correct order again later (see the sketch below).
  val partitionedChunks: (Chunk[(Index, DB)], Chunk[(Index, ForwardToKafka)]) = ??? // partitioning logic

  // Handle the DB updates. Writing to the DB doesn't care about the Index,
  // it just wants the data to write.
  val dbTask = writeToDBChunked(partitionedChunks._1.map(_._2))
  // Again, writing to Kafka needs the Chunk, but without the Index.
  val kafkaTask = zioKafkaProducer.produceChunk(partitionedChunks._2.map(_._2))
  for {
    dbResults    <- dbTask
    kafkaResults <- kafkaTask
  } yield mergeResultsWithOriginalConsumerRecordsPreservingOrdering(....)
}
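The partitioning step itself isn't hard, but it is extra machinery the first version didn't need. Here is a sketch of what it might look like, assuming a simple Message ADT with DB and ForwardToKafka cases (all the names here are hypothetical, purely for illustration):

import zio.Chunk

sealed trait Message
final case class DB(value: String) extends Message
final case class ForwardToKafka(value: String) extends Message

// Tag each element with its position so ordering can be reconstructed
// after the two batched calls, then split by message type.
def partitionWithIndex(
  records: Chunk[Message]
): (Chunk[(Int, DB)], Chunk[(Int, ForwardToKafka)]) =
  records.zipWithIndex.partitionMap {
    case (m: DB, idx)             => Left(idx -> m)
    case (m: ForwardToKafka, idx) => Right(idx -> m)
  }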
This is nowhere near as easy to reason about. While "every I/O operation in the programming world works with batches" is true, that doesn't mean batches are always the easiest way to reason about a program.
I work in a company where developers work in a variety of languages. They will not all be experts in ZIO and zio-streams, and I need them to be able to quickly express applications in the simplest form, not to have to reason about different performance semantics depending on how their application is structured.
So back to the initial point: why do fs2 / zio-streams have this requirement for streams to be backed by chunks, while Monix and Akka Streams are amazingly fast working with individual elements?
Now, I don't know for certain, but my best guess is this.
Monix represents the contract between stream operations with this trait:
trait Observer[-A] extends Any with Serializable {
  def onNext(elem: A): Future[Ack]

  def onError(ex: Throwable): Unit

  def onComplete(): Unit
}
And Akka streams has a similar approach.
Both fs2 and zio-streams model all the internal transformations on a stream as pure values. Here is a small snippet of some of the stream transformations from ZIO.
sealed trait ZChannel[-Env, -InErr, -InElem, -InDone, +OutErr, +OutElem, +OutDone]

private[stream] final case class Emit[OutElem](override val trace: Trace, elem: OutElem)
  extends ZChannel[Any, Any, Any, Any, Nothing, OutElem, Unit]

private[stream] final case class Fail[OutErr](err: Cause[OutErr])
  extends ZChannel[Any, Any, Any, Any, OutErr, Nothing, Nothing]

private[stream] final case class Fold[Env, InErr, InElem, InDone, OutErr, OutErr2, OutElem, OutDone, OutDone2](
  override val trace: Trace,
  channel: ZChannel[Env, InErr, InElem, InDone, OutErr, OutElem, OutDone],
  onErr: Cause[OutErr] => ZChannel[Env, InErr, InElem, InDone, OutErr2, OutElem, OutDone2],
  onDone: OutDone => ZChannel[Env, InErr, InElem, InDone, OutErr2, OutElem, OutDone2]
) extends ZChannel[Env, InErr, InElem, InDone, OutErr2, OutElem, OutDone2]
So every element that is pushed through the stream involves allocating objects to represent the transformation, and something interpreting the actions to be performed on the stream.
Here are two implementations of summing a stream of 10 million elements in Monix and zio-streams.
Monix:
import monix.eval.Task
import monix.reactive.Observable

val max = 10_000_000
val source: Observable[Long] = Observable.range(from = 0, until = max, step = 1)
val consumeWithFold: Task[Unit] =
  source.foldLeftL(0L) { case (sum, elem) => elem + sum }.void
ZIO:
import zio._
import zio.stream.ZStream

val max = 10_000_000
val source: ZStream[Any, Nothing, Int] = ZStream.range(min = 0, max = max, chunkSize = 1)
val consumeWithFold: UIO[Long] =
  source.runFold(0L) { case (sum, elem) => sum + elem }
On my laptop, the Monix implementation takes 85ms on average and the ZIO implementation takes 1447ms. That is a pretty big difference (17x).
When profiling the applications, the Monix one results in approximately 50MB of memory allocations, while the ZIO one allocates 6.2GB. This time the difference is even bigger (124x).
FS2 takes 3409ms and allocates 15.5GB.
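I haven't shown the fs2 version; a minimal equivalent might look like the following (my sketch, not necessarily the exact code behind those numbers; as far as I can tell Stream.range emits elements individually, matching the chunkSize = 1 in the ZIO version):

import cats.effect.IO
import fs2.Stream

val max = 10_000_000
val source: Stream[IO, Int] = Stream.range(0, max).covary[IO]
val consumeWithFold: IO[Long] =
  source.compile.fold(0L) { case (sum, elem) => sum + elem }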
Is this fundamental difference, an API contract for stream communication versus an expressive set of values, the reason why Akka / Monix can perform well without chunking while fs2 / zio-streams cannot? I honestly don't know. The Monix implementation of map on a stream just calls the mapping function and then calls onNext. In the zio-streams world, I believe this requires the upstream returning a value to express that an element should be emitted, the downstream returning a value to express that it wants an element, and some coordinator between those two interpreting the values and passing the element from upstream to downstream. Maybe I just need to accept that fs2 / zio-streams will never be as fast as Akka Streams / Monix because of that model.
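To make the contrast concrete, a push-based map can be sketched in a few lines against Monix's Observer contract. This is my own illustrative sketch, not Monix's actual implementation (which also handles contract details such as stopping after an error):

import scala.concurrent.Future
import monix.execution.Ack
import monix.reactive.observers.Observer

// Wrap the downstream observer: each element is transformed and forwarded
// with a single direct method call. There are no per-element command
// values to allocate, and nothing to interpret.
def mapObserver[A, B](downstream: Observer[B])(f: A => B): Observer[A] =
  new Observer[A] {
    def onNext(elem: A): Future[Ack] = downstream.onNext(f(elem))
    def onError(ex: Throwable): Unit = downstream.onError(ex)
    def onComplete(): Unit = downstream.onComplete()
  }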
I can understand why Akka is less popular than fs2 / zio-streams: the API deals with scala.concurrent.Future, which isn't as nice to work with.
But given that Monix Observable had a nice public API, and given how well it performs, I'm curious why it has lost the battle for adoption against fs2 and zio-streams. As a user I don't really care if the internal implementation is pure; I'd rather the exposed API was performant for all operations.