Skip to content

Instantly share code, notes, and snippets.

@ChristopherDavenport
Created July 9, 2020 17:23
Show Gist options
  • Select an option

  • Save ChristopherDavenport/a035bbff68d2ef541d498df69d2751db to your computer and use it in GitHub Desktop.

Select an option

Save ChristopherDavenport/a035bbff68d2ef541d498df69d2751db to your computer and use it in GitHub Desktop.
Pure Pull/Creek Invariant Implementation
object Streaming {
sealed trait Pull[O, R]{ self =>
protected def step: Eval[Either[R, (O, Pull[O, R])]]
def map[R2](f: R => R2): Pull[O, R2] = flatMap(r => Pull.pure(f(r)))
def flatMap[R2](f: R => Pull[O, R2]): Pull[O, R2] = new Pull[O, R2]{
protected def step: Eval[Either[R2, (O, Pull[O, R2])]] = {
Eval.defer(self.step).flatMap{
case Right((hd, tl)) =>
val out : (O,Pull[O, R2]) = (hd, tl.flatMap(f))
Eval.now(Right(out))
case Left(r) => f(r).step
}
}
}
def flatMapOutput[O2](f: O => Pull[O2, Unit])(implicit ev: R =:= Unit) = new Pull[O2, Unit]{
def step: Eval[Either[Unit,(O2, Pull[O2,Unit])]] =
Eval.defer(self.step).flatMap{
case Left(_) => Eval.now(Left(()))
case Right((hd, tl)) => f(hd).step
}
}
def fold[A](init: A)(f: (A, O) => A): (R, A) = {
def go(init: A, p: Pull[O, R]): Eval[(R, A)] =
p.step.flatMap {
case Right((hd, tl)) => go(f(init, hd), tl)
case Left(r) => Eval.now((r, init))
}
go(init, this).value
}
def repeat: Pull[O, R] = flatMap(_ => repeat)
def take(n: Int): Pull[O, Option[R]] = {
def go(n: Int, p: Pull[O, R]): Pull[O, Option[R]] =
if (n <= 0) Pull.pure(None)
else
p.uncons.flatMap{
case Right((hd, tl)) => Pull.output(hd) >> go(n - 1, tl)
case Left(r) => Pull.pure(Some(r))
}
go(n, self)
}
def uncons : Pull[O, Either[R, (O, Pull[O, R])]] = Pull.pure(self.step.value)
}
object Pull {
def flatMapOutput[O, O2](p: Pull[O, Unit], f: O => Pull[O2, Unit]): Pull[O2, Unit] =
p.flatMapOutput(f)
def pure[O, R](r: R): Pull[O, R] = new Result(r)
private class Result[O, R](r: R) extends Pull[O, R] {
protected def step = Eval.now(Left(r))
}
def output[O](o: O): Pull[O, Unit] = new Pull[O, Unit]{
protected def step = Eval.now(Right((o, done)))
}
def done[O]: Pull[O, Unit] = pure(())
implicit def monadInstance[O]: Monad[Pull[O, *]] =
new Monad[Pull[O, ?]] {
def pure[A](a: A) = Pull.pure(a)
def flatMap[A, B](fa: Pull[O, A])(f: A => Pull[O, B]) = fa.flatMap(f)
def tailRecM[A, B](a: A)(f: A => Pull[O, Either[A, B]]) =
f(a).flatMap {
case Left(a) => tailRecM(a)(f)
case Right(b) => pure(b)
}
}
}
final class Creek[O](private val pull: Pull[O, Unit]){
def flatMap[O2](f: O => Creek[O2]): Creek[O2] = new Creek(
pull.flatMapOutput(f.andThen(_.pull))
)
def repeat: Creek[O] = new Creek(pull.repeat)
def take(n: Int) : Creek[O] = new Creek(pull.take(n).map(_ => ()))
def fold[A](init: A)(f: (A, O) => A): A =
pull.fold(init)(f)._2
def to(collector: fs2.Collector[O]): collector.Out =
fold(collector.newBuilder){(bldr, c) => bldr += Chunk.singleton(c); bldr}.result
}
object Creek {
def apply[A](elems: A*): Creek[A] = new Creek[A]({
def listPull(l: List[A]): Pull[A, Unit] = l match {
case head :: next => Pull.output(head) >> listPull(next)
case Nil => Pull.done
}
listPull(elems.toList)
})
def pure[A](a: A): Creek[A] = new Creek(Pull.output(a))
def unfold[A](a: A)(f: A => Option[A]) = new Creek({
def pull(a: A): Pull[A, Unit] = Pull.output(a) >> {
f(a) match {
case None => Pull.done
case Some(a) => pull(a)
}
}
pull(a)
})
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment