Skip to content

Instantly share code, notes, and snippets.

View adamw's full-sized avatar

Adam Warski adamw

View GitHub Profile
def connectToQueueBehavior(connector: QueueConnector[Future],
msgSink: ActorRef[Received]): Behavior[Nothing] = {
Behaviors.setup[Try[Queue[Future]]] { ctx =>
import ctx.executionContext
ctx.log.info("[queue-start] connecting")
connector.connect.andThen { case result => ctx.self ! result }
Behaviors.receiveMessage {
case Success(queue) =>
def connectToQueueBehavior(connector: QueueConnector[Future],
msgSink: ActorRef[Received]): Behavior[Nothing] = {
Behaviors.setup[Try[Queue[Future]]] { ctx =>
import ctx.executionContext
ctx.log.info("[queue-start] connecting")
connector.connect.andThen { case result => ctx.self ! result }
Behaviors.receiveMessage {
case Success(queue) =>
def broadcastBehavior(
connector: QueueConnector[Future]): Behavior[BroadcastActorMessage] =
Behaviors.setup { ctx =>
val connectBehavior = Behaviors
.supervise[Nothing](connectToQueueBehavior(connector, ctx.self))
.onFailure[RuntimeException](SupervisorStrategy.restart)
ctx.spawn[Nothing](connectBehavior, "connect-queue")
def handleMessage(
consumers: Set[ActorRef[String]]): Behavior[BroadcastActorMessage] = // ...
/** Broadcast behavior parameterised by the current set of subscribers.
  *
  * A `Subscribe` message produces a behavior whose set additionally contains the given
  * actor. A `Received` message sends its payload to every current subscriber and keeps
  * the set unchanged.
  *
  * @param consumers actors that are sent every received payload
  * @return the behavior that handles the next `BroadcastActorMessage`
  */
def handleMessage(consumers: Set[ActorRef[String]]): Behavior[BroadcastActorMessage] =
  Behaviors.receiveMessage[BroadcastActorMessage] { message =>
    // Work out the subscriber set for the next behavior, then recurse once.
    val nextConsumers = message match {
      case Subscribe(subscriber) =>
        consumers + subscriber
      case Received(payload) =>
        for (consumer <- consumers) consumer ! payload
        consumers
    }
    handleMessage(nextConsumers)
  }
/** Messages accepted by the broadcast behavior.
  *
  * The trait is sealed, so pattern matches over it are checked for exhaustiveness.
  */
sealed trait BroadcastActorMessage

/** Asks for `actor` to be added to the set of subscribers. */
final case class Subscribe(actor: ActorRef[String]) extends BroadcastActorMessage

/** Carries a payload `msg` that is sent on to every subscribed actor. */
final case class Received(msg: String) extends BroadcastActorMessage
class ConsumeQueueActor(connector: QueueConnector[Future])
extends Actor with ActorLogging {
import context.dispatcher
private var currentQueue: Option[Queue[Future]] = None
override def preStart(): Unit = // ...
override def postStop(): Unit = {
class BroadcastActor(connector: QueueConnector[Future])
extends Actor with ActorLogging {
// ...
// optional - the default one is identical
override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
case _: ActorInitializationException => Stop
case _: ActorKilledException => Stop
case _: DeathPactException => Stop
class ConsumeQueueActor(connector: QueueConnector[Future])
extends Actor with ActorLogging {
import context.dispatcher
private var currentQueue: Option[Queue[Future]] = None
// ...
override def receive: Receive = {
class ConsumeQueueActor(connector: QueueConnector[Future])
extends Actor with ActorLogging {
import context.dispatcher
private var currentQueue: Option[Queue[Future]] = None
override def preStart(): Unit = {
log.info("[queue-start] connecting")
connector.connect.pipeTo(self)
/** Classic actor that creates a [[ConsumeQueueActor]] child when it starts.
  *
  * @param connector handed to the child `ConsumeQueueActor` when it is constructed
  */
class BroadcastActor(connector: QueueConnector[Future])
    extends Actor with ActorLogging {

  override def preStart(): Unit = {
    // The `Props(new ...)` creator is a by-name closure. Referring to the constructor
    // parameter directly would make it capture this actor's `this` reference, so the
    // value is copied into a local first and only that local is captured.
    val queueConnector = connector
    context.actorOf(Props(new ConsumeQueueActor(queueConnector)))
  }
  // ...
}