Skip to content

Instantly share code, notes, and snippets.

@rogeralsing
Created July 9, 2017 19:56
Show Gist options
  • Save rogeralsing/81b1ed80631b6426382fb5595c0c0e42 to your computer and use it in GitHub Desktop.
Save rogeralsing/81b1ed80631b6426382fb5595c0c0e42 to your computer and use it in GitHub Desktop.
package proto
open class LocalContext : IMessageInvoker, IContext, ISupervisor {
companion object {
private val logger : ILogger = Log.createLogger<LocalContext>()
internal fun defaultReceive (context : IContext) : Task {
var c : LocalContext = LocalContextcontext
if (c.message is PoisonPill) {
c.self.stop()
return Proto.actor.Done
}
return c.actor.receiveAsync(context)
}
internal fun defaultSender (context : ISenderContext, target : PID, envelope : MessageEnvelope) : Task {
target.ref.sendUserMessage(target, envelope)
return Task.fromResult(0)
}
}
val EmptyChildren : IReadOnlyCollection<PID> = List<PID>()
private val _producer : () -> IActor
private val _receiveMiddleware : (IContext) -> Task
private val _senderMiddleware : (ISenderContext, PID, MessageEnvelope) -> Task
private val _supervisorStrategy : ISupervisorStrategy
private var _children : FastSet<PID>? = null
private var _message : Any? = null
private var _receiveTimeoutTimer : Timer? = null
private var _restartStatistics : RestartStatistics? = null
private var _stash : Stack<Any>? = null
private var _state : ContextState? = null
private var _watchers : FastSet<PID>? = null
constructor(producer : () -> IActor, supervisorStrategy : ISupervisorStrategy, receiveMiddleware : (IContext) -> Task, senderMiddleware : (ISenderContext, PID, MessageEnvelope) -> Task, parent : PID) {
_producer = producer
_supervisorStrategy = supervisorStrategy
_receiveMiddleware = receiveMiddleware
_senderMiddleware = senderMiddleware
parent = parent
incarnateActor()
}
override val children : IReadOnlyCollection<PID>
get() = _childrentoList() ?? EmptyChildren
override var actor : IActor
override val parent : PID
override var self : PID
override val message : Any
override val sender : PID
get() = _message as MessageEnvelopesender
override val headers : MessageHeader
override var receiveTimeout : Duration
override fun stash () {
if (_stash == null) {
_stash = Stack<Any>()
}
_stash.push(message)
}
override fun respond (message : Any) {
sender.tell(message)
}
override fun spawn (props : Props) : PID {
var id : String = ProcessRegistry.instance.nextId()
return spawnNamed(props, id)
}
override fun spawnPrefix (props : Props, prefix : String) : PID {
var name : String = prefix + ProcessRegistry.instance.nextId()
return spawnNamed(props, name)
}
override fun spawnNamed (props : Props, name : String) : PID {
var pid : PID = props.spawn("${self.id}/${name}", self)
if (_children == null) {
_children = FastSet<PID>()
}
_children.add(pid)
return pid
}
override fun watch (pid : PID) {
pid.sendSystemMessage(Watch(self))
}
override fun unwatch (pid : PID) {
pid.sendSystemMessage(Unwatch(self))
}
override fun setReceiveTimeout (duration : Duration) {
if (duration <= TimeSpan.Zero) {
throw ArgumentOutOfRangeException(nameof(duration), duration, "Duration must be greater than zero")
}
if (duration == receiveTimeout) {
return
}
stopReceiveTimeout()
receiveTimeout = duration
if (_receiveTimeoutTimer == null) {
_receiveTimeoutTimer = Timer(receiveTimeoutCallback, null, receiveTimeout, receiveTimeout)
} else {
resetReceiveTimeout()
}
}
override fun cancelReceiveTimeout () {
if (_receiveTimeoutTimer == null) {
return
}
stopReceiveTimeout()
_receiveTimeoutTimer = null
receiveTimeout = TimeSpan.Zero
}
override fun receiveAsync (message : Any) : Task {
return processMessageAsync(message)
}
override fun tell (target : PID, message : Any) {
sendUserMessage(target, message)
}
override fun request (target : PID, message : Any) {
var messageEnvelope : MessageEnvelope = MessageEnvelope(message, self, null)
sendUserMessage(target, messageEnvelope)
}
override fun requestAsync (target : PID, message : Any, timeout : Duration) : Task<T> = requestAsync(target, message, FutureProcess<T>(timeout))
override fun requestAsync (target : PID, message : Any, cancellationToken : CancellationToken) : Task<T> = requestAsync(target, message, FutureProcess<T>(cancellationToken))
override fun requestAsync (target : PID, message : Any) : Task<T> = requestAsync(target, message, FutureProcess<T>())
override fun reenterAfter (target : Task<T>, action : (Task<T>) -> Task) {
var msg : Any = _message
var cont : Continuation = Continuation({ -> action(target)}, msg)
target.continueWith{t ->
self.sendSystemMessage(cont)
}
}
override fun escalateFailure (reason : Exception, who : PID) {
if (_restartStatistics == null) {
_restartStatistics = RestartStatistics(0, null)
}
var failure : Failure = Failure(who, reason, _restartStatistics)
if (parent == null) {
handleRootFailure(failure)
} else {
self.sendSystemMessage(SuspendMailbox.Instance)
parent.sendSystemMessage(failure)
}
}
override fun restartChildren (reason : Exception, pids : arrayOf<PID>) {
for(pid in pids) {
pid.sendSystemMessage(Restart(reason))
}
}
override fun stopChildren (pids : arrayOf<PID>) {
for(pid in pids) {
pid.sendSystemMessage(Stop.Instance)
}
}
override fun resumeChildren (pids : arrayOf<PID>) {
for(pid in pids) {
pid.sendSystemMessage(ResumeMailbox.Instance)
}
}
override fun invokeSystemMessageAsync (msg : Any) : Task {
try {
val tmp = msg
when (tmp) {
is Started -> {
val s = tmp
return invokeUserMessageAsync(s)
}
is Stop -> {
return handleStopAsync()
}
is Terminated -> {
val t = tmp
return handleTerminatedAsync(t)
}
is Watch -> {
val w = tmp
handleWatch(w)
return Task.fromResult(0)
}
is Unwatch -> {
val uw = tmp
handleUnwatch(uw)
return Task.fromResult(0)
}
is Failure -> {
val f = tmp
handleFailure(f)
return Task.fromResult(0)
}
is Restart -> {
return handleRestartAsync()
}
is SuspendMailbox -> {
return Task.fromResult(0)
}
is ResumeMailbox -> {
return Task.fromResult(0)
}
is Continuation -> {
val cont = tmp
_message = cont.message
return cont.action()
}
}
}
catch (x : Exception) {
logger.logError("Error handling SystemMessage {0}", x)
throw
}
}
override fun invokeUserMessageAsync (msg : Any) : Task {
var influenceTimeout : Boolean = true
if (receiveTimeout > TimeSpan.Zero) {
var notInfluenceTimeout : Boolean = msg is INotInfluenceReceiveTimeout
influenceTimeout = notInfluenceTimeout
if (influenceTimeout) {
stopReceiveTimeout()
}
}
var res : Task = processMessageAsync(msg)
if (receiveTimeout != TimeSpan.Zero && influenceTimeout) {
if (res.isCompleted) {
return res.continueWith{_ -> resetReceiveTimeout()}
}
resetReceiveTimeout()
}
return res
}
override fun escalateFailure (reason : Exception, message : Any) {
escalateFailure(reason, self)
}
private fun processMessageAsync (msg : Any) : Task {
_message = msg
return if (_receiveMiddleware != null) _receiveMiddleware(this) else defaultReceive(this)
}
private fun requestAsync (target : PID, message : Any, future : FutureProcess<T>) : Task<T> {
var messageEnvelope : MessageEnvelope = MessageEnvelope(message, future.pid, null)
sendUserMessage(target, messageEnvelope)
return future.task
}
private fun sendUserMessage (target : PID, message : Any) {
if (_senderMiddleware != null) {
if (messageMessageEnvelope) {
_senderMiddleware(this, target, messageEnvelope)
} else {
_senderMiddleware(this, target, MessageEnvelope(message, null, null))
}
} else {
target.tell(message)
}
}
private fun incarnateActor () {
_state = ContextState.Alive
actor = _producer()
}
private fun handleRestartAsync () : Task {
_state = ContextState.Restarting
invokeUserMessageAsync(Restarting.Instance)
if (_children != null) {
for(child in _children) {
child.stop()
}
}
tryRestartOrTerminateAsync()
}
private fun handleUnwatch (uw : Unwatch) {
_watchersremove(uw.watcher)
}
private fun handleWatch (w : Watch) {
if (_state == ContextState.Stopping) {
w.watcher.sendSystemMessage(Terminated)
} else {
if (_watchers == null) {
_watchers = FastSet<PID>()
}
_watchers.add(w.watcher)
}
}
private fun handleFailure (msg : Failure) {
if (actorISupervisorStrategy) {
supervisor.handleFailure(this, msg.who, msg.restartStatistics, msg.reason)
return
}
_supervisorStrategy.handleFailure(this, msg.who, msg.restartStatistics, msg.reason)
}
private fun handleTerminatedAsync (msg : Terminated) : Task {
_childrenremove(msg.who)
invokeUserMessageAsync(msg)
tryRestartOrTerminateAsync()
}
private fun handleRootFailure (failure : Failure) {
Supervision.defaultStrategy.handleFailure(this, failure.who, failure.restartStatistics, failure.reason)
}
private fun handleStopAsync () : Task {
_state = ContextState.Stopping
invokeUserMessageAsync(Stopping.Instance)
if (_children != null) {
for(child in _children) {
child.stop()
}
}
tryRestartOrTerminateAsync()
}
private fun tryRestartOrTerminateAsync () : Task {
cancelReceiveTimeout()
if (_childrencount > 0) {
return
}
val tmp = _state
when (tmp) {
}
}
private fun stopAsync () : Task {
ProcessRegistry.instance.remove(self)
invokeUserMessageAsync(Stopped.Instance)
disposeActorIfDisposable()
if (_watchers != null) {
var terminated : Terminated = Terminated
for(watcher in _watchers) {
watcher.sendSystemMessage(terminated)
}
}
if (parent != null) {
var terminated : Terminated = Terminated
parent.sendSystemMessage(terminated)
}
}
private fun restartAsync () : Task {
disposeActorIfDisposable()
incarnateActor()
self.sendSystemMessage(ResumeMailbox.Instance)
invokeUserMessageAsync(Started.Instance)
if (_stash != null) {
while (()) {
var msg : Any = _stash.pop()
invokeUserMessageAsync(msg)
}
}
}
private fun disposeActorIfDisposable () {
if (actorIDisposable) {
disposableActor.dispose()
}
}
private fun resetReceiveTimeout () {
_receiveTimeoutTimerchange(receiveTimeout, receiveTimeout)
}
private fun stopReceiveTimeout () {
_receiveTimeoutTimerchange(1, 1)
}
private fun receiveTimeoutCallback (state : Any) {
self.request(Proto.receiveTimeout.Instance, null)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment