-
-
Save OElesin/d75cb128b3396b89449b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package botkop.sparti.receiver | |
import com.rabbitmq.client._ | |
import org.apache.spark.Logging | |
import org.apache.spark.storage.StorageLevel | |
import org.apache.spark.streaming.StreamingContext | |
import org.apache.spark.streaming.dstream.ReceiverInputDStream | |
import org.apache.spark.streaming.receiver.Receiver | |
import scala.reflect.ClassTag | |
class RabbitInputDStream[T: ClassTag]( | |
ssc_ : StreamingContext, | |
uri: String, | |
exchangeName: String, | |
queueName: String, | |
routingKey: String, | |
converter: Array[Byte] => T, | |
storageLevel: StorageLevel | |
) extends ReceiverInputDStream[T](ssc_) { | |
def getReceiver(): Receiver[T] = { | |
new RabbitReceiver(uri, exchangeName, queueName, routingKey, converter, storageLevel) | |
} | |
} | |
class RabbitReceiver[T: ClassTag](uri: String, | |
exchangeName: String, | |
queueName: String, | |
routingKey: String, | |
bytesToObject: Array[Byte] => T, | |
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2 ) | |
extends Receiver[T](storageLevel) with Logging { | |
def onStart() = { | |
new Thread("Rabbit Receiver") { | |
setDaemon(true) | |
override def run() { receive() } | |
}.start() | |
} | |
def onStop() = { | |
// There is nothing much to do as the thread calling receive() | |
// is designed to stop by itself isStopped() returns false | |
} | |
private def receive() = { | |
var connection: Connection = null | |
try { | |
connection = getConnection | |
val channel = connection.createChannel | |
// channel.exchangeDeclare(exchangeName, "direct", true) // assuming exchange has been declared | |
channel.queueDeclare(queueName, true, false, false, null) | |
channel.queueBind(queueName, exchangeName, routingKey) | |
val autoAck = false | |
val consumer = new MessageConsumer(channel, convertAndStore) | |
channel.basicConsume(queueName, autoAck, getClass.getCanonicalName, consumer) | |
} catch { | |
case t: Throwable => | |
if (connection != null) { | |
try { | |
connection.close() | |
} catch { | |
case u: Throwable => logError("error trying to close connection to rabbit", u) | |
} | |
} | |
restart("error receiving data from rabbit", t) | |
} | |
} | |
def convertAndStore(body: Array[Byte]) = { | |
val o: T = bytesToObject(body) | |
store(o) | |
} | |
private def getConnection: Connection = { | |
val factory = new ConnectionFactory | |
factory.setUri(uri) | |
factory.setAutomaticRecoveryEnabled(true) //enable automatic connection recovery | |
factory.newConnection | |
} | |
} | |
class MessageConsumer(channel: Channel, store: (Array[Byte]) => Unit) extends DefaultConsumer(channel) { | |
override def handleDelivery( | |
consumerTag: String, | |
envelope: Envelope, | |
properties: AMQP.BasicProperties, | |
body: Array[Byte] ) = { | |
val deliveryTag = envelope.getDeliveryTag | |
// process the message | |
store(body) | |
channel.basicAck(deliveryTag, false) | |
} | |
} | |
object RabbitReceiver { | |
def rabbitTextStream( | |
ssc : StreamingContext, exchangeName: String, queueName: String, | |
routingKey: String = "#", | |
uri: String = "amqp://guest:guest@localhost:5672/%2f", | |
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ) | |
: RabbitInputDStream[String] = | |
new RabbitInputDStream[String](ssc, uri, exchangeName, queueName, routingKey, new String(_), storageLevel) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment