Spark Direct Streaming Kafka
This gist pairs a docker-compose stack for a local Kafka environment (the landoop/fast-data-dev image bundles a broker, Schema Registry, REST Proxy, Kafka Connect, and a web UI) with a Spark Streaming job that consumes from it via the Kafka 0.10 direct-stream API.

docker-compose.yml

version: '3.1'
services:
  fast-data:
    image: landoop/fast-data-dev:latest
    ports:
      - "9092:9092" # Kafka broker
      - "8081:8081" # Schema Registry
      - "8082:8082" # Kafka REST Proxy
      - "8083:8083" # Kafka Connect
      - "2182:2181" # ZooKeeper (mapped to 2182 on the host)
      - "3030:3030" # Landoop development environment UI
    environment:
      - "ADV_HOST=127.0.0.1" # Advertise services on localhost so clients on the host can connect
      - "SAMPLEDATA=0"       # Do not create example topics
      - "FORWARDLOGS=0"      # Disable the 5 file source connectors that automatically bring logs into Kafka topics
      - "RUNTESTS=0"         # Don't run Coyote integration tests on container startup (enable if debugging)
    networks:
      - fast-data
networks:
  fast-data:
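With the stack running, the consumer below expects records on an "example" topic. A minimal producer sketch to feed it, assuming only the kafka-clients library (pulled in transitively by the Spark Kafka integration); the ExampleProducer name and the generated keys/values are illustrative, not part of the original gist:

package com.experiments.calvin

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Sketch: publish a handful of test records to the "example" topic
object ExampleProducer extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)
  (1 to 10).foreach { i =>
    // send() is asynchronous; close() below flushes any pending records
    producer.send(new ProducerRecord[String, String]("example", s"key-$i", s"value-$i"))
  }
  producer.close()
}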
ExampleApp.scala

package com.experiments.calvin

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ExampleApp extends App {
  val spark: SparkSession = SparkSession.builder().master("local[1]").appName("Example").getOrCreate()
  spark.sparkContext.setLogLevel("INFO")

  val batchInterval    = Seconds(1)
  val streamingContext = new StreamingContext(spark.sparkContext, batchInterval)

  val kafkaParams = Map[String, Object](
    "bootstrap.servers"  -> "localhost:9092",
    "key.deserializer"   -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id"           -> "calvin_consumer_group_1",
    "auto.offset.reset"  -> "earliest",
    // Commit offsets manually (see foreachRDD below) only after a batch has been
    // processed, giving at-least-once semantics instead of auto-commit's fire-and-forget
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  val topics = Array("example")

  // Direct stream: each Spark partition maps 1:1 to a Kafka partition, no receivers
  val kafkaConsumer = KafkaUtils.createDirectStream[String, String](
    streamingContext,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
  )

  kafkaConsumer.foreachRDD { rdd =>
    // Capture the offset ranges of this batch before transforming the RDD
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    // collect() pulls the batch to the driver; fine for a demo, avoid on large streams
    val localData: Array[(String, String)] = rdd.map(cr => (cr.key(), cr.value())).collect()
    println("Obtained data:")
    localData.foreach(println)

    // Commit only after the batch has been processed; commitAsync must be called on
    // the original stream, which is why it runs here on the driver
    kafkaConsumer.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }

  // The StreamingContext starts all registered input streams itself;
  // do not call start() on the DStream directly
  streamingContext.start()
  streamingContext.awaitTermination()
}
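To compile and run this, an sbt build along these lines should work; this is a sketch and the version numbers are assumptions, so align them with your Spark distribution and its Scala binary version:

// build.sbt (sketch; versions are assumed, not from the original gist)
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"                  % "2.3.1", // SparkSession
  "org.apache.spark" %% "spark-streaming"            % "2.3.1", // StreamingContext
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.1"  // KafkaUtils, also brings in kafka-clients
)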