Skip to content

Instantly share code, notes, and snippets.

@WadeWaldron
Created March 18, 2016 19:05
Show Gist options
  • Save WadeWaldron/92f257f02438e5895910 to your computer and use it in GitHub Desktop.
Save WadeWaldron/92f257f02438e5895910 to your computer and use it in GitHub Desktop.
KafkaConsumers-Spark
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
// Directory shared by ctx.checkpoint(...) and StreamingContext.getOrCreate(...) below.
val checkpointDir: String = "/tmp/sparkcheckpoints"

// Kafka parameters handed to the direct stream: only the broker list is configured.
val kafkaConfig: Map[String, String] = Map("metadata.broker.list" -> "localhost:9092")

// Driver configuration: application name "SparkDriver", master "local[2]".
val sparkConfig: SparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkDriver")

// The single SparkContext that the StreamingContext is built on top of.
val sparkContext: SparkContext = new SparkContext(sparkConfig)
/** Builds a fresh StreamingContext on top of `sparkContext`.
  *
  * The context uses `Seconds(10)` as its batch duration, checkpoints to
  * `checkpointDir`, and subscribes a direct Kafka stream (String keys and
  * values) to the "messages" topic. For every batch, the second element of
  * each record tuple is printed.
  *
  * @return the configured, not-yet-started StreamingContext
  */
def createStreamingContext() = {
  val batchInterval = Seconds(10)
  val topics = Set("messages")

  val ssc = new StreamingContext(sparkContext, batchInterval)
  // Checkpointing is enabled before any stream is attached to the context.
  ssc.checkpoint(checkpointDir)

  val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc,
    kafkaConfig,
    topics
  )

  // Each record is a (first, second) pair; only the second element is printed.
  messages.foreachRDD { batch =>
    batch.foreach { case (_, payload) => println(payload) }
  }

  ssc
}
// Recover the StreamingContext from the checkpoint directory if one exists there,
// otherwise build a new one with createStreamingContext.
val streamingContext = StreamingContext.getOrCreate(checkpointDir, createStreamingContext)

// Run for at most 30 seconds (30000 ms). The stop call lives in `finally` so the
// StreamingContext and its SparkContext are always shut down, even when start()
// fails or awaitTerminationOrTimeout rethrows an error raised during execution.
try {
  streamingContext.start()
  streamingContext.awaitTerminationOrTimeout(30000)
} finally {
  streamingContext.stop(stopSparkContext = true, stopGracefully = true)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment