Spark Structured Streaming example - word count in JSON field in Kafka
// spark2-shell --jars /home/otto/kafka-clients-1.1.1.jar,/home/otto/spark-sql-kafka-0-10_2.11-2.3.1.jar
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.StreamingQueryListener._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.from_json
// Subscribe to eventlogging-valid-mixed using Spark structured streaming
val eventlogging_valid_mixed_stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka-jumbo1001.eqiad.wmnet:9092,kafka-jumbo1002.eqiad.wmnet:9092")
  .option("subscribe", "eventlogging-valid-mixed")
  .load()
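
// For reference, the Kafka source always yields the same fixed set of columns;
// key and value arrive as binary and must be cast before use. printSchema
// should show:
eventlogging_valid_mixed_stream.printSchema()
// root
//  |-- key: binary (nullable = true)
//  |-- value: binary (nullable = true)
//  |-- topic: string (nullable = true)
//  |-- partition: integer (nullable = true)
//  |-- offset: long (nullable = true)
//  |-- timestamp: timestamp (nullable = true)
//  |-- timestampType: integer (nullable = true)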
// Since eventlogging-valid-mixed contains events with many different schemas,
// we can't apply a single full schema to the stream (structured streaming
// expects every message to have a well defined schema). Instead, we create a
// custom schema here that includes only the 'schema' field from the
// EventCapsule. We apply this schema when reading JSON using the from_json
// SQL function, dropping every field in the data except the 'schema' name.
val sparkSchema = StructType(Seq(StructField("schema", StringType, true)))
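
// For intuition, a quick batch-mode sketch of what from_json does with this
// schema. The JSON body below is hypothetical, not a real EventCapsule event;
// every field except 'schema' is dropped by the parse.
val sampleDf = Seq("""{"schema": "Edit", "wiki": "enwiki"}""").toDF("value")
sampleDf.select(from_json($"value", sparkSchema).alias("value")).select("value.schema").show()
// Should print something like:
// +------+
// |schema|
// +------+
// |  Edit|
// +------+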
// Kafka messages are (key, value) pairs. Cast the binary value to a string,
// parse it as JSON with the 'schema'-only sparkSchema, and select the
// 'schema' name field out of the parsed value. schemas will be a streaming
// DataFrame with a single 'schema' column.
val schemas = eventlogging_valid_mixed_stream.select(from_json($"value".cast("string"), sparkSchema).alias("value")).select("value.schema")
// Count by schema name.
val schemaCounts = schemas.groupBy("schema").count()
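
// schemaCounts is an unbounded aggregation that Spark keeps updating as new
// micro batches arrive. Its schema should be:
schemaCounts.printSchema()
// root
//  |-- schema: string (nullable = true)
//  |-- count: long (nullable = false)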
// Before we start the streaming query, we will add a StreamingQueryListener
// callback that will be executed every time a micro batch completes.
val streamListener = new StreamingQueryListener() {
  override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
    //println("Query made progress: " + queryProgress.progress)

    // Query the in memory table for the current schema counts,
    val currentDf = spark.sql("select * from schema_counts order by count desc")
    // group them together into a single Spark partition (this should be small anyway)
    // and overwrite them into a Hive table.
    currentDf.repartition(1).write.mode("overwrite").saveAsTable("otto.eventlogging_valid_mixed_schema_counts")
    // Also print out the top results, because why not! It's fun.
    currentDf.show()
  }

  // We don't want to do anything on start or termination,
  // but we have to override these anyway.
  override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { }
  override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = { }
}
// Add the new listener callback.
spark.streams.addListener(streamListener)
// Start a streaming query that saves its results in an in memory table
// named 'schema_counts'. 'schema_counts' can now be queried for up-to-date
// results at any time.
val streamingQuery = schemaCounts
  .writeStream
  .queryName("schema_counts")
  .outputMode("complete")
  .format("memory")
  .start()
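
// Usage sketch: while the stream runs, the in memory table is queryable
// interactively from the same shell, and the query and listener can be
// cleaned up when you're done.
spark.sql("select * from schema_counts order by count desc").show()

// When finished:
// streamingQuery.stop()
// spark.streams.removeListener(streamListener)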