Simple Spark DStream with Kafka
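A minimal Spark Streaming job that subscribes to a Kafka topic through the kafka010 direct stream, recovers its StreamingContext from checkpoint data on restart via StreamingContext.getOrCreate, and converts each 10-second batch of JSON messages into a DataFrame for display.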
package net.heartsavior.spark.trial

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
object SparkDStreamTrial {

  val checkpointDirectory = "./spark-dstream-trial"

  // Function to create and set up a new StreamingContext
  def functionToCreateContext(conf: SparkConf): StreamingContext = {
    // Allow offset gaps (compacted or transactional topics). This must be set
    // before the StreamingContext is created: the underlying SparkContext clones
    // the conf at construction, so mutating it afterwards has no effect.
    conf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true")

    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDirectory) // set checkpoint directory

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "spark-dstream-trial",
      "auto.offset.reset" -> "latest",
      // Offsets are tracked through checkpointing, so Kafka auto-commit stays off.
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("truck_speed_events_stream")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    stream.map(_.value()).foreachRDD { rdd =>
      // Get the singleton instance of SparkSession
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._

      // Convert the RDD of JSON strings to a DataFrame. read.json(RDD[String])
      // is deprecated since Spark 2.2, so go through a Dataset[String] instead.
      val dataFrame = spark.read.json(spark.createDataset(rdd))
      dataFrame.show()
    }

    ssc
  }
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkDStreamTrial")

    // Get StreamingContext from checkpoint data or create a new one
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => functionToCreateContext(conf))

    ssc.start()
    ssc.awaitTermination()
  }
}
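To give the job something to show, the topic needs JSON messages. Below is a minimal sketch of a companion test producer using the plain kafka-clients API; the event fields (driverId, speed, eventTime) are invented for illustration, since the gist doesn't show the actual message schema. Run it against the same localhost:9092 broker, then start SparkDStreamTrial, and each 10-second batch should print the parsed events.

package net.heartsavior.spark.trial

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object TruckSpeedEventProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", classOf[StringSerializer].getName)
    props.put("value.serializer", classOf[StringSerializer].getName)

    val producer = new KafkaProducer[String, String](props)
    try {
      for (i <- 1 to 100) {
        // Hypothetical event shape; the real schema isn't part of the gist.
        val json =
          s"""{"driverId":${i % 10},"speed":${60 + i % 40},"eventTime":${System.currentTimeMillis()}}"""
        producer.send(new ProducerRecord[String, String]("truck_speed_events_stream", json))
      }
    } finally {
      producer.close() // flushes pending records before closing
    }
  }
}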