@randerzander
Created April 23, 2016 18:00
Reading from NiFi into Spark Streaming
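// Run in spark-shell, where `sc` is the predefined SparkContext.
// Use :paste mode so the multi-line builder chains are read as single expressions.
import org.apache.nifi.remote.client.{SiteToSiteClient, SiteToSiteClientConfig}
import org.apache.nifi.spark.NiFiReceiver
import org.apache.spark._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._

// Streaming context with 2-second micro-batches.
val ssc = new StreamingContext(sc, Seconds(2))

// One site-to-site client config per NiFi output port.
val wifiConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
  .url("http://raspberrypi:8080/nifi")
  .portName("wifi")
  .buildConfig()
val tickConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
  .url("http://raspberrypi:8080/nifi")
  .portName("tick")
  .buildConfig()

// One receiver-backed stream per port.
val wifiStream = ssc.receiverStream(new NiFiReceiver(wifiConfig, StorageLevel.MEMORY_ONLY))
val tickStream = ssc.receiverStream(new NiFiReceiver(tickConfig, StorageLevel.MEMORY_ONLY))

// Print the per-batch record count of each stream.
wifiStream.count()
  .union(tickStream.count())
  .print()

// Nothing is received or printed until the context is started.
ssc.start()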
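
The snippet above only counts the packets in each batch. Each record delivered by NiFiReceiver is a NiFiDataPacket, which exposes the FlowFile body through getContent and its attributes through getAttributes. The following is a minimal sketch of reading the body as text. The helper name contentAsText is hypothetical, and it assumes the FlowFile content is UTF-8 text, which the gist does not state.

```scala
import java.nio.charset.StandardCharsets

import org.apache.nifi.spark.NiFiDataPacket
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper: decode each packet's content as a UTF-8 string.
// Assumes the FlowFiles carry text; binary payloads would need their own decoding.
def contentAsText(stream: DStream[NiFiDataPacket]): DStream[String] =
  stream.map(packet => new String(packet.getContent, StandardCharsets.UTF_8))
```

Something like contentAsText(wifiStream).print() would then show the first few records of each batch. As with any DStream operation, it has to be registered before the streaming context is started.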
@atanu123

I want to write similar code in PySpark, but I can't work out which Python packages/modules I need to import. Can anyone give me some idea?
