Created
January 26, 2011 17:16
-
-
Save viktorklang/797035 to your computer and use it in GitHub Desktop.
Unnesting a Scala Actors nested receive example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* I had a question directed at me, on how to encode the following scenario in Akka Actors, | |
* as for Scala Actors one would simply nest the receives. | |
* | |
* Recuirements are as follows: | |
* The first thing the actor needs to do, is to subscribe to a channel of events, | |
* Then it must replay (process) all "old" events | |
* Then it has to wait for a GoAhead signal to begin processing the new events | |
* It mustn't "miss" events that happen between catching up with the old events and getting the GoAhead signal | |
*/ | |
import akka.actor._ | |
import akka.actor.Actor._ | |
import scala.collection.mutable.ListBuffer | |
class MyActor extends Actor { | |
//If you need to store sender/senderFuture you can change it to ListBuffer[(Any, Channel)] | |
val queue = new ListBuffer[Any]() | |
//This message processes a message/event | |
def process(msg: Any): Unit = println("processing: " + msg) | |
//This method subscribes the actor to the event bus | |
def subscribe() {} //Your external stuff | |
//This method retrieves all prior messages/events | |
def allOldMessages() = List() | |
override def preStart { | |
//We override preStart to be sure that the first message the actor gets is | |
//'Replay, that message will start to be processed _after_ the actor is started | |
self ! 'Replay | |
//Then we subscribe to the stream of messages/events | |
subscribe() | |
} | |
def receive = { | |
case 'Replay => //Our first message should be a 'Replay message, all others are invalid | |
allOldMessages() foreach process //Process all old messages/events | |
become { //Switch behavior to look for the GoAhead signal | |
case 'GoAhead => //When we get the GoAhead signal we process all our buffered messages/events | |
queue foreach process | |
queue.clear | |
become { //Then we change behaviour to process incoming messages/events as they arrive | |
case msg => process(msg) | |
} | |
case msg => //While we haven't gotten the GoAhead signal, buffer all incoming messages | |
queue += msg //Here you have full control, you can handle overflow etc | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment