Skip to content

Instantly share code, notes, and snippets.

@adamw
Created June 10, 2018 18:45
Show Gist options
  • Save adamw/b90b518d3db0ca596b603c8f1fc8bc8b to your computer and use it in GitHub Desktop.
Save adamw/b90b518d3db0ca596b603c8f1fc8bc8b to your computer and use it in GitHub Desktop.
def connectToQueueBehavior(connector: QueueConnector[Future],
msgSink: ActorRef[Received]): Behavior[Nothing] = {
Behaviors.setup[Try[Queue[Future]]] { ctx =>
import ctx.executionContext
ctx.log.info("[queue-start] connecting")
connector.connect.andThen { case result => ctx.self ! result }
Behaviors.receiveMessage {
case Success(queue) =>
ctx.log.info("[queue-start] connected")
val consumeActor = ctx.spawn(consumeQueueBehavior(queue, msgSink),
"consume-queue")
ctx.watch(consumeActor)
// we can either not handle Terminated, which will cause
// DeathPactException to be thrown and propagated or rethrow the
// original exception
Behaviors.receiveSignal {
case (_, t @ Terminated(_)) =>
t.failure.foreach(throw _)
Behaviors.empty
}
case Failure(e) =>
ctx.log.info("[queue-start] failure")
throw e
}
}
}.narrow[Nothing]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment