Created
July 31, 2015 19:09
-
-
Save pchiusano/f6b554f5e34f9af1fd7a to your computer and use it in GitHub Desktop.
Scala Streams API
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Algebra for a streaming library, abstracted over the stream type `S`.
 *
 * Abstract members are the primitives an implementation has to supply; the
 * concrete members are derived from those primitives. Method-call syntax for
 * streams, handles and pulls is provided by the implicit classes at the
 * bottom of the trait, all of which delegate back to `self`.
 *
 * @tparam S stream type constructor `S[F, A]`, covariant in both its
 *           higher-kinded first parameter and its element parameter
 */
trait Stream[S[+_[_],+_]] { self =>

  // list-like operations

  /** The stream built from the empty chunk. */
  def empty[A]: S[Nothing,A] = emits[A](Chunk.empty)

  /** Primitive: lifts a chunk of values into a stream whose effect type is `Nothing`. */
  def emits[A](as: Chunk[A]): S[Nothing,A]

  /** Primitive: combines `a` with `b`. `b` is by-name, so it is only constructed on demand. */
  def append[F[_],A](a: S[F,A], b: => S[F,A]): S[F,A]

  /** Primitive: builds a stream from `a` and a function producing a stream per `A`. */
  def flatMap[F[_],A,B](a: S[F,A])(f: A => S[F,B]): S[F,B]

  // evaluating effects

  /** Primitive: lifts a single `F[A]` into a stream. */
  def eval[F[_],A](fa: F[A]): S[F,A]

  // failure and error recovery

  /** Primitive: a stream built from a `Throwable` (presumably one that fails with `e` -- confirm against the implementation). */
  def fail(e: Throwable): S[Nothing,Nothing]

  /** Primitive: pairs `p` with a handler that turns a `Throwable` into a replacement stream. */
  def onError[F[_],A](p: S[F,A])(handle: Throwable => S[F,A]): S[F,A]

  /**
   * Appends `mask(regardless)` to `p`; if that combined stream reaches the
   * error handler with `err`, the handler yields `mask(regardless)` followed
   * by `fail(err)`.
   *
   * `regardless` is by-name and is referenced once on each path.
   */
  def onComplete[F[_],A](p: S[F,A], regardless: => S[F,A]): S[F,A] =
    onError(append(p, mask(regardless))) { err => append(mask(regardless), fail(err)) }

  /** `a` with an error handler that ignores the error and substitutes the empty stream. */
  def mask[F[_],A](a: S[F,A]): S[F,A] =
    onError(a)(_ => empty[A])

  // resource acquisition

  /**
   * Primitive: builds a stream from an acquisition effect, a function that
   * uses the acquired `R`, and a release effect.
   * NOTE(review): when `release` runs is up to the implementation -- confirm.
   */
  def bracket[F[_],R,A](acquire: F[R])(use: R => S[F,A], release: R => F[Unit]): S[F,A]

  // stepping a stream

  /** Abstract handle type used by the `await*` operations below. */
  type Handle[+F[_],+_]

  /** Abstract pull type: `Pull[F, R, O]` has a result type `R` and an output type `O`. */
  type Pull[+F[_],+R,+O]

  /** Monad instance for `Pull` in its result parameter, with `F` and `O` fixed. */
  def pullMonad[F[_],O]: Monad[({ type f[x] = Pull[F,x,O]})#f]

  /** Primitive: converts a stream into a `Pull` with result `Unit`. Overloads `emits(Chunk)`. */
  def emits[F[_],O](p: S[F,O]): Pull[F,Unit,O]

  /** Primitive: converts a `Pull` back into a stream of its output type. */
  def runPull[F[_],R,O](p: Pull[F,R,O]): S[F,O]

  /** Result of an asynchronous chunk-level step: an `F` yielding a `Pull` whose result is a `Step`. */
  type AsyncStep[F[_],A] = F[Pull[F, Step[Chunk[A], S[F,A]], Nothing]]

  /** Single-element variant of [[AsyncStep]]. */
  type AsyncStep1[F[_],A] = F[Pull[F, Step[A, S[F,A]], Nothing]]

  /** Primitive: a `Pull` whose result is a `Step` of a chunk and a further handle. */
  def await[F[_],A](h: Handle[F,A]): Pull[F, Step[Chunk[A], Handle[F,A]], Nothing]

  /** Primitive: single-element variant of [[await]]. */
  def await1[F[_],A](h: Handle[F,A]): Pull[F, Step[A, Handle[F,A]], Nothing]

  /** Primitive: a `Pull` whose result is an [[AsyncStep]]; requires `Async[F]`. */
  def awaitAsync[F[_],A](h: Handle[F,A])(implicit F: Async[F]): Pull[F, AsyncStep[F,A], Nothing]

  /** Primitive: single-element variant of [[awaitAsync]]. */
  def await1Async[F[_],A](h: Handle[F,A])(implicit F: Async[F]): Pull[F, AsyncStep1[F,A], Nothing]

  /** Primitive: a `Pull` whose result is a handle for `s`. */
  def open[F[_],A](s: S[F,A]): Pull[F,Handle[F,A],Nothing]

  // evaluation

  /** Primitive: folds `p` with `f` starting from `z`, yielding either a `Throwable` or the result inside `Free[F, _]`. */
  def runFold[F[_],A,B](p: S[F,A], z: B)(f: (B,A) => B): Free[F,Either[Throwable,B]]

  // derived operations

  /** Applies `f` to each element by flat-mapping into singleton streams. */
  def map[F[_],A,B](a: S[F,A])(f: A => B): S[F,B] =
    flatMap(a)(x => emit[F,B](f(x)))

  /** The stream of the singleton chunk containing `a`. */
  def emit[F[_],A](a: A): S[F,A] = emits(Chunk.singleton(a))

  /** Defers construction of `s` by placing it behind a `flatMap` on a unit stream. */
  def suspend[F[_],A](s: => S[F,A]): S[F,A] =
    flatMap(emit[F,Unit](())) { _ => s }

  /** Flattens an effect that produces a stream: `eval` followed by an identity `flatMap`. */
  def force[F[_],A](f: F[S[F, A]]): S[F,A] =
    flatMap(eval(f))(inner => inner)

  /** Like [[eval]], but the value is replaced by the empty stream, so nothing is emitted from it. */
  def eval_[F[_],A](fa: F[A]): S[F,Nothing] =
    flatMap[F,A,Nothing](eval(fa)) { _ => empty[Nothing] }

  /** Wraps every element of `p` in `Some` and appends a single `None`. */
  def terminated[F[_],A](p: S[F,A]): S[F,Option[A]] =
    append[F,Option[A]](map[F,A,Option[A]](p)(a => Some(a)), emit[F,Option[A]](None))

  /** Replaces every element of `p` by the empty stream. */
  def drain[F[_],A](p: S[F,A]): S[F,Nothing] =
    flatMap[F,A,Nothing](p) { _ => empty[Nothing] }

  /** Method syntax for streams; every method delegates to the same-named operation on `self`. */
  implicit class StreamSyntax[+F[_],+A](p1: S[F,A]) {
    def map[B](f: A => B): S[F,B] =
      self.map(p1)(f)

    // F2 lets the continuation use a wider effect type than this stream's.
    def flatMap[F2[x]>:F[x],B](f: A => S[F2,B]): S[F2,B] =
      self.flatMap(p1: S[F2,A])(f)

    def ++[F2[x]>:F[x],B>:A](p2: S[F2,B])(implicit R: RealSupertype[A,B]): S[F2,B] =
      self.append(p1: S[F2,B], p2)

    /** Alphabetic alias of `++`. */
    def append[F2[x]>:F[x],B>:A](p2: S[F2,B])(implicit R: RealSupertype[A,B]): S[F2,B] =
      self.append(p1: S[F2,B], p2)

    def onError[F2[x]>:F[x],B>:A](f: Throwable => S[F2,B])(implicit R: RealSupertype[A,B]): S[F2,B] =
      self.onError(p1: S[F2,B])(f)

    def runFold[B](z: B)(f: (B,A) => B): Free[F,Either[Throwable,B]] =
      self.runFold(p1, z)(f)

    /** Folds all elements into a `Vector`, appending each one in turn. */
    def runLog: Free[F,Either[Throwable,Vector[A]]] =
      self.runFold(p1, Vector.empty[A])(_ :+ _)
  }

  /** Method syntax for handles; every method delegates to the same-named operation on `self`. */
  implicit class HandleSyntax[+F[_],+A](h: Handle[F,A]) {
    def await: Pull[F, Step[Chunk[A], Handle[F,A]], Nothing] = self.await(h)

    def await1: Pull[F, Step[A, Handle[F,A]], Nothing] = self.await1(h)

    // Type arguments are explicit so the handle is widened to Handle[F2,A2]
    // rather than leaving F and A to be solved from the expected type.
    def awaitAsync[F2[x]>:F[x],A2>:A](implicit F2: Async[F2], A2: RealSupertype[A,A2]):
      Pull[F2, AsyncStep[F2,A2], Nothing] = self.awaitAsync[F2,A2](h)

    def await1Async[F2[x]>:F[x],A2>:A](implicit F2: Async[F2], A2: RealSupertype[A,A2]):
      Pull[F2, AsyncStep1[F2,A2], Nothing] = self.await1Async[F2,A2](h)
  }

  /** Method syntax for pulls, backed by [[pullMonad]]. */
  implicit class PullSyntax[+F[_],+R,+O](p: Pull[F,R,O]) {
    // `pullMonad` takes no value arguments, so selecting a member on it gives
    // the compiler nothing to infer its type parameters from; they are passed
    // explicitly to keep them from being instantiated to `Nothing`.
    def map[R2](f: R => R2): Pull[F,R2,O] =
      self.pullMonad[F,O].map(p)(f)

    def flatMap[F2[x]>:F[x],O2>:O,R2](f: R => Pull[F2,R2,O2]): Pull[F2,R2,O2] =
      self.pullMonad[F2,O2].bind(p: Pull[F2,R,O2])(f)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment