Skip to content

Instantly share code, notes, and snippets.

@alonsoir
Last active June 12, 2018 14:50
Show Gist options
  • Save alonsoir/b8ba8d6c6a45b04e9a675286d616cb22 to your computer and use it in GitHub Desktop.
Save alonsoir/b8ba8d6c6a45b04e9a675286d616cb22 to your computer and use it in GitHub Desktop.
spark-structured-streaming 2.3.0 with kafka
https://github.com/apache/spark/tree/v2.3.0/examples/src/main/scala/org/apache/spark/examples/sql/streaming
# Make sure the topics already exist!
$:~ aironman$ kafka-topics --zookeeper localhost:2181 --list
MJPpA
Obq6c
__consumer_offsets
ad-events
aironman
amazonRatingsTopic
consumer-tutorial
filtered
greeting
mali
my-topic
new-aironman
okQl2
partitioned
test
spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0
# The commands below are shown without the scala> prompt; copy and paste them into spark-shell (lines starting with # are notes, not Scala).
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._
import java.util.UUID
// Subscribe to the `aironman` Kafka topic on localhost:9092 and keep only the
// record value, cast to a String.
val linesFromAironmanTopic = spark.readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "localhost:9092").
  option("subscribe", "aironman").
  load().
  selectExpr("CAST (value AS STRING)").
  as[String]
linesFromAironmanTopic.printSchema
// Split every line on single spaces and count the occurrences of each word.
val wordCountFromAironmanTopic = linesFromAironmanTopic.
  flatMap(_.split(" ")).
  groupBy("value").
  count()
// A random suffix gives every run its own checkpoint directory under /tmp.
val checkpointLocation = s"/tmp/temporary-${UUID.randomUUID}"
// Print the complete counts table to the console; awaitTermination() keeps the
// shell blocked until the query stops.
wordCountFromAironmanTopic.writeStream.
  outputMode("complete").
  format("console").
  option("checkpointLocation", checkpointLocation).
  start().
  awaitTermination()
// Word count over a plain TCP socket source.
// Before starting the query, run a netcat server in another terminal with
//   nc -lk 8888
// and type something into it.
val host = "localhost"
val port = "8888"
// Each line received from host:port becomes one row of the stream.
val linesFromHostAndPort = spark.readStream.
  format("socket").
  option("host", host).
  option("port", port).
  load()
// Split every line on single spaces and count the occurrences of each word.
val wordCountFromHostAndPort = linesFromHostAndPort.as[String].
  flatMap(_.split(" ")).
  groupBy("value").
  count()
// Print the complete counts table to the console; awaitTermination() keeps the
// shell blocked until the query stops.
wordCountFromHostAndPort.writeStream.
  outputMode("complete").
  format("console").
  start().
  awaitTermination()
import java.sql.Timestamp
import org.apache.spark.sql.functions._
// Sliding-window word count: 10-second windows that slide every 5 seconds.
val windowSize = 10
val slideSize = 5
val windowDuration = s"$windowSize seconds"
val slideDuration = s"$slideSize seconds"
import spark.implicits._
// Create a DataFrame representing the stream of input lines from the connection
// to host:port; `includeTimestamp` is enabled so that each line is read together
// with a timestamp.
val linesWithTimestamp = spark.readStream.
  format("socket").
  option("host", host).
  option("port", port).
  option("includeTimestamp", true).
  load()
// Split the lines into words, retaining the timestamp of the originating line.
val wordsWithTimestamp = linesWithTimestamp.as[(String, Timestamp)].
  flatMap(line => line._1.split(" ").map(word => (word, line._2))).
  toDF("word", "timestamp")
// Group the data by window and word and compute the count of each group.
val windowedCounts = wordsWithTimestamp.
  groupBy(window($"timestamp", windowDuration, slideDuration), $"word").
  count().
  orderBy("window")
// Start running the query that prints the windowed word counts to the console;
// "truncate" is disabled so the window column is printed in full.
windowedCounts.writeStream.
  outputMode("complete").
  format("console").
  option("truncate", "false").
  start().
  awaitTermination()
import org.apache.spark.sql.streaming._
// `Event`, `SessionInfo` and `SessionUpdate` are not part of the Spark API: the
// upstream StructuredSessionization example declares them itself, so they have
// to be defined in the shell before the code below can compile.
/** One input event: a word used as the session id, plus its arrival timestamp. */
case class Event(sessionId: String, timestamp: Timestamp)
/**
 * Per-session state kept between triggers.
 *
 * @param numEvents        total number of events received in the session
 * @param startTimestampMs timestamp of the first event of the session, in ms
 * @param endTimestampMs   timestamp of the last event of the session, in ms
 */
case class SessionInfo(numEvents: Int, startTimestampMs: Long, endTimestampMs: Long) {
  /** Duration of the session, between the first and the last event. */
  def durationMs: Long = endTimestampMs - startTimestampMs
}
/**
 * Update emitted for a session on every trigger in which it changed.
 *
 * @param id         id of the session
 * @param durationMs duration the session was active, in ms
 * @param numEvents  number of events received by the session so far
 * @param expired    whether the session has timed out
 */
case class SessionUpdate(id: String, durationMs: Long, numEvents: Int, expired: Boolean)
// Split each line into words and treat every word as the session id of an event
// that carries the timestamp of its line.
val events = linesWithTimestamp.as[(String, Timestamp)].flatMap { case (line, timestamp) =>
  line.split(" ").map(word => Event(sessionId = word, timestamp))
}
// Sessionize the events: track the number of events and the start and end
// timestamps of each session, and report session updates.
// The chain is written with trailing dots on purpose: when pasted line by line,
// the Scala REPL applies a line that starts with `.` to the previous result, so
// `sessionUpdates` would otherwise end up bound to plain `events`.
val sessionUpdates = events.
  groupByKey(event => event.sessionId).
  mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.ProcessingTimeTimeout) {
    case (sessionId: String, events: Iterator[Event], state: GroupState[SessionInfo]) =>
      if (state.hasTimedOut) {
        // If timed out, then remove the session and send a final update
        val finalUpdate =
          SessionUpdate(sessionId, state.get.durationMs, state.get.numEvents, expired = true)
        state.remove()
        finalUpdate
      } else {
        // Update start and end timestamps in the session
        val timestamps = events.map(_.timestamp.getTime).toSeq
        val updatedSession = if (state.exists) {
          // Existing session: add the new events and extend the end timestamp
          val oldSession = state.get
          SessionInfo(
            oldSession.numEvents + timestamps.size,
            oldSession.startTimestampMs,
            math.max(oldSession.endTimestampMs, timestamps.max))
        } else {
          // First events seen for this session id
          SessionInfo(timestamps.size, timestamps.min, timestamps.max)
        }
        state.update(updatedSession)
        // Set timeout such that the session will be expired if no data received for 10 seconds
        state.setTimeoutDuration("10 seconds")
        SessionUpdate(sessionId, state.get.durationMs, state.get.numEvents, expired = false)
      }
  }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment