Created
July 9, 2020 17:23
-
-
Save ChristopherDavenport/a035bbff68d2ef541d498df69d2751db to your computer and use it in GitHub Desktop.
Pure Pull/Creek Invariant Implementation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| object Streaming { | |
| sealed trait Pull[O, R]{ self => | |
| protected def step: Eval[Either[R, (O, Pull[O, R])]] | |
| def map[R2](f: R => R2): Pull[O, R2] = flatMap(r => Pull.pure(f(r))) | |
| def flatMap[R2](f: R => Pull[O, R2]): Pull[O, R2] = new Pull[O, R2]{ | |
| protected def step: Eval[Either[R2, (O, Pull[O, R2])]] = { | |
| Eval.defer(self.step).flatMap{ | |
| case Right((hd, tl)) => | |
| val out : (O,Pull[O, R2]) = (hd, tl.flatMap(f)) | |
| Eval.now(Right(out)) | |
| case Left(r) => f(r).step | |
| } | |
| } | |
| } | |
| def flatMapOutput[O2](f: O => Pull[O2, Unit])(implicit ev: R =:= Unit) = new Pull[O2, Unit]{ | |
| def step: Eval[Either[Unit,(O2, Pull[O2,Unit])]] = | |
| Eval.defer(self.step).flatMap{ | |
| case Left(_) => Eval.now(Left(())) | |
| case Right((hd, tl)) => f(hd).step | |
| } | |
| } | |
| def fold[A](init: A)(f: (A, O) => A): (R, A) = { | |
| def go(init: A, p: Pull[O, R]): Eval[(R, A)] = | |
| p.step.flatMap { | |
| case Right((hd, tl)) => go(f(init, hd), tl) | |
| case Left(r) => Eval.now((r, init)) | |
| } | |
| go(init, this).value | |
| } | |
| def repeat: Pull[O, R] = flatMap(_ => repeat) | |
| def take(n: Int): Pull[O, Option[R]] = { | |
| def go(n: Int, p: Pull[O, R]): Pull[O, Option[R]] = | |
| if (n <= 0) Pull.pure(None) | |
| else | |
| p.uncons.flatMap{ | |
| case Right((hd, tl)) => Pull.output(hd) >> go(n - 1, tl) | |
| case Left(r) => Pull.pure(Some(r)) | |
| } | |
| go(n, self) | |
| } | |
| def uncons : Pull[O, Either[R, (O, Pull[O, R])]] = Pull.pure(self.step.value) | |
| } | |
| object Pull { | |
| def flatMapOutput[O, O2](p: Pull[O, Unit], f: O => Pull[O2, Unit]): Pull[O2, Unit] = | |
| p.flatMapOutput(f) | |
| def pure[O, R](r: R): Pull[O, R] = new Result(r) | |
| private class Result[O, R](r: R) extends Pull[O, R] { | |
| protected def step = Eval.now(Left(r)) | |
| } | |
| def output[O](o: O): Pull[O, Unit] = new Pull[O, Unit]{ | |
| protected def step = Eval.now(Right((o, done))) | |
| } | |
| def done[O]: Pull[O, Unit] = pure(()) | |
| implicit def monadInstance[O]: Monad[Pull[O, *]] = | |
| new Monad[Pull[O, ?]] { | |
| def pure[A](a: A) = Pull.pure(a) | |
| def flatMap[A, B](fa: Pull[O, A])(f: A => Pull[O, B]) = fa.flatMap(f) | |
| def tailRecM[A, B](a: A)(f: A => Pull[O, Either[A, B]]) = | |
| f(a).flatMap { | |
| case Left(a) => tailRecM(a)(f) | |
| case Right(b) => pure(b) | |
| } | |
| } | |
| } | |
| final class Creek[O](private val pull: Pull[O, Unit]){ | |
| def flatMap[O2](f: O => Creek[O2]): Creek[O2] = new Creek( | |
| pull.flatMapOutput(f.andThen(_.pull)) | |
| ) | |
| def repeat: Creek[O] = new Creek(pull.repeat) | |
| def take(n: Int) : Creek[O] = new Creek(pull.take(n).map(_ => ())) | |
| def fold[A](init: A)(f: (A, O) => A): A = | |
| pull.fold(init)(f)._2 | |
| def to(collector: fs2.Collector[O]): collector.Out = | |
| fold(collector.newBuilder){(bldr, c) => bldr += Chunk.singleton(c); bldr}.result | |
| } | |
| object Creek { | |
| def apply[A](elems: A*): Creek[A] = new Creek[A]({ | |
| def listPull(l: List[A]): Pull[A, Unit] = l match { | |
| case head :: next => Pull.output(head) >> listPull(next) | |
| case Nil => Pull.done | |
| } | |
| listPull(elems.toList) | |
| }) | |
| def pure[A](a: A): Creek[A] = new Creek(Pull.output(a)) | |
| def unfold[A](a: A)(f: A => Option[A]) = new Creek({ | |
| def pull(a: A): Pull[A, Unit] = Pull.output(a) >> { | |
| f(a) match { | |
| case None => Pull.done | |
| case Some(a) => pull(a) | |
| } | |
| } | |
| pull(a) | |
| }) | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment