Last active
December 19, 2015 18:48
-
-
Save YoEight/6001197 to your computer and use it in GitHub Desktop.
Scalaz-stream Process to Play Enumerator
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package deiko | |
import scala.concurrent.Future | |
import scala.concurrent.duration.Duration | |
import scalaz.concurrent.Task | |
import scalaz.stream._ | |
import play.api.libs.iteratee._ | |
import Process._ | |
/** Conversions from scalaz-stream processes to Play iteratee types. */
object Conversion {

  /**
   * Adapts a scalaz-stream `Process[Task, E]` into a Play `Enumerator[E]`.
   *
   * Every application of the returned enumerator interprets `proc` from its beginning:
   *  - values of an `Emit` are fed to the iteratee one at a time;
   *  - the request of an `Await` is executed with `req.run` and its result handed to `recv`;
   *  - once the process reaches `Halt`, `Input.EOF` is fed to the iteratee.
   *
   * As soon as the iteratee is no longer in the `Cont` state, the process is killed
   * (`Process#kill`) and only what remains after the kill is executed, so the source
   * is not consumed any further than necessary.
   *
   * An `Exception` raised by a request whose `Await` has no error handler (`Halt`) is rethrown.
   *
   * @tparam E   type of the elements produced by the process and fed to the iteratee
   * @param proc the source process
   * @return an enumerator feeding the output of `proc` to whatever iteratee it is applied to
   */
  def sourceToEnumerator[E](proc: Process[Task, E]): Enumerator[E] = new Enumerator[E] {
    def apply[A](start: Iteratee[E, A]): Future[Iteratee[E, A]] = {

      // Evaluates a single `Await` step. `awaited` is by-name so that the request runs inside
      // the `try`: `Process.End` selects the fallback process, any other `Exception` selects the
      // error handler followed by a re-raise of that exception (or is rethrown right away when
      // the handler is `Halt`).
      def advance(
          awaited: => Process[Task, E],
          fallback: Process[Task, E],
          error: Process[Task, E]): Process[Task, E] =
        try awaited
        catch {
          case Process.End => fallback
          case e: Exception =>
            error match {
              case Halt => throw e
              case _ => error ++ Process.wrap(Task.delay(throw e))
            }
        }

      // Runs `rest` until it halts, executing every request and discarding every emitted value,
      // then returns `i` untouched. Callers pass an already killed process, so this only runs
      // whatever the kill left behind.
      @annotation.tailrec
      def runToEnd(rest: Process[Task, E], i: Iteratee[E, A]): Future[Iteratee[E, A]] =
        rest match {
          case Halt => Future.successful(i)
          case Emit(_, next) => runToEnd(next, i)
          case Await(req, recv, fallback, error) =>
            runToEnd(advance(recv(req.run), fallback, error), i)
        }

      // Feeds the pending values `xs` to `i` one by one, then resumes interpreting `next`.
      def feed(xs: Seq[E], next: Process[Task, E], i: Iteratee[E, A]): Future[Iteratee[E, A]] =
        if (xs.isEmpty) go(next, i)
        else {
          val fed = Iteratee.flatten(i.feed(Input.El(xs.head)))
          fed.fold {
            case Step.Cont(_) => feed(xs.tail, next, fed)
            // The iteratee stopped accepting input: kill the remaining process first, exactly
            // as `go` does, instead of executing the rest of the source.
            case _ => runToEnd(next.kill, fed)
          }
        }

      // Interprets `cur` for as long as `i` accepts input.
      def go(cur: Process[Task, E], i: Iteratee[E, A]): Future[Iteratee[E, A]] =
        i.fold {
          case Step.Cont(k) =>
            cur match {
              case Emit(values, next) => feed(values, next, i)
              // NOTE(review): the source is exhausted, so EOF is fed and the state re-examined.
              // An iteratee that stays in `Cont` after EOF would make this recurse forever;
              // confirm that feeding EOF from the enumerator itself is intended.
              case Halt => go(cur, k(Input.EOF))
              case Await(req, recv, fallback, error) =>
                go(advance(recv(req.run), fallback, error), i)
            }
          case _ => runToEnd(cur.kill, i)
        }

      go(proc, start)
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment