@fancellu
Created January 23, 2017 17:26
Spark Streaming custom receiver example that consumes Scala Streams
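import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

/**
 * Spark Streaming receiver that feeds the elements of a
 * scala.collection.immutable.Stream into Spark.
 *
 * @param stream       source of elements; may be infinite
 * @param delay        pause in milliseconds before each element is taken
 * @param storageLevel storage level for the received data
 */
class InfiniteStreamReceiver[T](stream: Stream[T], delay: Int = 0, storageLevel: StorageLevel)
    extends Receiver[T](storageLevel) {

  override def onStart(): Unit = {
    // onStart() must not block, so the stream is consumed on a separate daemon thread.
    new Thread("InfiniteStreamReceiver") {
      override def run(): Unit = {
        // takeWhile is lazy: the sleep and the isStopped check run once per element.
        stream.takeWhile { _ => Thread.sleep(delay); !isStopped() }.foreach(item => store(item))
      }
      setDaemon(true)
    }.start()
  }

  // Nothing to clean up: the worker thread exits once isStopped() returns true.
  override def onStop(): Unit = {}
}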
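
The loop in run() relies on the laziness of Stream: takeWhile evaluates its predicate once per element, so the sleep and the stop check happen between elements rather than up front. The sketch below reproduces that pattern without Spark so it can be tried in isolation. The names TakeWhileSketch, drain and firstFive are hypothetical, an AtomicBoolean stands in for the receiver's isStopped(), and a plain callback stands in for store. It assumes a Scala version where scala.collection.immutable.Stream is available (it is deprecated from 2.13 onward).

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-alone sketch; none of these names come from the gist or from Spark.
object TakeWhileSketch {

  // Same shape as the receiver's loop: pause, check the stop flag, then hand the element on.
  def drain[T](stream: Stream[T], stopped: AtomicBoolean, delayMs: Long)(sink: T => Unit): Unit =
    stream.takeWhile { _ => Thread.sleep(delayMs); !stopped.get }.foreach(sink)

  // Consumes an infinite stream and raises the stop flag after five elements.
  def firstFive(): Seq[Int] = {
    val stopped  = new AtomicBoolean(false)
    val received = ArrayBuffer.empty[Int]
    drain(Stream.from(1), stopped, delayMs = 0L) { n =>
      received += n
      if (received.size == 5) stopped.set(true)
    }
    received.toSeq
  }
}
```

Calling TakeWhileSketch.firstFive() should return the first five integers, because the predicate sees the raised flag before a sixth element is passed on. In the receiver itself the flag is raised by Spark when the receiver is stopped, which is why onStop() has nothing to do.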