Skip to content

Instantly share code, notes, and snippets.

@davidpeklak
Last active August 29, 2015 13:56
Show Gist options
  • Save davidpeklak/9161774 to your computer and use it in GitHub Desktop.
Save davidpeklak/9161774 to your computer and use it in GitHub Desktop.
scalaz stream matchTee
import scalaz.concurrent.Task
import scalaz.stream._
import scalaz.stream.Process._
import scalaz.stream.Process.Emit
import scalaz.stream.Tee
object MatchTee {
def matchTee[I: Ordering]: Tee[I, I, (Option[I], Option[I])] = {
val ord = implicitly[Ordering[I]]
def go(cl: I, cr: I): Tee[I, I, (Option[I], Option[I])] = {
if (ord.lt(cl, cr)) {
Emit(List((Some(cl), None)), receiveLOr[I, I, (Option[I], Option[I])](emit((None, Some(cr))))(l => go(l, cr)))
}
else if (ord.gt(cl, cr)) {
Emit(List((None, Some(cr))), receiveROr[I, I, (Option[I], Option[I])](emit((Some(cl), None)))(r => go(cl, r)))
}
else Emit(List((Some(cl), Some(cr))), for {
lo <- receiveLOr[I, I, Option[I]](emit(None))(l => emit(Some(l)))
ro <- receiveROr[I, I, Option[I]](emit(None))(l => emit(Some(l)))
e <- (lo, ro) match {
case (Some(l), Some(r)) => go(l ,r)
case _ => emit((lo, ro))
}
} yield e)
}
(for {
l <- awaitL[I]
r <- awaitR[I]
e <- go(l, r)
} yield e) ++ awaitL[I].map(l => (Some(l), None)).repeat ++ awaitR[I].map(l => (None, Some(l))).repeat
}
val e1: Process[Task, Int] = emitAll(Seq(1, 3, 5, 7, 9, 11, 12, 13))
val e2: Process[Task, Int] = emitAll(Seq(1, 2, 3, 6, 8, 10))
val t = (e1 tee e2)(matchTee)
val tp: Process[Task, Unit] = t.map(_.toString).to(io.stdOutLines)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment