Last active
December 21, 2015 04:39
-
-
Save helena/6250995 to your computer and use it in GitHub Desktop.
Rough, initial cut of a trait to mixin when an Actor requires initalization, where the initialization is long and arduous (for example, data initialization related). This strategy allows the implementing actor to delegate the work to another Actor, on a separate, dedicated Dispatcher, and not block any other related Actors in load-time.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
###################################### | |
# Cloud Extension - Roles Config # | |
###################################### | |
cloud { | |
roles { | |
some-role { | |
# config for the role deployed on n-vms in the cluster | |
... | |
# dedicated dispatcher for provisioning required by someactor | |
someactor-provisioning-dispatcher { | |
type = Dispatcher | |
executor = "fork-join-executor" | |
# etc.. | |
} | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scala.collection.immutable | |
import scala.collection.immutable.Queue | |
import akka.actor._ | |
/** | |
* Rough, initial cut of a trait to mixin when an Actor requires | |
* initalization, where the initialization is long and arduous (for | |
* example, data initialization related). This strategy allows the | |
* implementing actor to delegate the work to another Actor, on a | |
* separate, dedicated Dispatcher, and not block any other related | |
* Actors in load-time. | |
* | |
* @author Helena Edelson | |
*/ | |
trait ProvisioningActor extends Actor with ActorLogging { | |
/** | |
* The configured name of the dedicated dispatcher to use. | |
*/ | |
def provisioningDispatcher: String | |
/** | |
* The function to execute on `provisioningDispatcher` | |
* in the AsyncProvisioner actor. | |
*/ | |
protected def action(): Unit // TODO the right way | |
lazy val provisioner = context.actorOf(Props(new AsyncProvisioner(action())).withDispatcher(provisioningDispatcher)) | |
protected var queue: immutable.Queue[QueuedMessage] = Queue.empty | |
override def preStart(): Unit = becomeProvisioned() | |
def uninitialized: Actor.Receive = { | |
case e => log.debug("Received {} while uninitialized", e) | |
} | |
def receive = uninitialized | |
def becomeProvisioned(): Unit = { | |
context become provisioning | |
provisioner ! UserProvisioningAction.Initialized | |
} | |
/** | |
* Override case e: Any to restrict messages queued by type. | |
* Default enqueues all messages while provisioning. | |
*/ | |
def provisioning: Receive = { | |
case UserProvisioningAction.InitializedAck(a) => becomeInitialized() | |
case e: Any => enqueue(QueuedMessage(sender, e)) | |
} | |
def becomeInitialized(): Unit = { | |
context become initialized | |
dequeue() | |
} | |
/** | |
* Implement desired behavior. | |
*/ | |
def initialized: Actor.Receive | |
/** | |
* Queues applicable messages until store is provisioned. | |
*/ | |
def enqueue(m: QueuedMessage): Unit = queue = queue.enqueue(m) | |
def dequeue(): Unit = { | |
queue foreach (m => self.tell(m.message, m.sender)) | |
queue = Queue.empty // no need to dequeue, send all then set to empty | |
} | |
} | |
// TODO correct declaration of 'action | |
class AsyncProvisioner(action: Unit) extends Actor with ActorLogging { | |
def receive: Receive = { | |
case UserProvisioningAction.Initialized => provision() | |
} | |
/** | |
* Wraps a blocking function in a non-blocking context, on another dispatcher. | |
*/ | |
def provision(): Unit = { | |
action | |
context.parent ! UserProvisioningAction.InitializedAck(self) | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class ProvisioningActorSpec extends AbstractSpec { | |
"ProvisioningActor" must { | |
"provision the implementing actor" in { | |
val actor = system.actorOf(Props[MyProvisioningActor], "test-actor") | |
actor ! Ping // provisioning, enqueue 1 | |
expectMsgPF(1 second, "count") { case count: Int => count must be (1) } | |
Thread.sleep(5.seconds.toMillis) | |
// initialized | |
(0 to 10) foreach { n => | |
actor ! Pong | |
expectMsgPF(1 second, "Pong") { case Pong => println("received pong")} | |
expectMsgPF(1 second, "count") { case count: Int => count must be (0) } // queue empty | |
} | |
} | |
} | |
} | |
class MyProvisioningActor extends ProvisioningActor with ActorLogging { | |
/** | |
* Your configured dispatcher. | |
*/ | |
val provisioningDispatcher = "cloud.roles.some-role.someactor-provisioning-dispatcher" | |
def action(): Unit = { | |
// simulate some work that would really take several (painful) minutes | |
Thread.sleep(5.seconds.toMillis) | |
log.info("function completed") | |
} | |
/** | |
* While provisioning, override to enqueue any message type(s) of interest, | |
* default enqueues all types. | |
*/ | |
override def provisioning: Receive = { | |
case UserProvisioningAction.InitializedAck(a) => becomeInitialized() | |
case Ping => enqueue(QueuedMessage(sender, Pong)) | |
} | |
/** | |
* Override to add behavior | |
*/ | |
override def enqueue(m: QueuedMessage): Unit = { | |
super.enqueue(m) | |
m.sender ! queue.size | |
} | |
/** | |
* Implement the standard behavior of your Actor. | |
*/ | |
def initialized: Actor.Receive = { | |
case Pong => sender ! Pong; sender ! queue.size | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment