spark-structured-streaming 2.3.0 with kafka
https://github.com/apache/spark/tree/v2.3.0/examples/src/main/scala/org/apache/spark/examples/sql/streaming
# be sure that the topics already exist!
$:~ aironman$ kafka-topics --zookeeper localhost:2181 --list
MJPpA
Obq6c
__consumer_offsets
ad-events
aironman
amazonRatingsTopic
consumer-tutorial
filtered
greeting
mali
my-topic
new-aironman
okQl2
partitioned
test
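# if the aironman topic is not in the list yet, create it first. A minimal sketch;
# adjust --partitions and --replication-factor to your setup:
$:~ aironman$ kafka-topics --zookeeper localhost:2181 --create --topic aironman --partitions 1 --replication-factor 1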
spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0
# the commands below omit the scala> prompt, so you can copy and paste them straight into the shell ...
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._
import java.util.UUID
val linesFromAironmanTopic = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","aironman").load().selectExpr("CAST (value AS STRING)").as[String]
linesFromAironmanTopic.printSchema
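# printSchema should show a single string column, since we kept only the casted value:
# root
#  |-- value: string (nullable = true)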
val wordCountFromAironmanTopic = linesFromAironmanTopic.flatMap(_.split(" ")).groupBy("value").count()
val checkpointLocation = "/tmp/temporary-" + UUID.randomUUID
wordCountFromAironmanTopic.writeStream.outputMode("complete").format("console").option("checkpointLocation",checkpointLocation).start().awaitTermination()
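# once the query is running, feed some words into the topic from another terminal so the
# count updates, e.g. with the console producer that ships with Kafka:
# kafka-console-producer --broker-list localhost:9092 --topic aironman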
val host = "localhost"
val port = "8888"
val linesFromHostAndPort = spark.readStream.format("socket").option("host", host).option("port", port).load()
val wordCountFromHostAndPort = linesFromHostAndPort.as[String].flatMap(_.split(" ")).groupBy("value").count()
# run a netcat server in another terminal with this command,
# nc -lk 8888
# and type something there once the query below is running
wordCountFromHostAndPort.writeStream.outputMode("complete").format("console").start().awaitTermination()
import java.sql.Timestamp
import org.apache.spark.sql.functions._
val windowSize = 10
val slideSize = 5
val windowDuration = s"$windowSize seconds"
val slideDuration = s"$slideSize seconds"
import spark.implicits._
# Create DataFrame representing the stream of input lines from connection to host:port
val linesWithTimestamp = spark.readStream.format("socket").option("host", host).option("port", port).option("includeTimestamp", true).load()
# Split the lines into words, retaining timestamps
val wordsWithTimestamp = linesWithTimestamp.as[(String, Timestamp)].flatMap(line => line._1.split(" ").map(word => (word, line._2))).toDF("word", "timestamp")
# Group the data by window and word and compute the count of each group
val windowedCounts = wordsWithTimestamp.groupBy(window($"timestamp", windowDuration, slideDuration), $"word").count().orderBy("window")
# Start running the query that prints the windowed word counts to the console
windowedCounts.writeStream.outputMode("complete").format("console").option("truncate", "false").start().awaitTermination()
import org.apache.spark.sql.streaming._
# the Event, SessionInfo and SessionUpdate case classes are not on the shell's classpath;
# they live in the StructuredSessionization example linked above, so define them first:
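# a sketch of those case classes, following the Spark 2.3.0 StructuredSessionization
# example (use :paste in the spark-shell to enter multi-line definitions):
case class Event(sessionId: String, timestamp: Timestamp)
case class SessionInfo(numEvents: Int, startTimestampMs: Long, endTimestampMs: Long) {
  // duration of the session, between the first and last events
  def durationMs: Long = endTimestampMs - startTimestampMs
}
case class SessionUpdate(id: String, durationMs: Long, numEvents: Int, expired: Boolean)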
val events = linesWithTimestamp.as[(String, Timestamp)].flatMap { case (line, timestamp) => line.split(" ").map(word => Event(sessionId = word, timestamp)) }
# Sessionize the events: track the number of events and the start and end timestamps
# of each session, and report session updates.
val sessionUpdates = events
  .groupByKey(event => event.sessionId)
  .mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.ProcessingTimeTimeout) {
    case (sessionId: String, events: Iterator[Event], state: GroupState[SessionInfo]) =>
      // If timed out, then remove session and send final update
      if (state.hasTimedOut) {
        val finalUpdate =
          SessionUpdate(sessionId, state.get.durationMs, state.get.numEvents, expired = true)
        state.remove()
        finalUpdate
      } else {
        // Update start and end timestamps in session
        val timestamps = events.map(_.timestamp.getTime).toSeq
        val updatedSession = if (state.exists) {
          val oldSession = state.get
          SessionInfo(
            oldSession.numEvents + timestamps.size,
            oldSession.startTimestampMs,
            math.max(oldSession.endTimestampMs, timestamps.max))
        } else {
          SessionInfo(timestamps.size, timestamps.min, timestamps.max)
        }
        state.update(updatedSession)
        // Set timeout such that the session will be expired if no data received for 10 seconds
        state.setTimeoutDuration("10 seconds")
        SessionUpdate(sessionId, state.get.durationMs, state.get.numEvents, expired = false)
      }
  }
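# run the sessionization query; mapGroupsWithState only supports the "update" output mode,
# and this mirrors how the linked example starts it:
sessionUpdates.writeStream.outputMode("update").format("console").start().awaitTermination()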