Last active
August 29, 2015 14:23
-
-
Save pchiusano/7834d0725c1ad4332c86 to your computer and use it in GitHub Desktop.
Stream API
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package streams | |
/**
 * A description of a computation that yields an `A` and may use effects of
 * type `F`. Values are built from the three constructors in the companion:
 * `Pure`, `Eval` and `Bind`.
 */
trait Free[+F[_],+A] {

  /**
   * Sequences `f` after this computation. The effect type may be widened to
   * any `F2` that is a supertype of `F`.
   */
  def flatMap[F2[x]>:F[x],B](f: A => Free[F2,B]): Free[F2,B] =
    Free.Bind[F2,A,B](this, f)

  /** Transforms the result with `f`; encoded as a `Bind` whose continuation returns a `Pure`. */
  def map[B](f: A => B): Free[F,B] =
    Free.Bind[F,A,B](this, (a: A) => Free.Pure(f(a)))
}

object Free {

  /** An already-available value; it needs no effect, hence `Nothing` for `F`. */
  case class Pure[A](a: A) extends Free[Nothing,A]

  /** A single effect `fa` whose result is the result of the computation. */
  case class Eval[F[_],A](fa: F[A]) extends Free[F,A]

  /** The computation `r` followed by the continuation `f` applied to its result. */
  case class Bind[F[_],R,A](r: Free[F,R], f: R => Free[F,A]) extends Free[F,A]
}
/** A strict, covariant batch of stream elements. Only the interface is sketched here. */
trait Chunk[+A]

/** Operations on `Chunk`. Every member is an unimplemented stub (`???`). */
object Chunk {

  /** Splits `c` into its first element and the remaining chunk, or `None`. */
  def uncons[A](c: Chunk[A]): Option[(A, Chunk[A])] = ???

  /** The chunk containing no elements. */
  def empty[A]: Chunk[A] = ???

  /** The chunk containing exactly `a`. */
  def singleton[A](a: A): Chunk[A] = ???

  /** The elements of `a1` followed by the elements of `a2`. */
  def append[A](a1: Chunk[A], a2: Chunk[A]): Chunk[A] = ???

  /** Left fold over the elements of `a`, starting from `z`. */
  def foldLeft[A, B](a: Chunk[A], z: B)(f: (B, A) => B): B = ???
}
/**
 * One step of a stream: the value produced (`head`) and whatever remains
 * to be run afterwards (`tail`).
 */
case class Step[+A, +B](
  head: A,
  tail: B
)
/** Minimal monad interface: implementors supply `bind` and `pure`; `map` is derived. */
trait Monad[F[_]] {

  /** Lifts a plain value into `F`. */
  def pure[A](a: A): F[A]

  /** Runs `a`, then continues with the `F[B]` that `f` builds from its result. */
  def bind[A,B](a: F[A])(f: A => F[B]): F[B]

  /** Applies `f` to the result of `a`; defined as `bind` followed by `pure`. */
  def map[A,B](a: F[A])(f: A => B): F[B] =
    bind(a)(x => pure(f(x)))
}
/** A `Monad` that can additionally run tasks concurrently through a `Pool`. */
trait Async[F[_]] extends Monad[F] {

  /** Implementation-defined handle through which background results are exchanged. */
  type Pool[A]

  /** Create an asynchronous, concurrent pool. */
  def pool[A]: F[Pool[A]]

  /**
   * Add a task to a `Pool`. After the returned `F[Unit]` is bound, the
   * task is running in the background. Multiple tasks may be added to a
   * `Pool[A]`.
   */
  def put[A](q: Pool[A])(a: F[A]): F[Unit]

  /** Like `put`, but the task is given as a `Free[F,A]` program. */
  def putFree[A](q: Pool[A])(a: Free[F,A]): F[Unit]

  /** Obtain the first available result from the `Pool`. */
  def take[A](q: Pool[A]): F[A]

  /**
   * Like `take`, but does not consume the value from the `Pool`.
   *
   * The default takes the value and then puts it straight back; instances
   * may have a more efficient implementation.
   */
  def peek[A](q: Pool[A]): F[A] =
    bind(take(q)) { a =>
      val restore: F[Unit] = put(q)(pure(a))
      map(restore)(_ => a)
    }

  /** Runs `a` and `a2` against each other, yielding the result of one of them as `Left`/`Right`. */
  def race[A,B](a: F[A], a2: F[B]): F[Either[A,B]]
}
/**
 * The stream API, abstracted over the stream representation `P`.
 * `P[F,A]` is a stream of `A` values that may evaluate effects of type `F`.
 */
trait Stream[P[+_[_],+_]] {

  /** The stream that emits the elements of `as` and then ends. */
  def emits[F[_],A](as: Chunk[A]): P[F,A]

  /** The stream that emits just `a`. */
  def emit[F[_],A](a: A): P[F,A] =
    emits(Chunk.singleton(a))

  /** The stream that emits nothing. */
  def empty[A]: P[Nothing,A] =
    emits(Chunk.empty)

  /** `a` followed by `b`; `b` is passed by name and only evaluated when needed. */
  def append[F[_],A](a: P[F,A], b: => P[F,A]): P[F,A]

  /** Replaces every element of `a` with the stream `f` builds from it. */
  def flatMap[F[_],A,B](a: P[F,A])(f: A => P[F,B]): P[F,B]

  /** Transforms every element with `f`; derived from `flatMap` and `emit`. */
  def map[F[_],A,B](a: P[F,A])(f: A => B): P[F,B] =
    flatMap(a)(x => emit[F,B](f(x)))

  /** Exposes the next chunk of `p` together with the remainder of the stream. */
  def available[F[_],A](p: P[F, A]): P[F, Step[Chunk[A], P[F,A]]]

  /** Like `available`, but the step is delivered as an `F` value that can be raced. */
  def availableAsync[F[_]:Async,A](p: P[F, A]): P[F, F[P[F, Step[Chunk[A], P[F,A]]]]]

  /** Evaluates `f` and then behaves as the stream it returns. */
  def force[F[_],A](f: F[P[F, A]]): P[F,A] =
    flatMap(eval(f))(inner => inner)

  /**
   * Exposes the first element of `p` together with the remainder.
   *
   * NOTE(review): when the next available chunk is empty the result is the
   * empty stream and `step.tail` is not consulted -- confirm that an empty
   * chunk is always meant to signal the end of the stream.
   */
  def await[F[_],A](p: P[F,A]): P[F, Step[A, P[F,A]]] =
    flatMap(available(p)) { step =>
      Chunk.uncons(step.head) match {
        case Some((first, others)) =>
          // push the unused part of the chunk back in front of the remainder
          emit { Step(first, append(emits(others), step.tail)) }
        case None =>
          empty
      }
    }

  /** The stream that fails with `e`. */
  def fail[F[_],A](e: Throwable): P[F,A]

  /** Runs `p`, switching to `handle(e)` if it fails with `e`. */
  def onError[F[_],A](p: P[F,A])(handle: Throwable => P[F,A]): P[F,A]

  /** Acquires a resource, streams `use` of it, and registers `release` as its finalizer. */
  def bracket[F[_],R,A](acquire: F[R])(use: R => P[F,A], release: R => F[Unit]): P[F,A]

  /** Runs the `Free` program `fa` and then behaves as the stream it returns. */
  def free[F[_],A](fa: Free[F,P[F,A]]): P[F,A]

  /** The stream that evaluates `fa` and emits its result. */
  def eval[F[_],A](fa: F[A]): P[F,A] =
    free(Free.Eval(fa) map (a => emit[F,A](a)))

  /** Folds the elements of `p` with `f` starting from `z`; a failure is reported as `Left`. */
  def runFold[F[_],A,B](p: P[F,A], z: B)(f: (B,A) => B): Free[F,Either[Throwable,B]]

  /** Wraps every element of `p` in `Some` and appends one final `None`. */
  def terminated[F[_],A](p: P[F,A]): P[F,Option[A]] = {
    val wrapped: P[F,Option[A]] = map(p)(a => Some(a))
    append(wrapped, emit(None))
  }

  /**
   * Interleaves `p` and `p2`: the next steps of both sides are raced and the
   * chunk of whichever wins is emitted, after which that side's next step is
   * requested while the other side's pending step is kept.
   */
  def mergeHaltBoth[F[_],A](p: P[F,A], p2: P[F,A])(implicit F: Async[F]): P[F,A] = {
    type Pending = F[P[F, Step[Chunk[A], P[F,A]]]]

    def loop(left: Pending, right: Pending): P[F,A] =
      flatMap(eval(F.race(left, right))) {
        case Left(won) =>
          flatMap(won) { step =>
            append(
              emits(step.head),
              flatMap(availableAsync(step.tail))(next => loop(next, right)))
          }
        case Right(won) =>
          flatMap(won) { step =>
            append(
              emits(step.head),
              flatMap(availableAsync(step.tail))(next => loop(left, next)))
          }
      }

    flatMap(availableAsync(p)) { left =>
      flatMap(availableAsync(p2)) { right => loop(left, right) }
    }
  }
}
/**
 * A stream in "normal form": the finalizers that are in scope right now,
 * paired with the next `Frame` of the stream.
 *
 * @param cleanup finalizers that are live while `frame` is interpreted
 * @param frame   the next step of the stream
 */
case class NF[F[_],+A](cleanup: Finalizers[F], frame: NF.Frame[F,A])

/** The `Stream` instance for `NF`, plus the `Frame` constructors. */
object NF extends Stream[NF] {

  /** The possible shapes of the next step of an `NF` stream. */
  sealed trait Frame[F[_],+A]
  /** Emit the chunk `c`, then end. */
  case class Emits[F[_],A](c: Chunk[A]) extends Frame[F,A]
  /** Emit the chunk `hd`, then continue with the stream produced by `a2`. */
  case class Cons[F[_],A](hd: Chunk[A], a2: () => NF[F,A]) extends Frame[F,A]
  /** End with the error `err`. */
  case class Fail[F[_],A](err: Throwable) extends Frame[F,A]
  /** Run `f`, then continue with the stream it returns. */
  case class Await[F[_],A](f: Free[F,NF[F,A]]) extends Frame[F,A]
  /** Run `f`, which returns a finalizer together with the stream to continue with. */
  case class Acquire[F[_],A](f: Free[F,(F[Unit], NF[F,A])]) extends Frame[F,A]

  def emits[F[_],A](c: Chunk[A]): NF[F,A] = NF(Finalizers.empty, Emits(c))

  def fail[F[_],A](e: Throwable): NF[F,A] = NF(Finalizers.empty, Fail(e))

  def free[F[_],A](f: Free[F,NF[F,A]]): NF[F,A] = NF(Finalizers.empty, Await(f))

  /**
   * `a1` followed by `a2`. The result carries only `a1.cleanup`; `a2` brings
   * its own finalizers once it is reached.
   */
  def append[F[_],A](a1: NF[F,A], a2: => NF[F,A]): NF[F,A] = NF(a1.cleanup, a1.frame match {
    case Emits(c) => Cons(c, () => a2)
    case Cons(h,t) => Cons(h, () => append(t(),a2))
    case Fail(e) => Fail(e)
    case Await(f) => Await(f map (a1 => append(a1, a2)))
    case Acquire(r) => Acquire(r map { case (release, a1) => (release, append(a1,a2)) })
  })

  /**
   * Replaces every element of `a` with the stream `f` builds from it.
   *
   * Every stream reached while walking `a` (the tail of a `Cons`, the result
   * of an `Await` or `Acquire`) carries its own `cleanup`, and each of them is
   * processed by a recursive `flatMap` call that scopes the streams produced
   * by `f` with that `cleanup`. Recursing without re-applying it would drop,
   * for example, the finalizer a `bracket` attaches to its inner stream, so
   * `runFold` would never see it and the resource would never be released.
   */
  def flatMap[F[_],A,B](a: NF[F,A])(f: A => NF[F,B]): NF[F,B] = {
    // Concatenates `f(x)` for every `x` in `c`; a bare chunk has no finalizers of its own.
    def overChunk(c: Chunk[A]): NF[F,B] = Chunk.uncons(c) match {
      case None => emits(Chunk.empty)
      case Some((hd,tl)) => append(f(hd), overChunk(tl))
    }
    a.frame match {
      case Fail(e) => scope(a.cleanup)(fail(e))
      case Emits(c) => scope(a.cleanup)(overChunk(c))
      // the tail is passed by name, so it is still only forced on demand
      case Cons(h, t) => append(scope(a.cleanup)(overChunk(h)), flatMap(t())(f))
      case Await(g) => NF(a.cleanup, Await(g map (next => flatMap(next)(f))))
      case Acquire(r) => NF(a.cleanup, Acquire(r map { case (release, next) => (release, flatMap(next)(f)) }))
    }
  }

  /**
   * Adds `finalizers` to the cleanup of `a` and of every stream `a` continues
   * with. Returns `a` unchanged when `finalizers` is empty.
   */
  def scope[F[_],A](finalizers: Finalizers[F])(a: NF[F,A]): NF[F,A] =
    if (finalizers.isEmpty) a
    else NF(a.cleanup append finalizers, a.frame match {
      case Cons(h, t) => Cons(h, () => scope(finalizers)(t()))
      case Await(g) => Await(g map (scope(finalizers)))
      case Acquire(g) => Acquire(g map { case (release, a) => (release, scope(finalizers)(a)) })
      // terminal frames have no continuation to propagate into
      case Emits(_) | Fail(_) => a.frame
    })

  def bracket[F[_],R,A](acquire: F[R])(use: R => NF[F,A], release: R => F[Unit]): NF[F,A] =
    NF(Finalizers.empty,
      Acquire(Free.Eval(acquire) map { r =>
        val cleanup = release(r)
        // notice we don't run the `release` effect here, that is handled
        // automatically by `runFold`, which detects when finalizers pass
        // out of scope
        (cleanup, scope(Finalizers.single(cleanup))(use(r)))
      })
    )

  def onError[F[_],A](a: NF[F,A])(handle: Throwable => NF[F,A]): NF[F,A] = a.frame match {
    // keeps `a.cleanup` in scope for duration of handler
    case Fail(e) => scope(a.cleanup)(handle(e))
    case Emits(_) => a
    case Await(g) => NF(a.cleanup, Await(g map (a => onError(a)(handle))))
    case Acquire(g) => NF(a.cleanup, Acquire(g map { case (f,a) => (f, onError(a)(handle)) }))
    case Cons(h, t) => NF(a.cleanup, Cons(h, () => onError(t())(handle)))
  }

  /** Runs `a` for its effects only: every emitted chunk is replaced by the empty chunk. */
  def drain[F[_],A,B](a: NF[F,A]): NF[F,B] = NF(a.cleanup, a.frame match {
    case Fail(e) => Fail(e)
    case Emits(_) => Emits(Chunk.empty)
    case Await(g) => Await(g map (next => drain[F,A,B](next)))
    case Acquire(g) => Acquire(g map { case (f,a) => (f, drain[F,A,B](a)) })
    case Cons(_, t) => Cons(Chunk.empty, () => drain(t()))
  })

  /** Replaces a failure of `a` with the empty stream. */
  def mask[F[_],A](a: NF[F,A]): NF[F,A] =
    onError(a)(e => emits(Chunk.empty))

  /**
   * Exposes the next chunk of `a` and the remainder. For `Emits` the
   * remainder is the empty stream; for `Cons` the tail thunk is forced here.
   */
  def available[F[_],A](a: NF[F,A]): NF[F, Step[Chunk[A], NF[F,A]]] = NF(a.cleanup, a.frame match {
    case Fail(e) => Fail(e)
    case Emits(c) => Emits(Chunk.singleton(Step(c, emits(Chunk.empty))))
    case Await(f) => Await(f map available)
    case Acquire(f) => Acquire(f map { case (r,a) => (r, available(a)) })
    case Cons(h, t) => Emits(Chunk.singleton(Step(h, t())))
  })

  /**
   * Like `available`, but emits an `F` value that yields the step. For
   * `Await` and `Acquire` the pending program is started in a fresh `Pool`
   * via `putFree`, and the emitted `F` reads its result back from that pool.
   */
  def availableAsync[F[_],A](a: NF[F,A])(implicit F: Async[F]): NF[F, F[NF[F, Step[Chunk[A], NF[F,A]]]]] =
    NF(a.cleanup, a.frame match {
      case Fail(e) => Fail(e)
      case Emits(c) => Emits(Chunk.singleton(F.pure { emit(Step(c, emits(Chunk.empty))) }))
      case Cons(h, t) => Emits(Chunk.singleton(F.pure { emit(Step(h, t())) }))
      case Await(f) => Await { Free.Eval {
        F.bind(F.pool[NF[F,A]]) { q =>
          F.map(F.putFree(q)(f)) { _ =>
            emit[F,F[NF[F,Step[Chunk[A], NF[F,A]]]]](F.pure(flatMap(eval(F.take(q)))(available))) }
        }
      }}
      case Acquire(r) => Acquire { Free.Eval {
        F.bind(F.pool[(F[Unit], NF[F,A])]) { q =>
          F.map(F.putFree(q)(r)) { _ =>
            val nf: F[NF[F,A]] = F.map(F.peek(q)) { case (_,nf) => nf }
            // even though `r` has not completed, we can obtain a stable
            // reference to what the finalizer will be eventually
            val f: F[Unit] = F.bind(F.peek(q)) { case (f,_) => f }
            (f, scope(Finalizers.single(f)) {
              emit[F,F[NF[F,Step[Chunk[A], NF[F,A]]]]](F.pure(flatMap(eval(nf))(available)))
            })
          }
        }}}
    })

  /**
   * Interprets `p`, folding its elements with `f` starting from `z`.
   * Finalizers are run as they drop out of scope between steps and when the
   * stream ends; a `Fail` frame is reported as `Left`.
   */
  def runFold[F[_],A,B](p: NF[F,A], z: B)(f: (B, A) => B): Free[F,Either[Throwable,B]] = {
    def go[A,B](prev: Finalizers[F], p: NF[F,A], z: B)(f: (B, A) => B): Free[F,Either[Throwable,B]] =
      // at each step, we run any finalizers that have passed out of scope from previous step
      // and we also run any leftover finalizers when reaching the end of the stream
      prev.runDeactivated(p.cleanup) flatMap { _ =>
        p.frame match {
          // NB: this is guaranteed not to overlap with `runDeactivated`, via definition of
          // `runDeactivated`, which won't include any finalizers in `p.cleanup`
          case Fail(e) => p.cleanup.run flatMap { _ => Free.Pure(Left(e)) }
          case Emits(c) => p.cleanup.run flatMap { _ => Free.Pure(Right(Chunk.foldLeft(c,z)(f))) }
          case Cons(h, t) => go(p.cleanup, t(), Chunk.foldLeft(h,z)(f))(f)
          case Await(g) => g flatMap (p2 => go(p.cleanup, p2, z)(f))
          case Acquire(g) => g flatMap { case (_, p2) => go(p.cleanup, p2, z)(f) }
        }
      }
    go(Finalizers.empty, p, z)(f)
  }
}
import java.util.UUID | |
/**
 * An ordered set of finalizer actions, each identified by a `UUID`.
 *
 * @param order   the ids, in the order in which their actions are run
 * @param actions the action registered for each id
 */
private[streams]
class Finalizers[F[_]](private[Finalizers] val order: Vector[UUID],
                       private[Finalizers] val actions: Map[UUID, F[Unit]]) {

  /** True when no finalizer is registered. */
  def isEmpty: Boolean = order.isEmpty

  /**
   * This set followed by the finalizers of `f2`.
   *
   * Ids that are already registered here are not added a second time:
   * `run` and `runDeactivated` walk `order`, so a repeated id would make
   * the same finalizer execute twice.
   */
  def append(f2: Finalizers[F]): Finalizers[F] =
    if (order.isEmpty) f2
    else if (f2.order.isEmpty) this
    else new Finalizers(order ++ f2.order.filterNot(actions.contains), actions ++ f2.actions)

  /** Runs, in order, every finalizer of this set that is not registered in `f2`. */
  def runDeactivated(f2: Finalizers[F]): Free[F,Unit] =
    // anything which has dropped out of scope is considered deactivated
    runAll(order.filterNot(f2.actions.contains))

  /** Runs every finalizer of this set, in order. */
  def run: Free[F,Unit] =
    runAll(order)

  /** Chains the actions registered for `ids` so that they run first to last. */
  private def runAll(ids: Vector[UUID]): Free[F,Unit] =
    ids.foldRight(Free.Pure(()): Free[F,Unit])((id, rest) => Free.Eval(actions(id)) flatMap { _ => rest })
}
/** Constructors for `Finalizers`. */
object Finalizers {

  /** The set with no finalizers. */
  def empty[F[_]]: Finalizers[F] =
    new Finalizers[F](Vector.empty, Map.empty)

  /** The set holding only `f`, registered under a freshly generated random id. */
  def single[F[_]](f: F[Unit]): Finalizers[F] = {
    val key = UUID.randomUUID()
    new Finalizers[F](Vector(key), Map(key -> f))
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment