Skip to content

Instantly share code, notes, and snippets.

@calvinlfer
Last active September 18, 2025 19:49
Show Gist options
  • Save calvinlfer/55f2106cf461ed0ff07dcf342f9c7431 to your computer and use it in GitHub Desktop.
An example of how to abstractly model a large upstream and write to multiple downstreams (one at a time) using FS2 Streams.
package com.experiments.cal
import cats.effect.*
import cats.effect.std.*
import cats.effect.syntax.all.*
import cats.syntax.all.*
import fs2.*
import fs2.io.file.{Files, Flags}
opaque type Prefix <: String = String
object Prefix:
def apply(value: String): Prefix = value
opaque type Index <: Long = Long
object Index:
def apply(value: Long): Index = value
opaque type Suffix <: String = String
object Suffix:
def apply(value: String): Suffix = value
final class RotatingSink[F[_]: Concurrent, A](
filePrefix: Prefix,
fileSuffix: Suffix,
limit: Int,
downstream: (Prefix, Index, Suffix) => Pipe[F, A, Unit]
):
private def createConnectedQueue(
streamIndex: Int
): F[(Fiber[F, Throwable, Unit], Queue[F, Option[Chunk[A]]])] =
Queue
.bounded[F, Option[Chunk[A]]](capacity = (limit * 2).toInt)
.flatMap { q =>
val newPipe = downstream(filePrefix, Index(streamIndex), fileSuffix)
val stream = Stream.fromQueueNoneTerminatedChunk(q).through(newPipe)
stream.compile.drain.start.map(_ -> q)
}
private def go(
rem: Stream[F, A],
q: Queue[F, Option[Chunk[A]]],
backgroundProcess: Fiber[F, Throwable, Unit],
counter: Long,
streamIndex: Int
): Pull[F, A, Unit] =
// inspired by https://github.com/typelevel/fs2/blob/fdaae8959ad5d64fa0d30d78d9821897e7148bcf/io/shared/src/main/scala/fs2/io/file/Files.scala#L557
val toWrite = (limit - counter).max(Int.MaxValue.toLong).toInt
rem.pull.unconsLimit(toWrite).flatMap {
case None =>
// upstream is done, clean up
Pull.eval(q.offer(None)) >> Pull.eval(backgroundProcess.join) >> Pull.done
case Some((incoming, rem)) =>
val updatedCounter = counter + incoming.size
if updatedCounter >= limit then
// rotate
val updatedStreamIndex = streamIndex + 1
Pull.eval(q.offer(None)) >>
Pull.eval(backgroundProcess.join) >>
Pull
.eval(createConnectedQueue(updatedStreamIndex))
.flatMap { (bp, q) =>
Pull.eval(q.offer(Some(incoming))) >>
go(rem, q, bp, incoming.size, updatedStreamIndex)
}
else
// keep going
Pull.eval(q.offer(Some(incoming))) >> go(rem, q, backgroundProcess, updatedCounter, streamIndex)
}
def sink: Pipe[F, A, Unit] = incomingStream =>
Stream
.eval(createConnectedQueue(0))
.flatMap: (bp, q) =>
go(incomingStream, q, bp, 0, 0).stream.drain
private def debug[A](msg: String): Pull[F, A, Unit] =
Pull.eval((println(msg).pure[F]))
@calvinlfer
Copy link
Author

object Example2 extends IOApp.Simple {
  // https://www.kaggle.com/datasets/dhruvildave/en-fr-translation-dataset?resource=download
  val path = fs2.io.file.Path("./en-fr.csv")

  val downstreamWriter: (Prefix, Index, Suffix) => Pipe[IO, String, Unit] =
    (prefix, index, suffix) =>
      input =>
        val filePath = s"$prefix$index$suffix"
        input
          .intersperse(java.lang.System.lineSeparator())
          .through(fs2.text.utf8.encode[IO])
          .through(Files[IO].writeAll(fs2.io.file.Path(filePath)))

  val run =
    val chunkSize = 1_024_000
    Files[IO]
          .readAll(path)
          .through(fs2.text.utf8.decode)
          .through(fs2.text.lines)
          .through(
            RotatedPipe2(
              filePrefix = Prefix("out/en-fr-"),
              fileSuffix = Suffix(".csv"),
              limit = chunkSize,
              downstream = downstreamWriter
            ).sink
          )

      }
      .compile
      .drain
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment