Created
May 22, 2013 08:26
-
-
Save drexin/5626071 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// This actor receives AMQP messages from the connection actor | |
// and forwards them to the receiver (in my case a channel) | |
class Forwarder(receiver: ActorRef) extends Actor { | |
def receive = { | |
case AMQPMessage(deliveryTag, key, data) => | |
val connectionActor = sender | |
receiver ! Message( | |
event = data, | |
ack = true, | |
// here I expect the Ack(deliveryTag) to be sent | |
// to the connectionActor, after the channel | |
// persisted it | |
posConfirmationTarget = connectionActor, | |
posConfirmationMessage = Ack(deliveryTag) | |
) | |
} | |
} | |
// connectionActor manages the connection to the amqp broker and forwards messages | |
// to the subscriber and waits for acks | |
val connectionActor = system.actorOf(Props(classOf[ConnectionActor], amqpConfig)) | |
val processor = system.actorOf(Props[Processor]) | |
val channel = esExtension.channelOf(ReliableChannelProps(1, processor)) | |
// forwarder will forward messages to the channel | |
val forwarder = system.actorOf(Props(classOf[Forwarder], channel)) | |
connectionActor ! Subscribe(forwarder) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment