Skip to content

Instantly share code, notes, and snippets.

@SystemFw
Last active May 24, 2023 15:13
Show Gist options
  • Save SystemFw/e942dac6264fc0cc48711affb03666b3 to your computer and use it in GitHub Desktop.
Save SystemFw/e942dac6264fc0cc48711affb03666b3 to your computer and use it in GitHub Desktop.
Running fs2 streams in parallel and collecting their results in sequence, with queues
object Example {
  import cats._, implicits._
  import cats.effect._
  import fs2._
  import scala.concurrent.ExecutionContext

  // Start N streams concurrently, and stream their results in order:
  // all elements of the first inner stream, then all elements of the second,
  // and so on, while every inner stream is evaluated in parallel.
  // e.g. download a file from a server in N parts concurrently, and stream it

  /** A pipe with one writer (`send`) and one reader (`receive`).
    *
    * `send` drains a stream into the channel and marks its end; `receive`
    * emits the sent elements until that end marker is reached.
    */
  abstract class Channel[F[_], A] {
    def send: Sink[F, A]
    def receive: Stream[F, A]
  }

  object Channel {

    /** Creates a channel backed by a bounded queue of 1000 `Option[A]` slots.
      *
      * `None` is the end-of-stream marker: `send` appends it with
      * `noneTerminate`, and `receive` stops at it with `unNoneTerminate`.
      * When the queue is full, `send` waits until `receive` drains it.
      */
    def create[F[_]: Effect, A](
        implicit ec: ExecutionContext): F[Channel[F, A]] =
      async.boundedQueue[F, Option[A]](1000) map { queue =>
        new Channel[F, A] {
          def send: Sink[F, A] =
            _.noneTerminate
              .to(queue.enqueue)
              .onError { e =>
                // Terminate the channel first so a receiver blocked on it is
                // released, then re-raise: swallowing `e` here would turn a
                // failed producer into a silently truncated stream.
                Stream.eval(queue.enqueue1(None)) ++
                  Stream.eval(implicitly[Sync[F]].raiseError[Unit](e))
              }
          def receive: Stream[F, A] =
            queue.dequeue.unNoneTerminate
        }
      }
  }

  /** Runs every inner stream of `streams` concurrently and emits their
    * elements in the order in which the inner streams appear in `streams`.
    *
    * Each inner stream writes into its own [[Channel]]. The consumer reads the
    * channels one after another, which restores the original order.
    *
    * `foldMonoid` only emits once `streams` has completed, so `streams` must be
    * finite, and consumption starts only after every channel has been created.
    * A failure in an inner stream is re-raised by its channel's `send` after
    * the channel is terminated.
    */
  def orderedJoin[F[_]: Effect, A](streams: Stream[F, Stream[F, A]])(
      implicit ec: ExecutionContext): Stream[F, A] = {
    // (channels in outer-stream order, producers writing into those channels)
    type State = (Stream[F, Channel[F, A]], Stream[F, Stream[F, Unit]])

    def channelsAndProducers: Stream[F, State] =
      streams.evalMap { producer =>
        Channel.create[F, A] map { chan =>
          Stream(chan).covary[F] -> Stream(producer to chan.send).covary[F]
        }
      }.foldMonoid

    // Drains the channels sequentially; this is what makes the output ordered.
    def consumer(chans: Stream[F, Channel[F, A]]): Stream[F, A] =
      chans.flatMap(_.receive)

    // All producers run unboundedly in parallel in the background while the
    // consumer reads their channels in order.
    channelsAndProducers flatMap { case (chans, producers) =>
      consumer(chans) concurrently producers.join(Int.MaxValue)
    }
  }
}
object Test {
  import cats._, implicits._
  import cats.effect._
  import fs2._
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration._
  // `orderedJoin` is a member of `object Example`.
  import Example._

  /** A sink that prints each element's `toString` on its own line. */
  def stdOut[F[_], I](implicit F: Sync[F]): Sink[F, I] =
    _.map(_.toString).to(_.evalMap(str => F.delay(Console.out.println(str))))

  /** 20 producers, each emitting 20 labelled elements.
    *
    * Each element is also printed as soon as it is produced, so the
    * concurrent interleaving is visible next to the ordered output.
    */
  def producers: Stream[IO, Stream[IO, String]] = {
    def init = Stream.eval(IO(println("Initiating producers")))
    def prods = init >> Stream.range(0, 20).covary[IO]
    def elems = Stream.range(0, 20).covary[IO]
    def msg(p: Int, i: Int) = s"Producer no: $p, element no: $i"
    prods.map(p => elems.map(i => msg(p, i)).observe(stdOut))
  }

  /** The ordered join of `producers`, printing each element as it is received. */
  def result: Stream[IO, String] =
    orderedJoin(producers).map(s => s"Received from $s").observe(stdOut)

  /** Runs the demo, blocking the calling thread until the stream completes. */
  def run: Unit = result.run.unsafeRunSync()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment