Last active
December 16, 2015 01:09
-
-
Save sadache/5352651 to your computer and use it in GitHub Desktop.
Fixes several leaking Enumerators and features buffer Enumeratee which buffers when socket is not ready for writing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
ackage controllers | |
import play.api._ | |
import play.api.mvc._ | |
import play.api.libs.iteratee._ | |
import play.api.libs.EventSource | |
import scala.concurrent.ExecutionContext.Implicits.global | |
import scala.concurrent.{Future, Promise} | |
// Solves memory leaks | |
object Solutions { | |
trait TreatCont0[E]{ | |
def apply[A](loop: Iteratee[E,A] => Unit, k: Input[E] => Iteratee[E,A]) | |
} | |
//An abstract Enumerator constructor that doesn't leak ( doesn't use flatMap ), mind that it is more imperative (using Unit) | |
def checkContinue0[E](inner:TreatCont0[E]) = new Enumerator[E] { | |
def apply[A](it: Iteratee[E, A]): Future[Iteratee[E, A]] = { | |
val p = Promise[Iteratee[E, A]]() | |
def step(it: Iteratee[E, A]) { it.pureFold { | |
case Step.Done(a, e) => | |
p.success(Done(a,e)) | |
case Step.Cont(k) => | |
inner[A](step,k) | |
case Step.Error(msg, e) => | |
p.success(Error(msg,e)) | |
} | |
} | |
step(it) | |
p.future | |
} | |
} | |
def generateM[E](e: => Future[Option[E]]): Enumerator[E] = checkContinue0( new TreatCont0[E] { | |
def apply[A](loop: Iteratee[E,A] => Unit, k: Input[E] => Iteratee[E,A]) = e.foreach { | |
case Some(e) => loop(k(Input.El(e))) | |
case None => loop(Cont(k)) | |
} | |
}) | |
def repeat[E](e: => E): Enumerator[E] = checkContinue0( new TreatCont0[E]{ | |
def apply[A](loop: Iteratee[E,A] => Unit, k: Input[E] => Iteratee[E,A]) = loop(k(Input.El(e))) | |
}) | |
def grouped[From](i:Int): Enumeratee[From, List[From]] = new Enumeratee.CheckDone[From, List[From]] { | |
def step[A](counter:Int, cs:List[From])(k: K[List[From], A]): K[From, Iteratee[List[From], A]] = { | |
case in @ Input.El(e) => | |
if(counter+1 == i) | |
new Enumeratee.CheckDone[From, List[From]] { | |
def continue[A](k: K[List[From], A]) = | |
Cont(step(0,List())(k)) | |
} &> k(Input.El(e::cs)) | |
else Cont(step(counter+1, e::cs)(k)) | |
case Input.Empty => Cont(step(counter,cs)(k)) | |
case Input.EOF => Done(Cont(k), Input.EOF) | |
} | |
def continue[A](k: K[List[From], A]) = Cont(step(0,List())(k)) | |
} | |
} | |
object Application extends Controller { | |
// Buffer if socket not ready | |
def buffer[E](maxBuffer: Int): Enumeratee[E, List[E]] = new Enumeratee[E, List[E]] { | |
import scala.util.{Try, Failure, Success} | |
import scala.collection.immutable.Queue | |
import scala.concurrent.stm._ | |
import play.api.libs.iteratee.Enumeratee.CheckDone | |
def applyOn[A](it: Iteratee[List[E], A]): Iteratee[E, Iteratee[List[E], A]] = { | |
val last = Promise[Iteratee[E, Iteratee[List[E], A]]]() | |
sealed trait State | |
case class Queueing(q: Queue[Input[E]], length: Long) extends State | |
case class Waiting(p: scala.concurrent.Promise[Input[List[E]]]) extends State | |
case class DoneIt(s: Iteratee[E, Iteratee[List[E], A]]) extends State | |
case class InPause(q: Queue[Input[E]],p:Promise[Unit]) extends State | |
trait ProducerAction | |
case object Continue extends ProducerAction | |
case class PauseProducer(p:Future[Unit]) extends ProducerAction | |
case class ResumeConsumer(p:Promise[Input[List[E]]]) extends ProducerAction | |
case class Finish(it: Iteratee[E,Iteratee[List[E], A]]) extends ProducerAction | |
val state: Ref[State] = Ref(Queueing(Queue[Input[E]](), 0)) | |
def step(ee: Input[E]):Iteratee[E,Iteratee[List[E], A]] = ee match { | |
case in @ Input.EOF => | |
state.single.getAndTransform { | |
case Queueing(q, l) => Queueing(q.enqueue(in), l) | |
case Waiting(p) => Queueing(Queue(), 0) | |
case d @ DoneIt(it) => d | |
case _ => throw new Exception("illegal state!") | |
} match { | |
case Waiting(p) => | |
p.success(in) | |
case _ => | |
} | |
Iteratee.flatten(last.future) | |
case other => | |
val s = atomic { implicit txn => | |
state() match { | |
case Queueing(q, l) if maxBuffer > 0 && l > maxBuffer => | |
val p = Promise[Unit]() | |
state() = InPause(q.enqueue(other), p) | |
PauseProducer(p.future) | |
case Queueing(q, l) => | |
state() = Queueing(q.enqueue(other), l + 1) | |
Continue | |
case Waiting(p) => | |
state() = Queueing(Queue(), 0) | |
ResumeConsumer(p) | |
case d @ DoneIt(it) => | |
state() = d | |
Finish(it) | |
case _ => throw new Exception("illegal state") | |
} | |
} | |
s match { | |
case ResumeConsumer(p) => | |
p.success(other.map(List(_))) | |
Cont(step) | |
case Finish(it) => it | |
case Continue => Cont(step) | |
case PauseProducer(f) => Iteratee.flatten(f.map(_ => Cont(step)) ) | |
} | |
} | |
def getInputAndRest(q:Queue[Input[E]]):(Input[List[E]],Queue[Input[E]]) = { | |
val (els,after) = q.toList.span(_ != Input.EOF) | |
if(! els.isEmpty){ | |
val e = Input.El({ val l = els.collect{ case Input.El(e) => e}; l}) | |
(e,Queue(after:_*)) | |
} | |
else { ((Input.EOF),Queue()) } | |
} | |
def moreInput: Future[Input[List[E]]]= { | |
val in = atomic { implicit txn => | |
val current = state() | |
state() match { | |
case Queueing(q, l) => | |
if (!q.isEmpty) { | |
val (e,q1) = getInputAndRest(q) | |
state() = Queueing(q1, 1) | |
Future.successful(e) | |
} else { | |
val p = Promise[Input[List[E]]]() | |
state() = Waiting(p) | |
p.future | |
} | |
case InPause(q,p) => | |
p.trySuccess(()) | |
state() = Queueing(Queue(), 0) | |
val (els,after) = q.toList.span(_ != Input.EOF) | |
val (e,q1) = getInputAndRest(q) | |
state() = Queueing(q1, 1) | |
Future.successful(e) | |
case _ => throw new Exception("can't get here") | |
} | |
} | |
in | |
} | |
(Solutions.checkContinue0( new Solutions.TreatCont0[List[E]] { | |
def apply[A](loop: Iteratee[List[E],A] => Unit, k: Input[List[E]] => Iteratee[List[E],A]) = moreInput.foreach { in => | |
loop(k(in)) | |
} | |
}) |>> it).flatMap(_.unflatten).onComplete { | |
case Success(it) => | |
state.single() = DoneIt(Done(it.it,Input.Empty)) | |
last.success(Done(it.it,Input.Empty)) | |
case Failure(e) => | |
state.single() = DoneIt(Iteratee.flatten(Future.failed[Iteratee[E, Iteratee[List[E], A]]](e))) | |
last.failure(e) | |
} | |
Cont(step) | |
} | |
} | |
def index = Action { | |
val chunks = Solutions.repeat("0") &> Enumeratee.take(10000000) &> buffer(10000) &> Enumeratee.map(_.mkString) | |
val grouped = Solutions.repeat("0") &> Enumeratee.take(10000000) &> Solutions.grouped(5000) &> Enumeratee.map(_.mkString) &> buffer(10000) &> Enumeratee.map(_.mkString) | |
Ok.stream( chunks >>> Enumerator.eof ) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment