Created
July 9, 2017 19:56
-
-
Save rogeralsing/81b1ed80631b6426382fb5595c0c0e42 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package proto | |
open class LocalContext : IMessageInvoker, IContext, ISupervisor { | |
companion object { | |
private val logger : ILogger = Log.createLogger<LocalContext>() | |
internal fun defaultReceive (context : IContext) : Task { | |
var c : LocalContext = LocalContextcontext | |
if (c.message is PoisonPill) { | |
c.self.stop() | |
return Proto.actor.Done | |
} | |
return c.actor.receiveAsync(context) | |
} | |
internal fun defaultSender (context : ISenderContext, target : PID, envelope : MessageEnvelope) : Task { | |
target.ref.sendUserMessage(target, envelope) | |
return Task.fromResult(0) | |
} | |
} | |
val EmptyChildren : IReadOnlyCollection<PID> = List<PID>() | |
private val _producer : () -> IActor | |
private val _receiveMiddleware : (IContext) -> Task | |
private val _senderMiddleware : (ISenderContext, PID, MessageEnvelope) -> Task | |
private val _supervisorStrategy : ISupervisorStrategy | |
private var _children : FastSet<PID>? = null | |
private var _message : Any? = null | |
private var _receiveTimeoutTimer : Timer? = null | |
private var _restartStatistics : RestartStatistics? = null | |
private var _stash : Stack<Any>? = null | |
private var _state : ContextState? = null | |
private var _watchers : FastSet<PID>? = null | |
constructor(producer : () -> IActor, supervisorStrategy : ISupervisorStrategy, receiveMiddleware : (IContext) -> Task, senderMiddleware : (ISenderContext, PID, MessageEnvelope) -> Task, parent : PID) { | |
_producer = producer | |
_supervisorStrategy = supervisorStrategy | |
_receiveMiddleware = receiveMiddleware | |
_senderMiddleware = senderMiddleware | |
parent = parent | |
incarnateActor() | |
} | |
override val children : IReadOnlyCollection<PID> | |
get() = _childrentoList() ?? EmptyChildren | |
override var actor : IActor | |
override val parent : PID | |
override var self : PID | |
override val message : Any | |
override val sender : PID | |
get() = _message as MessageEnvelopesender | |
override val headers : MessageHeader | |
override var receiveTimeout : Duration | |
override fun stash () { | |
if (_stash == null) { | |
_stash = Stack<Any>() | |
} | |
_stash.push(message) | |
} | |
override fun respond (message : Any) { | |
sender.tell(message) | |
} | |
override fun spawn (props : Props) : PID { | |
var id : String = ProcessRegistry.instance.nextId() | |
return spawnNamed(props, id) | |
} | |
override fun spawnPrefix (props : Props, prefix : String) : PID { | |
var name : String = prefix + ProcessRegistry.instance.nextId() | |
return spawnNamed(props, name) | |
} | |
override fun spawnNamed (props : Props, name : String) : PID { | |
var pid : PID = props.spawn("${self.id}/${name}", self) | |
if (_children == null) { | |
_children = FastSet<PID>() | |
} | |
_children.add(pid) | |
return pid | |
} | |
override fun watch (pid : PID) { | |
pid.sendSystemMessage(Watch(self)) | |
} | |
override fun unwatch (pid : PID) { | |
pid.sendSystemMessage(Unwatch(self)) | |
} | |
override fun setReceiveTimeout (duration : Duration) { | |
if (duration <= TimeSpan.Zero) { | |
throw ArgumentOutOfRangeException(nameof(duration), duration, "Duration must be greater than zero") | |
} | |
if (duration == receiveTimeout) { | |
return | |
} | |
stopReceiveTimeout() | |
receiveTimeout = duration | |
if (_receiveTimeoutTimer == null) { | |
_receiveTimeoutTimer = Timer(receiveTimeoutCallback, null, receiveTimeout, receiveTimeout) | |
} else { | |
resetReceiveTimeout() | |
} | |
} | |
override fun cancelReceiveTimeout () { | |
if (_receiveTimeoutTimer == null) { | |
return | |
} | |
stopReceiveTimeout() | |
_receiveTimeoutTimer = null | |
receiveTimeout = TimeSpan.Zero | |
} | |
override fun receiveAsync (message : Any) : Task { | |
return processMessageAsync(message) | |
} | |
override fun tell (target : PID, message : Any) { | |
sendUserMessage(target, message) | |
} | |
override fun request (target : PID, message : Any) { | |
var messageEnvelope : MessageEnvelope = MessageEnvelope(message, self, null) | |
sendUserMessage(target, messageEnvelope) | |
} | |
override fun requestAsync (target : PID, message : Any, timeout : Duration) : Task<T> = requestAsync(target, message, FutureProcess<T>(timeout)) | |
override fun requestAsync (target : PID, message : Any, cancellationToken : CancellationToken) : Task<T> = requestAsync(target, message, FutureProcess<T>(cancellationToken)) | |
override fun requestAsync (target : PID, message : Any) : Task<T> = requestAsync(target, message, FutureProcess<T>()) | |
override fun reenterAfter (target : Task<T>, action : (Task<T>) -> Task) { | |
var msg : Any = _message | |
var cont : Continuation = Continuation({ -> action(target)}, msg) | |
target.continueWith{t -> | |
self.sendSystemMessage(cont) | |
} | |
} | |
override fun escalateFailure (reason : Exception, who : PID) { | |
if (_restartStatistics == null) { | |
_restartStatistics = RestartStatistics(0, null) | |
} | |
var failure : Failure = Failure(who, reason, _restartStatistics) | |
if (parent == null) { | |
handleRootFailure(failure) | |
} else { | |
self.sendSystemMessage(SuspendMailbox.Instance) | |
parent.sendSystemMessage(failure) | |
} | |
} | |
override fun restartChildren (reason : Exception, pids : arrayOf<PID>) { | |
for(pid in pids) { | |
pid.sendSystemMessage(Restart(reason)) | |
} | |
} | |
override fun stopChildren (pids : arrayOf<PID>) { | |
for(pid in pids) { | |
pid.sendSystemMessage(Stop.Instance) | |
} | |
} | |
override fun resumeChildren (pids : arrayOf<PID>) { | |
for(pid in pids) { | |
pid.sendSystemMessage(ResumeMailbox.Instance) | |
} | |
} | |
override fun invokeSystemMessageAsync (msg : Any) : Task { | |
try { | |
val tmp = msg | |
when (tmp) { | |
is Started -> { | |
val s = tmp | |
return invokeUserMessageAsync(s) | |
} | |
is Stop -> { | |
return handleStopAsync() | |
} | |
is Terminated -> { | |
val t = tmp | |
return handleTerminatedAsync(t) | |
} | |
is Watch -> { | |
val w = tmp | |
handleWatch(w) | |
return Task.fromResult(0) | |
} | |
is Unwatch -> { | |
val uw = tmp | |
handleUnwatch(uw) | |
return Task.fromResult(0) | |
} | |
is Failure -> { | |
val f = tmp | |
handleFailure(f) | |
return Task.fromResult(0) | |
} | |
is Restart -> { | |
return handleRestartAsync() | |
} | |
is SuspendMailbox -> { | |
return Task.fromResult(0) | |
} | |
is ResumeMailbox -> { | |
return Task.fromResult(0) | |
} | |
is Continuation -> { | |
val cont = tmp | |
_message = cont.message | |
return cont.action() | |
} | |
} | |
} | |
catch (x : Exception) { | |
logger.logError("Error handling SystemMessage {0}", x) | |
throw | |
} | |
} | |
override fun invokeUserMessageAsync (msg : Any) : Task { | |
var influenceTimeout : Boolean = true | |
if (receiveTimeout > TimeSpan.Zero) { | |
var notInfluenceTimeout : Boolean = msg is INotInfluenceReceiveTimeout | |
influenceTimeout = notInfluenceTimeout | |
if (influenceTimeout) { | |
stopReceiveTimeout() | |
} | |
} | |
var res : Task = processMessageAsync(msg) | |
if (receiveTimeout != TimeSpan.Zero && influenceTimeout) { | |
if (res.isCompleted) { | |
return res.continueWith{_ -> resetReceiveTimeout()} | |
} | |
resetReceiveTimeout() | |
} | |
return res | |
} | |
override fun escalateFailure (reason : Exception, message : Any) { | |
escalateFailure(reason, self) | |
} | |
private fun processMessageAsync (msg : Any) : Task { | |
_message = msg | |
return if (_receiveMiddleware != null) _receiveMiddleware(this) else defaultReceive(this) | |
} | |
private fun requestAsync (target : PID, message : Any, future : FutureProcess<T>) : Task<T> { | |
var messageEnvelope : MessageEnvelope = MessageEnvelope(message, future.pid, null) | |
sendUserMessage(target, messageEnvelope) | |
return future.task | |
} | |
private fun sendUserMessage (target : PID, message : Any) { | |
if (_senderMiddleware != null) { | |
if (messageMessageEnvelope) { | |
_senderMiddleware(this, target, messageEnvelope) | |
} else { | |
_senderMiddleware(this, target, MessageEnvelope(message, null, null)) | |
} | |
} else { | |
target.tell(message) | |
} | |
} | |
private fun incarnateActor () { | |
_state = ContextState.Alive | |
actor = _producer() | |
} | |
private fun handleRestartAsync () : Task { | |
_state = ContextState.Restarting | |
invokeUserMessageAsync(Restarting.Instance) | |
if (_children != null) { | |
for(child in _children) { | |
child.stop() | |
} | |
} | |
tryRestartOrTerminateAsync() | |
} | |
private fun handleUnwatch (uw : Unwatch) { | |
_watchersremove(uw.watcher) | |
} | |
private fun handleWatch (w : Watch) { | |
if (_state == ContextState.Stopping) { | |
w.watcher.sendSystemMessage(Terminated) | |
} else { | |
if (_watchers == null) { | |
_watchers = FastSet<PID>() | |
} | |
_watchers.add(w.watcher) | |
} | |
} | |
private fun handleFailure (msg : Failure) { | |
if (actorISupervisorStrategy) { | |
supervisor.handleFailure(this, msg.who, msg.restartStatistics, msg.reason) | |
return | |
} | |
_supervisorStrategy.handleFailure(this, msg.who, msg.restartStatistics, msg.reason) | |
} | |
private fun handleTerminatedAsync (msg : Terminated) : Task { | |
_childrenremove(msg.who) | |
invokeUserMessageAsync(msg) | |
tryRestartOrTerminateAsync() | |
} | |
private fun handleRootFailure (failure : Failure) { | |
Supervision.defaultStrategy.handleFailure(this, failure.who, failure.restartStatistics, failure.reason) | |
} | |
private fun handleStopAsync () : Task { | |
_state = ContextState.Stopping | |
invokeUserMessageAsync(Stopping.Instance) | |
if (_children != null) { | |
for(child in _children) { | |
child.stop() | |
} | |
} | |
tryRestartOrTerminateAsync() | |
} | |
private fun tryRestartOrTerminateAsync () : Task { | |
cancelReceiveTimeout() | |
if (_childrencount > 0) { | |
return | |
} | |
val tmp = _state | |
when (tmp) { | |
} | |
} | |
private fun stopAsync () : Task { | |
ProcessRegistry.instance.remove(self) | |
invokeUserMessageAsync(Stopped.Instance) | |
disposeActorIfDisposable() | |
if (_watchers != null) { | |
var terminated : Terminated = Terminated | |
for(watcher in _watchers) { | |
watcher.sendSystemMessage(terminated) | |
} | |
} | |
if (parent != null) { | |
var terminated : Terminated = Terminated | |
parent.sendSystemMessage(terminated) | |
} | |
} | |
private fun restartAsync () : Task { | |
disposeActorIfDisposable() | |
incarnateActor() | |
self.sendSystemMessage(ResumeMailbox.Instance) | |
invokeUserMessageAsync(Started.Instance) | |
if (_stash != null) { | |
while (()) { | |
var msg : Any = _stash.pop() | |
invokeUserMessageAsync(msg) | |
} | |
} | |
} | |
private fun disposeActorIfDisposable () { | |
if (actorIDisposable) { | |
disposableActor.dispose() | |
} | |
} | |
private fun resetReceiveTimeout () { | |
_receiveTimeoutTimerchange(receiveTimeout, receiveTimeout) | |
} | |
private fun stopReceiveTimeout () { | |
_receiveTimeoutTimerchange(1, 1) | |
} | |
private fun receiveTimeoutCallback (state : Any) { | |
self.request(Proto.receiveTimeout.Instance, null) | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment