Created
October 31, 2015 08:27
-
-
Save josdirksen/77e59d236c637d46ab32 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.typed.patterns.Receiver._ | |
import akka.typed._ | |
import akka.typed.patterns.Receiver | |
import akka.typed.ScalaDSL._ | |
import scala.concurrent.duration._ | |
object AkkaTypedReceiver extends App { | |
sealed trait HelloMsg | |
final case class HelloCountry(country: String) extends HelloMsg | |
final case class HelloCity(city: String) extends HelloMsg | |
final case class HelloWorld() extends HelloMsg | |
final case class Hello(msg: String) extends HelloMsg | |
final case class registerReceiverCmdIn(cmdIn: ActorRef[Command[HelloMsg]]) extends HelloMsg | |
final case class GetAllMessages() extends HelloMsg | |
final case class PrintMessages(msgs: Seq[HelloMsg]) extends HelloMsg | |
object Consumer { | |
val consumer = Total[HelloMsg] { | |
// in the case of a registerReceiver message, we change the implementation | |
// since we're ready to receive other message. | |
case registerReceiverCmdIn(commandAddress) => { | |
println("Consumer: Switching behavior") | |
// return a static implementation which closes over actorRefs | |
// all messages we receive we pass to the receiver, which will queue | |
// them. We have a specific message that prints out the received messages | |
ContextAware { ctx => | |
Static[HelloMsg] { | |
// printmessages just prints out the list of messages we've received | |
case PrintMessages(msgs) => println(s"Consumer: Printing messages: $msgs") ;msgs.foreach { hw => println(s" $hw")} | |
// if we get the getAllMessages request, we get all the messages from | |
// the receiver. | |
case GetAllMessages() => { | |
println("Consumer: requesting all messages") | |
val wrap = ctx.spawnAdapter[GetAllResult[HelloMsg]] { | |
case msgs:GetAllResult[HelloMsg] => println(s"Consumer: Received ${msgs.msgs.length} messages"); PrintMessages(msgs.msgs) | |
} | |
commandAddress ! GetAll(2 seconds)(wrap) | |
} | |
} | |
} | |
} | |
// for all the other cases return the existing implementation, in essence | |
// we're just ignoring other messages till we change state | |
case _ => Same | |
} | |
} | |
/** | |
* Producer object containing the protocol and the behavior. This is a very simple | |
* actor that produces messages using a schedule. To start producing messages | |
* we need to send an initial message | |
*/ | |
object Producer { | |
// a simple protocol defining the messages that can be sent | |
sealed trait ProducerMsg | |
final case class registerReceiverMsgIn(msgIn: ActorRef[HelloMsg]) extends ProducerMsg | |
final case class addHelloWorldMsg(msg: HelloMsg) extends ProducerMsg | |
// the producer, which first waits for a registerReceiver message, after which | |
// it changes behavior, to send messages. | |
val producer = Full[ProducerMsg] { | |
// if we receive a register message, we know where to send messages to | |
case Msg(ctx, registerReceiverMsgIn(msgConsumer)) => | |
println("Producer: Switching behavior") | |
// simple helper function which sends a message to self. | |
def scheduleMessage() = ctx.schedule(500 millisecond, ctx.self, addHelloWorldMsg(Hello(s"hello @ ${System.currentTimeMillis()}"))) | |
// schedule the first one, the rest will be triggered through the behavior. | |
scheduleMessage() | |
Static { | |
// add a message to the receiver and schedule a new one | |
case addHelloWorldMsg(msg) => {println(s"Producer: Adding new '$msg' to receiver: $msgConsumer") ;msgConsumer ! msg; scheduleMessage()} | |
} | |
// don't switch behavior on any of the other messages | |
case _ => Same | |
} | |
} | |
// Simple root actor, which we'll use to start the other actors | |
val scenario1 = { | |
Full[Unit] { | |
case Sig(ctx, PreStart) => { | |
import Producer._ | |
import Consumer._ | |
println("Scenario1: Started, now lets start up a number of child actors to do our stuff") | |
// first start the two actors, one implements the receiver pattern, and | |
// the other is the one we control directly. | |
val receiverActor = ctx.spawn(Props(Receiver.behavior[HelloMsg]), "receiver") | |
val consumerActor = ctx.spawn(Props(consumer), "adder") | |
val producerActor = ctx.spawn(Props(producer), "producer") | |
// our producerActor first needs the actorRef it can use to add messages to the receiver | |
// for this we use a wrapper, this wrapper creates a child, which we use to get the | |
// address, to which we can send messages. | |
val wrapper = ctx.spawnAdapter[ActorRef[HelloMsg]] { | |
case p: ActorRef[HelloMsg] => producerActor ! registerReceiverMsgIn(p) | |
} | |
// now send the message to get the external address, the response will be sent | |
// to our own actor as a registerReceiver message, through the adapter | |
receiverActor ! ExternalAddress(wrapper) | |
// our printing actor needs to now the address of the receiver so send it to him | |
consumerActor ! registerReceiverCmdIn(receiverActor) | |
// by calling getAllMessages we get the messages within a time period. | |
println("Scenario1: Get all the messages") | |
consumerActor ! GetAllMessages() | |
Thread.sleep(3000) | |
consumerActor ! GetAllMessages() | |
Thread.sleep(5000) | |
consumerActor ! GetAllMessages() | |
Same | |
} | |
} | |
} | |
val scenario1Actor = ActorSystem("Root", Props(scenario1)) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment