This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def connectToQueueBehavior(connector: QueueConnector[Future], | |
msgSink: ActorRef[Received]): Behavior[Nothing] = { | |
Behaviors.setup[Try[Queue[Future]]] { ctx => | |
import ctx.executionContext | |
ctx.log.info("[queue-start] connecting") | |
connector.connect.andThen { case result => ctx.self ! result } | |
Behaviors.receiveMessage { | |
case Success(queue) => |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def connectToQueueBehavior(connector: QueueConnector[Future], | |
msgSink: ActorRef[Received]): Behavior[Nothing] = { | |
Behaviors.setup[Try[Queue[Future]]] { ctx => | |
import ctx.executionContext | |
ctx.log.info("[queue-start] connecting") | |
connector.connect.andThen { case result => ctx.self ! result } | |
Behaviors.receiveMessage { | |
case Success(queue) => |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def broadcastBehavior( | |
connector: QueueConnector[Future]): Behavior[BroadcastActorMessage] = | |
Behaviors.setup { ctx => | |
val connectBehavior = Behaviors | |
.supervise[Nothing](connectToQueueBehavior(connector, ctx.self)) | |
.onFailure[RuntimeException](SupervisorStrategy.restart) | |
ctx.spawn[Nothing](connectBehavior, "connect-queue") | |
def handleMessage( | |
consumers: Set[ActorRef[String]]): Behavior[BroadcastActorMessage] = // ... |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Behavior of the broadcast actor for a given set of subscribers.
  *
  * `Subscribe` adds the given actor to the subscriber set; `Received` sends the
  * message text to every actor currently in the set.
  *
  * @param consumers actors that are sent the payload of each `Received` message
  * @return the behavior that handles the next `BroadcastActorMessage`
  */
def handleMessage(consumers: Set[ActorRef[String]]): Behavior[BroadcastActorMessage] =
  Behaviors.receiveMessage {
    case Subscribe(actor) =>
      // State is carried in the parameter: a new subscriber yields a new behavior.
      handleMessage(consumers + actor)
    case Received(msg) =>
      consumers.foreach(_ ! msg)
      // The subscriber set did not change, so keep the current behavior.
      Behaviors.same
  }
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Messages accepted by the broadcast actor. */
sealed trait BroadcastActorMessage

/** Asks the broadcast actor to add `actor` to its subscribers.
  *
  * @param actor recipient of the `String` payloads that are broadcast
  */
final case class Subscribe(actor: ActorRef[String]) extends BroadcastActorMessage

/** Carries a message text to be passed on to the subscribers.
  *
  * @param msg the text sent to each subscriber
  */
final case class Received(msg: String) extends BroadcastActorMessage
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class ConsumeQueueActor(connector: QueueConnector[Future]) | |
extends Actor with ActorLogging { | |
import context.dispatcher | |
private var currentQueue: Option[Queue[Future]] = None | |
override def preStart(): Unit = // ... | |
override def postStop(): Unit = { |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class BroadcastActor(connector: QueueConnector[Future]) | |
extends Actor with ActorLogging { | |
// ... | |
// optional - the default one is identical | |
override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { | |
case _: ActorInitializationException => Stop | |
case _: ActorKilledException => Stop | |
case _: DeathPactException => Stop |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class ConsumeQueueActor(connector: QueueConnector[Future]) | |
extends Actor with ActorLogging { | |
import context.dispatcher | |
private var currentQueue: Option[Queue[Future]] = None | |
// ... | |
override def receive: Receive = { |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class ConsumeQueueActor(connector: QueueConnector[Future]) | |
extends Actor with ActorLogging { | |
import context.dispatcher | |
private var currentQueue: Option[Queue[Future]] = None | |
override def preStart(): Unit = { | |
log.info("[queue-start] connecting") | |
connector.connect.pipeTo(self) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Actor that creates a [[ConsumeQueueActor]] child when it starts.
  *
  * @param connector handed unchanged to the child actor's constructor
  */
class BroadcastActor(connector: QueueConnector[Future])
    extends Actor with ActorLogging {

  /** Creates the child actor; the returned reference is discarded here. */
  override def preStart(): Unit = {
    context.actorOf(Props(new ConsumeQueueActor(connector)))
  }

  // ...
}