Skip to content

Instantly share code, notes, and snippets.

@pchlupacek
Created March 1, 2014 22:32
Show Gist options
  • Save pchlupacek/9298516 to your computer and use it in GitHub Desktop.
Save pchlupacek/9298516 to your computer and use it in GitHub Desktop.
Process2 with Append
package scalaz.stream
import scalaz._
import scalaz.concurrent.Task
import scala.annotation.tailrec
/**
* Created by pach on 01/03/14.
*/
sealed trait Process2[+F[_], +O] {
import Process2._
import Util._
/** Ignore all outputs of this `Process`. */
final def drain: Process2[F, Nothing] = this match {
case h@Halt(_) => h
case a@Await(_, _) => a.extend(_.drain)
case Emit(h, t) => suspend { t.drain }
case Append(p, fp2) => ???
}
/** Send the `Kill` signal to the next `Await`, then ignore all outputs. */
final def kill: Process2[F, Nothing] = this.disconnect.drain
/** Causes subsequent await to fail with the `Kill` exception. */
final def disconnect: Process2[F, O] = this match {
case h@Halt(_) => h
case a@Await(_, recv) => ???
case Append(p, fp2) => ???
case Emit(h, t) => suspend { emitAll(h) ++ t.disconnect }
}
/**
* Replace the `Halt` at the end of this `Process` with whatever
* is produced by `f`.
*/
final def onHalt[F2[x] >: F[x], O2 >: O](f: Throwable => Process2[F2, O2]): Process2[F2, O2] =
this match {
case h@Halt(e) => suspend { f(e) }
case _ => Append(this, f)
}
/**
* If this process halts without an error, attaches `p2` as the next step.
*
* Please note that even when p2 is passed lazily,
* it may be evaluated _before_ it is actually needed for performance reasons
* `p2` is lazy here due to stack-safety.
*
*/
final def append[F2[x] >: F[x], O2 >: O](p2: => Process2[F2, O2]): Process2[F2, O2] = {
debug(">>>>" + this.toString + " --- " + p2)
onHalt {
case End | Kill => p2
case rsn => fail(rsn)
}
}
final def ++[F2[x] >: F[x], O2 >: O](p2: => Process2[F2, O2]): Process2[F2, O2] = append(p2)
final def onComplete[F2[x] >: F[x], O2 >: O](p2: => Process2[F2, O2]): Process2[F2, O2] =
onHalt {
case (End | Kill) => p2
case e => p2.onHalt {
case (End | Kill) => fail(e)
case e2 => fail(Process.CausedBy(e2, e))
}
}
final def onFailure[F2[x] >: F[x], O2 >: O](p2: => Process2[F2, O2]): Process2[F2, O2] =
onHalt {
case e@(End | Kill) => fail(e)
case e => p2.onHalt {
case (End | Kill) => fail(e)
case e2 => fail(Process.CausedBy(e2, e))
}
}
final def flatMap[F2[x] >: F[x], O2](f: O => Process2[F2, O2]): Process2[F2, O2] = {
@tailrec
def go(cur: Process2[F, O], stack: Seq[Throwable => Process2[F, O]]): Process2[F2, O2] = {
debug(s"FM cur: $cur stack: ${stack.size}")
cur match {
case h@Halt(_) if stack.isEmpty => h
case Halt(rsn) => go(stack.head(rsn), stack.tail)
case a@Await(_, _) => a.extend(_.flatMap(f))
case Append(Halt(rsn), n) => n(rsn) flatMap f
case Append(p, n) => go(p, n +: stack)
case Emit(Seq(), t) => t flatMap f
case Emit(Seq(o), t) => f(o) append t.asInstanceOf[Process2[F2, O2]]
case Emit(os, t) => os.tail.foldLeft(f(os.head) append t.asInstanceOf[Process2[F2, O2]])(_ append f(_))
}
}
go(this, Vector())
}
final def map[O2](f: O => O2): Process2[F, O2] =
flatMap { o => emit(f(o)) }
final def pipe[O2](p2: Process1[O, O2]): Process2[F, O2] = {
import scalaz.stream.Process.{Await1, Emit => EmitP, Halt => HaltP}
p2 match {
case HaltP(e) => this.kill onComplete fail(e)
case EmitP(h, t) => emitAll(h) ++ this.pipe(t)
case Await1(recv, fb, c) => this match {
case Halt(End) => halt.pipe(fb.disconnect)
case Halt(e) => fail(e).pipe(c)
case Emit(h, t) => suspend {
if (h.nonEmpty) (emitAll(h.tail) ++ t) pipe (recv(h.head))
else t.pipe(p2)
}
case a@Await(_, _) => a.extend(_.pipe(p2))
case Append(p, n) => ???
}
}
}
final def tee[F2[x] >: F[x], O2, O3](p2: Process2[F2, O2])(t: Tee[O, O2, O3]): Process2[F2, O3] = {
import scalaz.stream.Process.{Emit => EmitP, Halt => HaltP}
import scalaz.stream.tee.{AwaitL, AwaitR}
t match {
case HaltP(e) => this.kill onComplete p2.kill onComplete fail(e)
case EmitP(h, tl) => emitAll(h) ++ this.tee(p2)(tl)
case AwaitL(recv, fb, c) => this match {
case Halt(End) => halt.tee(p2)(fb.disconnect)
case Halt(e) => fail(e).tee(p2)(c)
case Emit(h, tl) => suspend {
if (h.nonEmpty) (emitAll(h.tail) ++ tl).tee(p2)(recv(h.head))
else tl.tee(p2)(t)
}
case a@Await(_, _) => a.extend(_.tee(p2)(t))
case Append(p, n) => ???
}
case AwaitR(recv, fb, c) => p2 match {
case Halt(End) => this.tee(halt)(fb)
case Halt(e) => this.tee(fail(e))(c)
// casts required since Scala seems to discard type of `p2` when
// pattern matching on it - assigns `F2` and `O2` to `Any` in patterns
case e@Emit(_, _) => e.asInstanceOf[Emit[F2, O2]].extend {
(h: Seq[O2], tl: Process2[F2, O2]) =>
if (h.nonEmpty) this.tee(emitAll(h.tail) ++ tl)(recv(h.head))
else this.tee(tl)(t)
}
case a@Await(_, _) => a.asInstanceOf[Await[F2, Any, O2]].extend(
p2 => this.tee(p2)(t))
case Append(p, n) => ???
}
}
}
final def runFoldMap[F2[x] >: F[x], B](f: O => B)(implicit F: Monad[F2], C: Catchable[F2], B: Monoid[B]): F2[B] = {
debug("####" * 100)
def go(cur: Process2[F2, O], acc: B, stack: Vector[Throwable => Process[F2, O]]): F2[B] = {
debug(s" cur: $cur, acc: $acc, stack: ${stack.size }")
cur match {
case Emit(h, t) =>
go(t.asInstanceOf[Process2[F2, O]], h.asInstanceOf[Seq[O]].foldLeft(acc)((x, y) => B.append(x, f(y))), stack)
case Halt(e) if (stack.isEmpty) =>
e match {
case (End | Kill) => F.point(acc)
case _ => C.fail(e)
}
case Halt(e) => go(stack.head(e).asInstanceOf[Process2[F2, O]], acc, stack.tail)
case Append(Halt(rsn), n) => go(n(rsn).asInstanceOf[Process2[F2, O]], acc, stack)
case Append(p, n) => go(p.asInstanceOf[Process2[F2, O]], acc, n.asInstanceOf[Throwable => Process[F2, O]] +: stack)
case Await(req, rcv) =>
F.bind(C.attempt(req.asInstanceOf[F2[AnyRef]])) {
e =>
rcv(e).attemptRun match {
case -\/(e) => go(Halt(e), acc, stack)
case \/-(p) => go(p.asInstanceOf[Process2[F2, O]], acc, stack)
}
}
}
}
go(this, B.zero, Vector())
}
/**
* Collect the outputs of this `Process[F,O]`, given a `Monad[F]` in
* which we can catch exceptions. This function is not tail recursive and
* relies on the `Monad[F]` to ensure stack safety.
*/
final def runLog[F2[x] >: F[x], O2 >: O](implicit F: Monad[F2], C: Catchable[F2]): F2[IndexedSeq[O2]] = {
runFoldMap[F2, IndexedSeq[O2]](IndexedSeq(_))(
F, C,
// workaround for performance bug in Vector ++
Monoid.instance[IndexedSeq[O2]](
(a, b) => b.foldLeft(a)(_ :+ _),
IndexedSeq.empty)
)
}
}
object Process2 {
// We are just using `Task` for its exception-catching and trampolining,
// just defining local alias to avoid confusion
type Trampoline[+A] = Task[A]
val Trampoline = Task
/**
* Represents state of process, that was terminated
* If the e eq [[scalaz.stream.Process.End]] that indicates that process terminated due to the fact that elements
* of the process were consumed (normal termination)
* @param e
*/
case class Halt(e: Throwable) extends Process2[Nothing, Nothing]
/**
* Represents state of process, where process awaits deferred computation of `A`.
* After receiving the `A` recv function is applied to create next state of process.
* recv function potentially contains as well any cleanup code, that may need to be run.
*/
case class Await[+F[_], A, +O](
req: F[A]
, rcv: (Throwable \/ A) => Trampoline[Process2[F, O]]
) extends Process2[F, O] {
/**
* Helper to modify the result of `rcv` parameter of await stack-safely on trampoline.
*/
def extend[F2[x] >: F[x], O2](f: Process2[F, O] => Process2[F2, O2]): Process2[F2, O2] =
Await[F2, A, O2](req, e => Trampoline.suspend(rcv(e)).map(f))
}
/**
* Represents state of process, where there are possibly elements ready to be _emitted_ in `head`
* and next state of process in tail. Note the tail is *strict* so to have lazy code attached to this
* state you must wrap that in `Suspend` constructor
*/
case class Emit[F[_], O](
head: Seq[O]
, tail: Process2[F, O]
) extends Process2[F, O] {
/**
* Helper to modify the tail of Emit. Note that the function is applied lazily in Suspend constructor
*/
def extend[F2[x] >: F[x], O2](f: (Seq[O], Process2[F, O]) => Process2[F2, O2]):
Process2[F2, O2] =
suspend { f(head, tail) }
}
// /**
// * Represents state of the process where evaluation of the next state is suspended,
// * that means it is evaluated when needed. This is particularly usefull with Emit.
// * @param get
// * @tparam F
// * @tparam O
// */
// case class Suspend[F[_], O](get: Trampoline[Process2[F, O]]) extends Process2[F, O]
case class Append[F[_], O](p1: Process2[F, O], next: Throwable => Process2[F, O]) extends Process2[F, O]
/**
* Wraps the evaluation of supplied next process state in Suspend. Note that means
* that the process evaluation will happen when the driver will need that process state, not before
*
* Note, that the `p` is reevaluated always when the driver need that next process state.
* If this is not desired, any ou would like the evaluation happen only once, please use `lazily`
* @return
*/
def suspend[F[_], O](p: => Process2[F, O]): Process2[F, O] =
Append(halt, {
case End | Kill => p
case rsn => fail(rsn)
})
/**
* Similar to `suspend`, except it caches the evaluation of `p` for subsequent computations
* @return
*/
def lazily[F[_], O](p: => Process2[F, O]): Process2[F, O] = {
lazy val pe = p
Append(halt, {
case End | Kill => pe
case rsn => fail(rsn)
})
}
/**
* Helper to wrap evaluation of `p` that may cause side-effects by throwing exception.
*/
private[stream] def Try[F[_], A](p: => Process2[F, A]): Process2[F, A] =
try p
catch {case e: Throwable => Halt(e) }
val halt = Halt(End)
def fail(err: Throwable): Process2[Nothing, Nothing] =
Halt(err)
def fail(msg: String): Process2[Nothing, Nothing] =
Halt(new Exception(msg))
def emit[O](o: O): Process2[Nothing, O] =
Emit[Nothing, O](Vector(o), halt)
def emitAll[O](s: Seq[O]): Process2[Nothing, O] =
Emit[Nothing, O](s, halt)
def emitSeq[F[_], O](
head: Seq[O],
tail: Process2[F, O] = halt): Process2[F, O] =
Emit[F, O](head, tail)
def await[F[_], A, O](req: F[A])(rcv: Throwable \/ A => Trampoline[Process2[F, O]]): Process2[F, O] =
Await[F, A, O](req, rcv)
def eval[F[_], O](req: F[O]): Process2[F, O] =
Await[F, O, O](req, _.fold(
e => Trampoline.now(fail(e)),
a => Trampoline.now(emit(a))
))
/**
* Special exception indicating normal termination due to
* input ('upstream') termination. An `Await` may respond to an `End`
* by switching to reads from a secondary source.
*/
case object End extends Exception {
override def fillInStackTrace = this
}
/**
* Special exception indicating downstream termination.
* An `Await` should respond to a `Kill` by performing
* necessary cleanup actions, then halting.
*/
case object Kill extends Exception {
override def fillInStackTrace = this
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//
// SYNTAX
//
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment