Last active
January 14, 2023 21:09
-
-
Save gatorcse/1f92aa7e52a04d9c91511ca79f73e911 to your computer and use it in GitHub Desktop.
Merging two already sorted fs2 streams, with a sortBy function.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats._ | |
import cats.implicits._ | |
import cats.effect._ | |
import cats.effect.implicits._ | |
import fs2._ | |
class StreamMerger[F[_]] { | |
def priorityOrderBy[A, B: Order](s1: Stream[F, A], s2: Stream[F, A])(f: A => B): Stream[F, A] = { | |
def go(p1: Stream.StepLeg[F, A], p2: Stream.StepLeg[F, A]): Pull[F, A, Unit] = { | |
val (elems, leftOrRight) = chunkMergeByPriority(p1.head.toList, p2.head.toList)(f) | |
Pull.output(Chunk.seq(elems)) >> (leftOrRight match { | |
case Left(remaining) => | |
val p1next = p1.setHead(Chunk.seq(remaining)) | |
p2.stepLeg.flatMap { | |
case None => Pull.output(p1next.head) >> p1next.stream.pull.echo | |
case Some(p2next) => go(p1next, p2next) | |
} | |
case Right(remaining) => | |
val p2next = p2.setHead(Chunk.seq(remaining)) | |
p1.stepLeg.flatMap { | |
case None => Pull.output(p2next.head) >> p2next.stream.pull.echo | |
case Some(p1next) => go(p1next, p2next) | |
} | |
}) | |
} | |
s1.pull.stepLeg.flatMap { | |
case None => s2.pull.echo | |
case Some(leg1) => s2.pull.stepLeg.flatMap { | |
case None => Pull.output(leg1.head) >> leg1.stream.pull.echo | |
case Some(leg2) => go(leg1, leg2) | |
} | |
}.stream | |
} | |
def chunkMergeByPriority[A, B: Order](c1: List[A], c2: List[A])(f: A => B): (List[A], Either[List[A], List[A]]) = (c1, c2) match { | |
case (c1head :: c1tail, c2head :: _) if (Order[B].compare(f(c1head), f(c2head)) < 0) => | |
val (emits, remaining) = chunkMergeByPriority(c1tail, c2)(f) | |
(c1head :: emits, remaining) | |
case (_ :: _, c2head :: c2tail) => | |
val (emits, remaining) = chunkMergeByPriority(c1, c2tail)(f) | |
(c2head :: emits, remaining) | |
case (Nil, c2i) => Nil -> Either.right(c2i) | |
case (c1i, Nil) => Nil -> Either.left(c1i) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment