Last active
September 18, 2025 19:49
-
-
Save calvinlfer/55f2106cf461ed0ff07dcf342f9c7431 to your computer and use it in GitHub Desktop.
An example of how to abstractly model a large upstream and write to multiple downstreams (one at a time) using FS2 Streams.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package com.experiments.cal | |
| import cats.effect.* | |
| import cats.effect.std.* | |
| import cats.effect.syntax.all.* | |
| import cats.syntax.all.* | |
| import fs2.* | |
| import fs2.io.file.{Files, Flags} | |
| opaque type Prefix <: String = String | |
| object Prefix: | |
| def apply(value: String): Prefix = value | |
| opaque type Index <: Long = Long | |
| object Index: | |
| def apply(value: Long): Index = value | |
| opaque type Suffix <: String = String | |
| object Suffix: | |
| def apply(value: String): Suffix = value | |
| final class RotatingSink[F[_]: Concurrent, A]( | |
| filePrefix: Prefix, | |
| fileSuffix: Suffix, | |
| limit: Int, | |
| downstream: (Prefix, Index, Suffix) => Pipe[F, A, Unit] | |
| ): | |
| private def createConnectedQueue( | |
| streamIndex: Int | |
| ): F[(Fiber[F, Throwable, Unit], Queue[F, Option[Chunk[A]]])] = | |
| Queue | |
| .bounded[F, Option[Chunk[A]]](capacity = (limit * 2).toInt) | |
| .flatMap { q => | |
| val newPipe = downstream(filePrefix, Index(streamIndex), fileSuffix) | |
| val stream = Stream.fromQueueNoneTerminatedChunk(q).through(newPipe) | |
| stream.compile.drain.start.map(_ -> q) | |
| } | |
| private def go( | |
| rem: Stream[F, A], | |
| q: Queue[F, Option[Chunk[A]]], | |
| backgroundProcess: Fiber[F, Throwable, Unit], | |
| counter: Long, | |
| streamIndex: Int | |
| ): Pull[F, A, Unit] = | |
| // inspired by https://github.com/typelevel/fs2/blob/fdaae8959ad5d64fa0d30d78d9821897e7148bcf/io/shared/src/main/scala/fs2/io/file/Files.scala#L557 | |
| val toWrite = (limit - counter).max(Int.MaxValue.toLong).toInt | |
| rem.pull.unconsLimit(toWrite).flatMap { | |
| case None => | |
| // upstream is done, clean up | |
| Pull.eval(q.offer(None)) >> Pull.eval(backgroundProcess.join) >> Pull.done | |
| case Some((incoming, rem)) => | |
| val updatedCounter = counter + incoming.size | |
| if updatedCounter >= limit then | |
| // rotate | |
| val updatedStreamIndex = streamIndex + 1 | |
| Pull.eval(q.offer(None)) >> | |
| Pull.eval(backgroundProcess.join) >> | |
| Pull | |
| .eval(createConnectedQueue(updatedStreamIndex)) | |
| .flatMap { (bp, q) => | |
| Pull.eval(q.offer(Some(incoming))) >> | |
| go(rem, q, bp, incoming.size, updatedStreamIndex) | |
| } | |
| else | |
| // keep going | |
| Pull.eval(q.offer(Some(incoming))) >> go(rem, q, backgroundProcess, updatedCounter, streamIndex) | |
| } | |
| def sink: Pipe[F, A, Unit] = incomingStream => | |
| Stream | |
| .eval(createConnectedQueue(0)) | |
| .flatMap: (bp, q) => | |
| go(incomingStream, q, bp, 0, 0).stream.drain | |
| private def debug[A](msg: String): Pull[F, A, Unit] = | |
| Pull.eval((println(msg).pure[F])) |
Author
calvinlfer
commented
Sep 18, 2025
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment