Spark Direct Streaming Kafka
@calvinlfer · created March 23, 2019

A docker-compose file that stands up a local Kafka stack (broker, Schema Registry, REST proxy, Connect, ZooKeeper) using landoop/fast-data-dev:
version: '3.1'
services:
  fast-data:
    image: landoop/fast-data-dev:latest
    ports:
      - "9092:9092" # Kafka broker
      - "8081:8081" # Schema Registry
      - "8082:8082" # Kafka REST Proxy
      - "8083:8083" # Kafka Connect
      - "2182:2181" # ZooKeeper
      - "3030:3030" # Landoop development environment UI
    environment:
      - "ADV_HOST=127.0.0.1" # Advertise the broker on localhost so the host machine can reach it
      - "SAMPLEDATA=0"       # Do not create example topics
      - "FORWARDLOGS=0"      # Disable the 5 file source connectors that automatically bring logs into Kafka topics
      - "RUNTESTS=0"         # Do not run the Coyote integration tests on container startup (enable when debugging)
    networks:
      - fast-data
networks:
  fast-data:
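
For local testing, the streaming jobs below need records on the example topic. Here is a minimal producer sketch (my addition, not part of the original gist) that assumes only the kafka-clients library, which spark-streaming-kafka-0-10 already pulls in transitively:

package com.experiments.calvin

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

// Hypothetical helper that seeds the "example" topic with a few test messages
object ExampleProducer extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", classOf[StringSerializer].getName)
  props.put("value.serializer", classOf[StringSerializer].getName)

  val producer = new KafkaProducer[String, String](props)
  (1 to 10).foreach { i =>
    producer.send(new ProducerRecord[String, String]("example", s"key-$i", s"value-$i"))
  }
  producer.flush()
  producer.close()
}

The Landoop development UI on http://localhost:3030 is another way to inspect the topic and the messages it holds.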
package com.experiments.calvin

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ExampleApp extends App {
  val spark: SparkSession = SparkSession.builder().master("local[1]").appName("Example").getOrCreate()
  spark.sparkContext.setLogLevel("INFO")

  val batchInterval    = Seconds(1)
  val streamingContext = new StreamingContext(spark.sparkContext, batchInterval)

  val kafkaParams = Map[String, Object](
    "bootstrap.servers"  -> "localhost:9092",
    "key.deserializer"   -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id"           -> "calvin_consumer_group_1",
    "auto.offset.reset"  -> "earliest",
    "enable.auto.commit" -> (false: java.lang.Boolean) // commit offsets manually after processing
  )
  val topics = Array("example")

  val kafkaConsumer = KafkaUtils.createDirectStream[String, String](
    streamingContext,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
  )

  kafkaConsumer.foreachRDD { rdd =>
    // Capture this batch's offset ranges before transforming the RDD
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    val localData: Array[(String, String)] = rdd.map(cr => (cr.key(), cr.value())).collect()
    println("Obtained data:")
    localData.foreach(println)
    // Commit offsets only after the batch has been processed (at-least-once)
    kafkaConsumer.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }

  kafkaConsumer.start() // note: streamingContext.start() below also starts registered input streams
  streamingContext.start()
  streamingContext.awaitTermination()
}
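
One thing worth noting about the loop above: collect() pulls every record of the batch into driver memory, which is fine for a demo but not for real volumes. A sketch of the same loop processing records on the executors instead (my variant, not from the gist):

kafkaConsumer.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // Runs on the executors; nothing is shipped back to the driver
  rdd.foreachPartition { records =>
    records.foreach(cr => println(s"${cr.key()} -> ${cr.value()}"))
  }
  kafkaConsumer.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

Either way, the offset ranges are captured before the RDD is transformed and committed only after the batch is processed, which is what gives the at-least-once behaviour.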
@calvinlfer (author) commented with the sbt build used for these examples:

name := "spark-experiments"

version := "0.1"

scalaVersion := "2.11.12"

libraryDependencies ++= {
  val spark = "org.apache.spark"
  val sparkV = "2.4.0"

  val monix = "io.monix"
  val monixV = "3.0.0-RC2"

  val fs2 = "co.fs2"
  val fs2V = "1.0.3"

  Seq(
    spark     %% "spark-core"                 % sparkV,
    spark     %% "spark-sql"                  % sparkV,
    spark     %% "spark-streaming-kafka-0-10" % sparkV,
    spark     %% "spark-streaming"            % sparkV,
    monix     %% "monix"                      % monixV,
    fs2       %% "fs2-core"                   % fs2V
  )
}
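
A deployment note, not something the gist itself covers: when the job is submitted to a cluster with spark-submit rather than run locally, the Spark modules the cluster already provides would normally be marked Provided so they stay out of the assembly, while spark-streaming-kafka-0-10 remains compile-scoped because clusters do not ship it. A sketch of that variant:

  Seq(
    spark     %% "spark-core"                 % sparkV % Provided,
    spark     %% "spark-sql"                  % sparkV % Provided,
    spark     %% "spark-streaming"            % sparkV % Provided,
    spark     %% "spark-streaming-kafka-0-10" % sparkV,
    monix     %% "monix"                      % monixV,
    fs2       %% "fs2-core"                   % fs2V
  )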

Terminating the streaming job when it receives no data for more than 10 seconds:

package com.experiments.calvin

import fs2.concurrent.Queue
import monix.eval._
import monix.execution._
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.concurrent.duration._

object ExampleAppDirectBatching extends App {
  val spark: SparkSession = SparkSession.builder().master("local[1]").appName("Example").getOrCreate()
  spark.sparkContext.setLogLevel("INFO")
  val batchInterval = Seconds(1)
  val streamingContext = new StreamingContext(spark.sparkContext, batchInterval)

  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "calvin_consumer_group_1",
    "auto.offset.reset" -> "earliest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )
  val topics = Array("example")

  val kafkaConsumer = KafkaUtils.createDirectStream[String, String](
    streamingContext,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
  )

  implicit val taskScheduler: Scheduler = Scheduler.global

  // Token enqueued whenever a batch contains data; the watchdog below drains these
  final case class DataReceived()

  // Bounded so a fast producer cannot grow the queue without limit
  val queue = Queue.bounded[Task, DataReceived](10).runSyncUnsafe()

  kafkaConsumer.foreachRDD { rdd =>
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    val localData: Array[(String, String)] = rdd.map(cr => (cr.key(), cr.value())).collect()

    if (localData.nonEmpty) {
      // Signal the watchdog that data arrived in this batch (offer1 simply
      // returns false when the queue is full, which is fine for a liveness signal)
      queue.offer1(DataReceived()).runSyncUnsafe()
    }

    localData.foreach(println)
    kafkaConsumer.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }

  val beginKafkaConsumer = Task(kafkaConsumer.start())
  val beginStreaming = Task(streamingContext.start())
  val stopStreaming = Task(streamingContext.stop(stopSparkContext = true, stopGracefully = true))
  val waitTermination = Task(streamingContext.awaitTermination())

  // Poll the queue once per `delay` window: an empty dequeue means no data
  // arrived during the window, so run `exit`; otherwise wait out another window
  def delayUntilNoDataIsReceived[A](queue: Queue[Task, DataReceived], exit: Task[A], delay: FiniteDuration): Task[A] =
    for {
      option  <-  Task.suspend(queue.tryDequeue1).delayExecution(delay)
      res     <-  if (option.isEmpty) exit
                  else delayUntilNoDataIsReceived(queue, exit, delay)
    } yield res

  val program = for {
    _ <- beginKafkaConsumer
    _ <- beginStreaming
    _ <- delayUntilNoDataIsReceived(queue, stopStreaming, 10.seconds)
    _ <- waitTermination
  } yield ()

  program.runSyncUnsafe()
}
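
How the shutdown works: every non-empty one-second batch offers a DataReceived token into the bounded fs2 queue, and delayUntilNoDataIsReceived sleeps through a 10-second window before calling tryDequeue1. An empty dequeue means no batch carried data during that window, so the exit task runs streamingContext.stop(stopSparkContext = true, stopGracefully = true); otherwise the watchdog recurses and waits out another window. One caveat worth noting: up to ten one-second batches can enqueue tokens within a single window while the watchdog drains at most one token per window, so tokens can pile up to the queue bound of 10 and shutdown can lag well past 10 seconds after the last record arrives. Bounding the queue at 1 would tighten that.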
