Skip to content

Instantly share code, notes, and snippets.

@pchlupacek
Created February 22, 2015 05:29
Show Gist options
  • Save pchlupacek/880a01e170afe897422d to your computer and use it in GitHub Desktop.
Save pchlupacek/880a01e170afe897422d to your computer and use it in GitHub Desktop.
/**
 * Asynchronous execution of this Process. Note that this method is not resource safe unless
 * the callback is completed with the _left_ side. In that case the cleanups known to this
 * evaluation have been run before the callback is invoked (see the note on double cleanup below).
 * When the callback is completed with the _right_ side, the user of this method is responsible
 * for any cleanup actions, which are performed by running the continuation obtained on the
 * right side of the callback.
 *
 * This method returns a function that, when applied, causes the running computation to be interrupted.
 * That is useful if the process contains any asynchronous code that may be left with incomplete callbacks.
 * The interruption is only effective if the callback was not completed before,
 * otherwise the interruption is a no-op. The `EarlyCause` passed to the returned function is
 * currently ignored; an interrupted evaluation is always recorded as terminated with `Kill`.
 *
 * There is a chance that the cleanup code of an intermediate `Await` will get called twice on interrupt,
 * but always at least once. The second cleanup invocation in that case may run on a different thread,
 * asynchronously.
 *
 * @param cb  result of the asynchronous evaluation of the process.
 *            Left side: the process halted (or was interrupted) with the given cause.
 *            Right side: the process emitted a non-empty sequence together with its continuation.
 *            The right side will never be called if the evaluation of the process resulted in `empty`.
 * @param tag label included in the debug trace printed by this method; has no effect on evaluation.
 * @param S   Strategy to use when evaluating the process. Note that `Strategy.Sequential` will cause SOE.
 * @return Function to interrupt the evaluation
 */
protected[stream] final def runAsync(
  cb: Cause \/ (Seq[O], Cont[Task,O]) => Unit
  , tag:String="*"
)(implicit S: Strategy): (EarlyCause) => Unit = {
  // sequence number of this invocation, used only to correlate debug trace lines
  val idx = counter.incrementAndGet()

  // messages processed by the actor that serializes the state transitions below
  sealed trait M
  // request to evaluate the next step of the process
  case class AsyncStep(next:Process[Task,O]) extends M
  // the task of an `Await` completed with `result`
  case class AwaitDone(result: Throwable \/ Any, rcv:(EarlyCause \/ Any) => Trampoline[Process[Task, O]], next:Cont[Task,O]) extends M
  // the cleanup started by `Interrupt` finished with `rsn`
  case class CleanupDone(rsn:Cause) extends M
  // request to interrupt the evaluation
  case object Interrupt extends M

  // forward referenced actor
  var a: Actor[M] = null

  // if set, the async computation terminated.
  var terminated: Option[Cause] = None

  val emptyCleanup = () => halt

  // builds the process to be run when the evaluation gets interrupted in the current state.
  // volatile, because the first `stepAsync` is invoked outside of the actor
  @volatile var cleanup: () => Process[Task,O] = emptyCleanup

  // indicates state of cleanup in progress
  // on right side it indicates if any cleanup is running (true)
  // on left side indicates cleanup to be run after current cleanup terminates.
  var cleanupState: Process[Task,O] \/ Boolean = right(false)

  // evaluates a single step of `p` and either completes the callback,
  // schedules the next step via the actor, or starts the awaited task
  def stepAsync(p:Process[Task,O]) : Unit = {
    p.step match {
      case Halt(cause) =>
        terminated = Some(cause)
        println(("RSE ", tag, idx, "step-Halt", cause, terminated, cleanup))
        S(cb(left(cause)))

      case Step(Emit(os), next) if os.isEmpty =>
        // nothing emitted, continue with the next step; on interrupt the continuation is killed
        cleanup = () => next.continue.kill
        println(("RSG ", tag, idx, "step-go", terminated, cleanup))
        a ! AsyncStep(next.continue)

      case Step(Emit(os), next) =>
        // values emitted: hand them over with the continuation, the caller owns cleanup from now on
        cleanup = emptyCleanup
        terminated = Some(End)
        println(("RSE ", tag, idx, "step-Emit", os, terminated, cleanup))
        S(cb(right(os -> next)))

      case Step(Await(req,rcv), next) =>
        // The request is evaluated only once, through `req.runAsync` below. It is not
        // stepped separately beforehand, so that its effects are not run twice.
        cleanup = () => Try(rcv(left(Kill)).run) +: next
        println(("RSW ", tag, idx, "step-Await", next,terminated, cleanup))
        S(req.runAsync { r => a ! AwaitDone(r,rcv,next) })
    }
  }

  a = Actor[M] {
    case AsyncStep(next) =>
      println(("RSS ", tag, idx, "AsyncStep", next, terminated, cleanup))
      cleanup = emptyCleanup
      terminated match {
        case None => stepAsync(next)
        case Some(_) =>
          // the interrupt was run and we did not have any task to be evaluated;
          // full cleanup was run during interrupt and we are free to just be no-op
          ()
      }

    case AwaitDone(r,rcv,next) =>
      println(("RSA ",tag, idx, "AWT DONE", r, rcv, next, cleanupState,terminated, cleanup))
      terminated match {
        case None =>
          val rcvNext = Try(rcv(EarlyCause.fromTaskResult(r)).run)
          stepAsync(rcvNext +: next)

        case Some(_) if cleanupState == right(true) =>
          // leave the cleanup to be run once the cleanup in progress finishes.
          // this prevents both cleanups (from the dangling task and from the interrupt)
          // from running simultaneously, which could otherwise deadlock
          val rcvNext = Try(rcv(left(Kill)).run)
          cleanupState = left(rcvNext)

        case Some(_) =>
          // these are the remains of the dangling Task.
          // there is a chance that it allocated resources, so we run its cleanup
          // asynchronously to eventually release them
          val rcvNext = Try(rcv(left(Kill)).run)
          S(rcvNext.run.runAsync { _ => () })
      }

    case Interrupt =>
      // on interrupt we try to run any known cleanup
      // if process already terminated, this is just no-op
      println(("RSI ",tag, idx, "Interrupt", terminated, cleanup))
      if (terminated.isEmpty) {
        terminated = Some(Kill)
        // not yet terminated, we have to run any cleanups here
        val next = cleanup()
        cleanup = emptyCleanup
        // mark the cleanup as in progress
        cleanupState = right(true)
        S(next.run.runAsync { r => a ! CleanupDone(r.fold(Error(_), _=> Kill)) })
      }

    case CleanupDone(cause) =>
      // indicates the cleanup from interrupt was finished.
      // this checks whether in the meantime any dangling task cleanup was registered,
      // and if yes, runs it.
      println(("RSC ",tag, idx, "Clean", cause, cleanupState, terminated, cleanup))
      val pending = cleanupState
      // the interrupt cleanup is no longer running: reset the state so that an `AwaitDone`
      // arriving after this point runs its cleanup itself instead of parking it
      // in `cleanupState`, where nothing would ever pick it up
      cleanupState = right(false)
      pending match {
        case \/-(_) => S(cb(left(cause)))
        // cleanup from the dangling await was registered, let's run it
        case -\/(nc) =>
          S(nc.run.runAsync { _ => ()})
          S(cb(left(cause)))
      }
  }

  println(("RAS ",tag, idx, "Start", self, terminated, cleanup))
  stepAsync(self)

  // interruption handle: the supplied cause is ignored, the actor always terminates with `Kill`
  {_ => S( a ! Interrupt )}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment