// spark2-shell --jars /home/otto/kafka-clients-1.1.1.jar,/home/otto/spark-sql-kafka-0-10_2.11-2.3.1.jar

import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.StreamingQueryListener._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.from_json
// Subscribe to the codfw.mediawiki.revision-create topic using Spark structured streaming
val stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka-jumbo1001.eqiad.wmnet:9092,kafka-jumbo1002.eqiad.wmnet:9092")
  .option("subscribe", "codfw.mediawiki.revision-create")
  .load()
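
// (Addition, a sketch) By default the Kafka source only reads messages that
// arrive after the query starts. To replay a topic from the beginning, one
// could add this option to the reader above:
// .option("startingOffsets", "earliest")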
// We need a schema to convert the loosely typed JSON into a strongly typed
// DataFrame. This is a Spark schema, written by hand here. In the future this
// should be automated from the JSONSchema at
// https://github.com/wikimedia/mediawiki-event-schemas/blob/master/jsonschema/mediawiki/revision/create/3.yaml
// (a sketch of one possible approach follows the schema definition below).
val sparkSchema = StructType(Seq(
  StructField("meta", StructType(Seq(
    StructField("topic", StringType, true),
    StructField("schema_uri", StringType, true),
    StructField("uri", StringType, true),
    StructField("request_id", StringType, true),
    StructField("id", StringType, true),
    StructField("dt", StringType, true),
    StructField("domain", StringType, true)
  )), true),
  StructField("database", StringType, true),
  StructField("performer", StructType(Seq(
    StructField("user_id", IntegerType, true),
    StructField("user_text", StringType, true)
  )), true),
  StructField("comment", StringType, true),
  StructField("parsedcomment", StringType, true),
  StructField("page_id", IntegerType, true),
  StructField("page_title", StringType, true),
  StructField("page_namespace", IntegerType, true),
  StructField("rev_id", IntegerType, true),
  StructField("rev_parent_id", IntegerType, true),
  StructField("rev_timestamp", StringType, true),
  StructField("rev_sha1", StringType, true)
))
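
// (Addition, a sketch of the automation mentioned above) Rather than writing
// the schema by hand, Spark could infer it from a static sample of already
// produced events and reuse the result for the stream. The sample path below
// is hypothetical.
// val inferredSchema = spark.read.json("/path/to/sample/revision-create.json").schema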
// Kafka messages are (key, value) pairs. Cast the binary value to a string,
// parse it as JSON using sparkSchema, and then select all of the parsed
// fields as top level columns.
val events = stream.select(from_json($"value".cast("string"), sparkSchema).alias("value")).select("value.*")
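
// (Addition) Sanity check: the parsed stream should now expose the strongly
// typed fields from sparkSchema instead of Kafka's single binary value column.
events.printSchema()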
// Count revision creates in the main namespace (page_namespace = 0) by domain
val revision_creates_per_domain_stream = events.where("page_namespace = 0").groupBy("meta.domain").count()
// Start a streaming query that saves its results in an in memory table
// named 'domain_counts', which can then be queried for up-to-date
// results at any time.
val streamingQuery = revision_creates_per_domain_stream
  .writeStream
  .queryName("domain_counts")
  .outputMode("complete")
  .format("memory")
  .start()
spark.sql("select * from domain_counts order by count desc").show()
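
// (Addition, a sketch) The query runs in the background; standard
// StreamingQuery methods can be used to monitor it and to stop it when done:
// streamingQuery.status        // is the query active / waiting for data?
// streamingQuery.lastProgress  // metrics from the most recent micro batch
// streamingQuery.stop()        // stop the streaming query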
// We can add a StreamingQueryListener callback that will be executed
// every time a micro batch completes.
val streamListener = new StreamingQueryListener() {
  override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
    // println("Query made progress: " + queryProgress.progress)

    // Query the in memory table for the current per-domain counts
    val currentDf = spark.sql("select * from domain_counts order by count desc")

    // Group them together into a single Spark partition (this should be small anyway)
    // and overwrite them into a Hive table.
    // currentDf.repartition(1).write.mode("overwrite").saveAsTable("otto.revision_create_domain_counts")

    // Also print out the top results because why not! It's fun.
    currentDf.show()
  }

  // We don't want to do anything on start or termination,
  // but we have to override these anyway.
  override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { }
  override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = { }
}

// Add the new listener callback
spark.streams.addListener(streamListener)
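
// (Addition, a sketch) The listener can be removed again when it is no longer
// needed, and awaitTermination() will block the shell until the query stops
// or fails (left commented out so the interactive session stays usable).
// spark.streams.removeListener(streamListener)
// streamingQuery.awaitTermination()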