Created
January 22, 2015 11:26
-
-
Save ashic/b5edc7cfdc85aa60b066 to your computer and use it in GitHub Desktop.
RabbitMQ receiver
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Spark Streaming receiver that consumes raw message bodies from a RabbitMQ
 * queue and hands them to Spark via `store`.
 *
 * Each delivery is acknowledged only after `store` has returned, so a message
 * that fails before that point is left unacknowledged on the broker.
 *
 * @param uname  user name used to authenticate against the broker
 * @param passwd password used to authenticate against the broker
 * @param vHost  virtual host to connect to
 * @param host   broker host name
 * @param qName  name of the queue to consume from
 * @param topic  not referenced anywhere in this class; kept so existing
 *               callers continue to compile
 */
class RmqReceiver(uname: String,
                  passwd: String,
                  vHost: String,
                  host: String,
                  qName: String,
                  topic: Option[String])
  extends Receiver[Array[Byte]](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  /** Starts the background thread that consumes from the queue; returns immediately. */
  def onStart(): Unit = {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  /**
   * Connects to the broker and consumes deliveries until the receiver is
   * stopped. Any non-fatal failure triggers a receiver restart.
   */
  private def receive(): Unit = {
    // Upper bound on how long one poll may block. Without a timeout the loop
    // would never re-check isStopped while the queue is idle.
    val pollTimeoutMs = 1000L

    val factory = new ConnectionFactory()
    factory.setUsername(uname)
    factory.setPassword(passwd)
    factory.setVirtualHost(vHost)
    factory.setHost(host)

    try {
      val connection = factory.newConnection()
      try {
        val channel = connection.createChannel()
        val consumer = new QueueingConsumer(channel)
        // autoAck = false: deliveries are acknowledged manually below.
        channel.basicConsume(qName, false, consumer)

        while (!isStopped) {
          // The timed nextDelivery returns null when nothing arrived in time.
          Option(consumer.nextDelivery(pollTimeoutMs)).foreach { delivery =>
            val body = delivery.getBody()
            // The decoded value is discarded; the call is retained so that an
            // undecodable payload still fails before store/ack.
            // NOTE(review): such a message stays unacknowledged and will
            // presumably be redelivered after the restart - confirm intended.
            ScalaMessagePack.read[RadioMessage](body)
            store(body)
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false)
          }
        }
      } finally {
        // Closing the connection also closes its channels. A failure while
        // closing must not mask the exception that brought us here.
        scala.util.Try(connection.close())
      }
    } catch {
      case _: InterruptedException =>
        // Preserve the interrupt for whoever owns this thread and exit.
        Thread.currentThread().interrupt()
      case scala.util.control.NonFatal(t) =>
        restart("[RmqReceiver] Unhandled error in RabbitMQ Handler", t)
    }
  }

  /**
   * Nothing to release here: the receive loop notices `isStopped` within one
   * poll timeout and closes its own connection on the way out.
   */
  def onStop(): Unit = {
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment