// Gist by @ottomata (ottomata/0e21ad6f565192dd7afe50ac7afbeffd), last active September 14, 2018 18:56.
// spark2-shell --jars /home/otto/kafka-clients-1.1.1.jar,/home/otto/spark-sql-kafka-0-10_2.11-2.3.1.jar
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.StreamingQueryListener._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.from_json
// Subscribe to the codfw.mediawiki.revision-create Kafka topic using
// Spark structured streaming. The result is a streaming DataFrame of raw
// Kafka records; nothing is parsed yet.
val stream = {
  // Source options collected in one place: which brokers to contact and
  // which topic to subscribe to.
  val kafkaOptions = Map(
    "kafka.bootstrap.servers" -> "kafka-jumbo1001.eqiad.wmnet:9092,kafka-jumbo1002.eqiad.wmnet:9092",
    "subscribe" -> "codfw.mediawiki.revision-create"
  )
  spark.readStream.format("kafka").options(kafkaOptions).load()
}
// We need a schema to convert the loosely typed JSON into a strongly typed
// DataFrame. This is a hand-written Spark schema; in the future it should
// be generated automatically from the JSONSchema at
// https://github.com/wikimedia/mediawiki-event-schemas/blob/master/jsonschema/mediawiki/revision/create/3.yaml
//
// Every field is nullable (StructType.add(name, dataType) creates nullable
// fields), so a key that is absent from a message simply becomes null.
//
// User, page and revision ids are declared as LongType rather than
// IntegerType: they are unbounded JSON integers, and a value above
// 2^31 - 1 does not fit a 32-bit IntegerType column.
// NOTE(review): with IntegerType such a record would presumably fail to
// parse and come back as null from from_json -- confirm against the Spark
// version in use.
val sparkSchema = {
  // Event envelope metadata; all plain strings.
  val metaSchema = new StructType()
    .add("topic", StringType)
    .add("schema_uri", StringType)
    .add("uri", StringType)
    .add("request_id", StringType)
    .add("id", StringType)
    .add("dt", StringType)
    .add("domain", StringType)
  // The user that performed the edit.
  val performerSchema = new StructType()
    .add("user_id", LongType)
    .add("user_text", StringType)
  new StructType()
    .add("meta", metaSchema)
    .add("database", StringType)
    .add("performer", performerSchema)
    .add("comment", StringType)
    .add("parsedcomment", StringType)
    .add("page_id", LongType)
    .add("page_title", StringType)
    .add("page_namespace", IntegerType)
    .add("rev_id", LongType)
    .add("rev_parent_id", LongType)
    .add("rev_timestamp", StringType)
    .add("rev_sha1", StringType)
}
// Kafka messages are (key, value) pairs. Take the value, cast it to a
// string, parse that string as JSON using sparkSchema, and then flatten the
// parsed struct so each schema field becomes a top-level column.
val events = {
  val jsonText = $"value".cast("string")
  val parsedValue = from_json(jsonText, sparkSchema).alias("value")
  stream.select(parsedValue).select("value.*")
}
// Count events per meta.domain, keeping only rows whose page_namespace is 0.
val revision_creates_per_domain_stream = {
  val namespaceZeroEvents = events.filter("page_namespace = 0")
  namespaceZeroEvents.groupBy("meta.domain").count()
}
// Start a streaming query that saves its results in an in-memory table
// named 'domain_counts'. Once the query is running, 'domain_counts' can be
// queried with spark.sql at any time to see the counts so far.
val streamingQuery = {
  val memoryTableWriter = revision_creates_per_domain_stream.writeStream
    .format("memory")
    .queryName("domain_counts")
    .outputMode("complete")
  memoryTableWriter.start()
}
// One-off look at the current per-domain counts, largest first.
spark.sql("select * from domain_counts order by count desc").show()
// A StreamingQueryListener lets us run a callback every time a micro
// batch completes. Here the callback just re-queries and prints the
// in-memory 'domain_counts' table.
val streamListener = new StreamingQueryListener {
  // We don't want to do anything on start or termination, but the
  // callbacks have to be overridden anyway.
  override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
    //println("Query made progress: " + queryProgress.progress)
    // Query the in-memory table for the current domain counts.
    val domainCounts = spark.sql("select * from domain_counts order by count desc")
    // Optionally: group them into a single spark partition (this should be
    // small anyway) and overwrite them into a hive table.
    // domainCounts.repartition(1).write.mode("overwrite").saveAsTable("otto.revision_create_domain_counts")
    // Also print out the top results because why not! It's fun.
    domainCounts.show()
  }
}
// Register the listener with the session's streaming query manager.
spark.streams.addListener(streamListener)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment