Skip to content

Instantly share code, notes, and snippets.

@adamw
Created June 10, 2018 13:52
Show Gist options
  • Save adamw/552501a6f4b10271facc130373c57709 to your computer and use it in GitHub Desktop.
Save adamw/552501a6f4b10271facc130373c57709 to your computer and use it in GitHub Desktop.
/** Actor that repeatedly reads from a queue and hands each message to its parent.
  *
  * Message protocol, as implemented by `receive`:
  *  - a `Queue[Future]` triggers a single `read()`; the outcome is piped back to this actor and,
  *    once the read has succeeded, the same queue is re-sent to `self` to schedule the next read;
  *  - a `String` (the result of a completed read) is wrapped in `Received` and sent to
  *    `context.parent`;
  *  - a `Failure` is logged and its cause is rethrown from `receive`.
  *
  * @param connector connector for the queue; presumably used by the code elided at `// ...`
  */
class ConsumeQueueActor(connector: QueueConnector[Future])
    extends Actor
    with ActorLogging {
  // Supplies the implicit ExecutionContext for the Future combinators used below.
  import context.dispatcher

  // The queue being consumed; set when the first queue message arrives.
  private var currentQueue: Option[Queue[Future]] = None
  // ...
  override def receive: Receive = {
    // The type argument of Queue is erased at runtime, so only the Queue class itself is
    // tested here. @unchecked states that this is intentional and silences the compiler's
    // erasure warning.
    case queue: Queue[Future] @unchecked =>
      if (currentQueue.isEmpty) {
        log.info("[queue-start] connected")
        currentQueue = Some(queue)
      }
      log.info("[queue] receiving message")
      queue
        .read()
        .pipeTo(self) // forward message to self
        .andThen { case Success(_) => self ! queue } // receive next message

    case msg: String =>
      context.parent ! Received(msg)

    // NOTE(review): pipeTo reports a failed future as akka.actor.Status.Failure; confirm that
    // the `Failure` imported in this file is that type and not scala.util.Failure.
    case Failure(e) =>
      log.info(s"[queue] failure: ${e.getMessage}")
      throw e
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment