Created
February 22, 2015 05:29
-
-
Save pchlupacek/880a01e170afe897422d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Asynchronous execution of this Process. Note that this method is not resource safe unless
 * the callback is called with the _left_ side. In that case it is guaranteed that all cleanups
 * have been successfully completed.
 * The user of this method is responsible for any cleanup actions to be performed by running the
 * next Process obtained on the right side of the callback.
 *
 * This method returns a function that, when applied, causes the running computation to be interrupted.
 * That is useful if the process contains any asynchronous code that may be left with incomplete callbacks.
 * If the evaluation of the process is interrupted, then the interruption is only active if the callback
 * was not completed before, otherwise the interruption is a no-op.
 * The argument passed to the returned function is currently ignored: an interrupt always terminates
 * the evaluation with `Kill`.
 *
 * There is a chance that the cleanup code of an intermediate `Await` will get called twice on interrupt, but
 * always at least once. The second cleanup invocation in that case may run on a different thread, asynchronously.
 *
 * @param cb   result of the asynchronous evaluation of the process.
 *             The callback's right side will never be called if evaluation of the process resulted in `empty`.
 * @param tag  label included in the debug trace lines this method prints to standard output.
 * @param S    Strategy to use when evaluating the process. Note that `Strategy.Sequential` will cause SOE.
 * @return     Function to interrupt the evaluation
 */
protected[stream] final def runAsync(
  cb: Cause \/ (Seq[O], Cont[Task,O]) => Unit
  , tag: String = "*"
)(implicit S: Strategy): (EarlyCause) => Unit = {
  // value obtained from `counter` (declared outside this block); used only in the trace lines below
  val idx = counter.incrementAndGet()

  // messages processed by the actor that drives the evaluation
  sealed trait M
  case class AsyncStep(next: Process[Task,O]) extends M
  case class AwaitDone(result: Throwable \/ Any, rcv: (EarlyCause \/ Any) => Trampoline[Process[Task, O]], next: Cont[Task,O]) extends M
  case class CleanupDone(rsn: Cause) extends M
  case object Interrupt extends M

  // forward referenced actor; assigned below, before the first step is run
  var a: Actor[M] = null

  // if set, the async computation terminated.
  var terminated: Option[Cause] = None

  val emptyCleanup = () => halt
  // builds the process to run as cleanup should an interrupt arrive now
  @volatile var cleanup: () => Process[Task,O] = emptyCleanup

  // indicates state of cleanup in progress
  // on right side it indicates if any cleanup is running (true)
  // on left side indicates cleanup to be run after current cleanup terminates.
  var cleanupState: Process[Task,O] \/ Boolean = right(false)

  // Evaluates a single step of `p` and either completes the callback or schedules the continuation.
  def stepAsync(p: Process[Task,O]): Unit = {
    p.step match {
      case Halt(cause) =>
        terminated = Some(cause)
        println(("RSE ", tag, idx, "step-Halt", cause, terminated, cleanup))
        S(cb(left(cause)))

      case Step(Emit(os), next) if os.isEmpty =>
        // nothing emitted: remember how to kill the continuation and keep stepping through the actor
        cleanup = () => next.continue.kill
        println(("RSG ", tag, idx, "step-go", terminated, cleanup))
        a ! AsyncStep(next.continue)

      case Step(Emit(os), next) =>
        // values emitted: hand them over together with the continuation, the caller owns it from now on
        cleanup = emptyCleanup
        terminated = Some(End)
        println(("RSE ", tag, idx, "step-Emit", os, terminated, cleanup))
        S(cb(right(os -> next)))

      case Step(Await(req, rcv), next) =>
        // The request is evaluated only through `runAsync` below. Stepping `req.get` here as well
        // would force any suspended part of the task and run its effects a second time.
        cleanup = () => Try(rcv(left(Kill)).run) +: next
        println(("RSW ", tag, idx, "step-Await", next,terminated, cleanup))
        S(req.runAsync { r => a ! AwaitDone(r, rcv, next) })
    }
  }

  a = Actor[M] {
    case AsyncStep(next) =>
      println(("RSS ", tag, idx, "AsyncStep", next, terminated, cleanup))
      cleanup = emptyCleanup
      terminated match {
        case None => stepAsync(next)
        case Some(_) =>
          // the interrupt was run and we did not have any task to be evaluated;
          // full cleanup was run during interrupt and we are free to just be a no-op
          ()
      }

    case AwaitDone(r, rcv, next) =>
      println(("RSA ",tag, idx, "AWT DONE", r, rcv, next, cleanupState,terminated, cleanup))
      terminated match {
        case None =>
          val rcvNext = Try(rcv(EarlyCause.fromTaskResult(r)).run)
          stepAsync(rcvNext +: next)

        case Some(_) if cleanupState == right(true) =>
          // leave cleanup to be run once the cleanup in progress has finished;
          // this prevents both cleanups (from the dangling task and from the interrupt)
          // from running simultaneously, which could otherwise deadlock
          val rcvNext = Try(rcv(left(Kill)).run)
          cleanupState = left(rcvNext)

        case Some(_) =>
          // these are the remains of the dangling Task;
          // there is a chance that it allocated resources, so run its cleanup
          // asynchronously to eventually release them
          val rcvNext = Try(rcv(left(Kill)).run)
          S(rcvNext.run.runAsync { _ => () })
      }

    case Interrupt =>
      // on interrupt we try to run any known cleanup;
      // if the process already terminated, this is just a no-op
      println(("RSI ",tag, idx, "Interrupt", terminated, cleanup))
      if (terminated.isEmpty) {
        terminated = Some(Kill)
        // not yet terminated, we have to run any cleanups here
        val next = cleanup()
        cleanup = emptyCleanup
        // mark that a cleanup is in progress
        cleanupState = right(true)
        S(next.run.runAsync { r => a ! CleanupDone(r.fold(Error(_), _ => Kill)) })
      }

    case CleanupDone(cause) =>
      // indicates the cleanup from interrupt has finished.
      // check whether a dangling task cleanup was registered in the meantime and, if so, run it
      println(("RSC ",tag, idx, "Clean", cause, cleanupState, terminated, cleanup))
      cleanupState match {
        case -\/(nc) => S(nc.run.runAsync { _ => () })
        case \/-(_) => ()
      }
      // in either case signal termination to the callback
      S(cb(left(cause)))
  }

  println(("RAS ",tag, idx, "Start", self, terminated, cleanup))
  stepAsync(self)

  // interruption handle: the supplied cause is ignored, the actor always terminates with `Kill`
  (_: EarlyCause) => S(a ! Interrupt)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment