Last active
December 21, 2015 00:29
-
-
Save helena/6220788 to your computer and use it in GitHub Desktop.
Simple (and truncated) example of the CloudExtension's load-time ordered provisioning and ordered graceful shutdown.
Unfortunately this had to be written in an older version of scala and akka - for now. MyNodeGuardian.scala is started in CloudExtension.register() and is a sample of using ProvisioningGuardian which extends OrderedGracefulShutdown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* CloudExtension and factory for creating CloudExtension instances. | |
* Example: | |
* {{{ | |
* val application = CloudExtension(system, config) | |
* }}} | |
* | |
* @author Helena Edelson | |
*/ | |
object CloudExtension extends ExtensionId[CloudExtension] with ExtensionIdProvider { | |
/** | |
* Obtains the instance associated with `system`, registers `config` | |
* settings at that instance and returns it. | |
* | |
* @param config configurations and settings to register. | |
*/ | |
def apply(system: ActorSystem, config: CloudConfig): CloudExtension = { | |
val extension = super.apply(system) | |
extension.register(config) | |
extension | |
} | |
override def lookup: ExtensionId[_ <: Extension] = CloudExtension | |
override def createExtension(system: ExtendedActorSystem): CloudExtension = new CloudExtension(system) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* INTERNAL API. | |
* | |
* Supervisor managing the different Cluster nodes. | |
* | |
* @author Helena Edelson | |
*/ | |
private[foo] final class MyNodeGuardian(config: CloudConfig, val selfAddress: Address, uuid: UUID) extends ProvisioningGuardian { | |
import config._ | |
import settings._ | |
var ordered: IndexedSeq[ActorRef] = IndexedSeq.empty | |
// any number of framework actors which support node instances | |
var actor1: Option[ActorRef] = None | |
var actor2: Option[ActorRef] = None | |
/** | |
* Blocks the thread in load-time for sequential provisioning. | |
* Initialized by cloud extension's register function - see CloudExtension line 20. | |
*/ | |
def provision(sender: ActorRef): Unit = { | |
implicit val timeout = BootProvisionTimeout | |
// each framework actor does a specific job that can be enabled/disabled via configuration | |
// so for each... | |
if (SpecificFunctionalityEnabled) { | |
actor1 = Some(context.actorOf(Props(new SomeFrameworkActor(config, selfAddress, uuid)), "some-actor")) | |
Await.result((actor.get ? Initialized).mapTo[InitializedAck], timeout.duration) | |
} | |
ordered ++= IndexedSeq(actor1, actor2, etc...).flatten | |
self ! InternalCoreAction.Provisioned | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.util.duration._ | |
import akka.actor._ | |
import akka.dispatch.Await | |
import akka.pattern.gracefulStop | |
import akka.actor. { SupervisorStrategy, OneForOneStrategy } | |
/** | |
* Handles graceful shutdown of all `ordered` actors. | |
* GracefulShutdownException, Supervision, settings are all custom. | |
*/ | |
trait OrderedGracefulShutdown extends Actor with Supervision with ActorLogging { | |
import settings._ | |
import context._ | |
def ordered: IndexedSeq[ActorRef] | |
override val supervisorStrategy = guardianSupervisorStrategy | |
override def postStop(): Unit = { | |
log.info("Ordered graceful shutdown of children starting") | |
ordered foreach (gracefulShutdown(_)) | |
log.info("Ordered graceful shutdown of children completed") | |
} | |
/** | |
* Executes [[akka.pattern.gracefulStop( )]] on `child`. | |
* Blocks the thread for orderly shutdown. | |
*/ | |
def gracefulShutdown(child: ActorRef): Unit = try { | |
log.debug("Graceful stop starting for {}", child.path) | |
Await.result(gracefulStop(child, settings.GracefulShutdownDuration), settings.GracefulShutdownDuration + 1.seconds) | |
log.debug("Graceful stop completed for {}", child.path) | |
} catch { case e => | |
log.error("Error shutting down {}, cause {}", child.path, cause.toString) | |
throw new GracefulShutdownException(cause) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Handles load-time provisioning | |
*/ | |
trait ProvisioningGuardian extends OrderedGracefulShutdown with Supervision { | |
import context._ | |
def selfAddress: Address | |
override def preStart(): Unit = self ! InternalCoreAction.Provision | |
/** | |
* Executes graceful shutdown by dependency hierarchy. | |
*/ | |
override def postStop(): Unit = { | |
log.info("Node stopping on [{}]", selfAddress) | |
self ! InternalCoreAction.Deprovision | |
super.postStop() | |
} | |
override def preRestart(cause: Throwable, message: Option[Any]): Unit = { | |
log.error("Node failure on {} due to {} : {}", selfAddress, cause, message) | |
super.preRestart(cause, message) | |
} | |
def provisioning: Receive = { | |
case InternalCoreAction.Provisioned => unbecome() | |
} | |
def receive: Receive = { | |
case InternalCoreAction.Provision => | |
log.info("Provisioning on {}", selfAddress) | |
become(provisioning); provision(sender) // work in progress to be just: become(provisioning) | |
case InternalCoreAction.Deprovision => | |
log.info("Deprovisioning on {}", selfAddress) | |
deprovision(sender) | |
} | |
/** | |
* Blocks the thread in load-time for sequential provisioning. | |
*/ | |
def provision(sender: ActorRef): Unit | |
/** | |
* Executes graceful shutdown by dependency hierarchy. | |
*/ | |
def deprovision(sender: ActorRef): Unit = { | |
ordered foreach (gracefulShutdown(_)) | |
log.info("Deprovisioned on {}", selfAddress) | |
sender ! InternalCoreAction.DeprovisionedAck(self) | |
self ! PoisonPill | |
context.system.shutdown() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The Await is during load-time and shutdown for ordered actor provisioning/deprovisioning. For example, where all role actors (non-framework) need to use services such as naming and dynamic locations, where actor1 above is naming, actor2 is locations. Naming and Locations actors have workers, all must be up and running before any non-framework role actors come online and start their work. As each supervisor completes provisioning they send back the ack. Then all role actors can start.
Await only occurs during node startup/shutdown.