Skip to content

Instantly share code, notes, and snippets.

@ashic
Created January 22, 2015 11:26
Show Gist options
  • Select an option

  • Save ashic/b5edc7cfdc85aa60b066 to your computer and use it in GitHub Desktop.

Select an option

Save ashic/b5edc7cfdc85aa60b066 to your computer and use it in GitHub Desktop.
Rabbitmq receiver
class RmqReceiver(uname:String,
passwd: String,
vHost:String,
host:String,
qName:String,
topic:Option[String])
extends Receiver[Array[Byte]](StorageLevel.MEMORY_AND_DISK_2) with Logging {
def onStart() {
// Start the thread that receives data over a connection
new Thread("Socket Receiver") {
override def run() { receive() }
}.start()
}
private def receive(): Unit ={
val factory = new ConnectionFactory()
factory.setUsername(uname)
factory.setPassword(passwd)
factory.setVirtualHost(vHost)
factory.setHost(host)
try {
val connection = factory.newConnection()
val channel = connection.createChannel()
val consumer = new QueueingConsumer(channel)
channel.basicConsume(qName, false, consumer)
while (!isStopped) {
val delivery = consumer.nextDelivery()
val body = delivery.getBody()
val msg = ScalaMessagePack.read[RadioMessage](body)
store(body)
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false)
}
}catch{
case t: Throwable =>
restart("[RmqReceiver] Unhandled error in RabbitMQ Handler", t)
}
}
def onStop(): Unit ={
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment