Skip to content

Instantly share code, notes, and snippets.

@adamw
Created June 10, 2018 18:51
Show Gist options
  • Save adamw/1718511c558f2d0397db4f39624c18e4 to your computer and use it in GitHub Desktop.
Save adamw/1718511c558f2d0397db4f39624c18e4 to your computer and use it in GitHub Desktop.
/** Behavior that reads one message from `queue` per incarnation and forwards it to `msgSink`.
  *
  * On setup a single `queue.read()` is started and its completed `Try` is sent back to this
  * actor as an ordinary message:
  *
  *  - `Success(msg)`: `Received(msg)` is sent to `msgSink`, and the next behavior is a fresh
  *    `consumeQueueBehavior`, whose setup starts the next read.
  *  - `Failure(e)`: the failure is logged and `e` is rethrown from the message handler
  *    (presumably to be dealt with by the actor's supervision -- not visible from here).
  *
  * On `PostStop` the queue is closed, blocking the calling thread for up to one minute.
  *
  * @param queue   source of messages; also closed when the actor stops
  * @param msgSink actor that receives each successfully read message wrapped in `Received`
  * @return a behavior whose protocol is the `Try[String]` outcome of a queue read
  */
def consumeQueueBehavior(queue: Queue[Future],
                         msgSink: ActorRef[Received]): Behavior[Try[String]] =
  Behaviors.setup { context =>
    import context.executionContext

    // Resolve the self reference up front so the future callback below only
    // closes over the ActorRef rather than the whole actor context.
    val self = context.self

    // Handles the outcome of the read that was started during setup.
    def onReadCompleted(outcome: Try[String]): Behavior[Try[String]] =
      outcome match {
        case Failure(cause) =>
          context.log.info(s"[queue] failure: ${cause.getMessage}")
          throw cause

        case Success(payload) =>
          msgSink ! Received(payload)
          // Re-entering setup is what triggers the next read.
          consumeQueueBehavior(queue, msgSink)
      }

    context.log.info("[queue] receiving message")
    queue.read().andThen { case outcome => self ! outcome }

    Behaviors
      .receiveMessage[Try[String]](onReadCompleted)
      .receiveSignal {
        case (_, PostStop) =>
          context.log.info("[queue-stop] closing")
          // Blocks until the close completes; Await.result throws if it fails or times out,
          // in which case the "closed" line below is never logged.
          Await.result(queue.close(), 1.minute)
          context.log.info("[queue-stop] closed")
          Behaviors.same
      }
  }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment