Event Platform Spark Structured Streaming DataFrame
"""
Example pyspark code to integrate with Wikimedia Event Platform to automate
getting a Spark Structured Streaming DataFrame using event streams and event JSONSchemas.
See also: https://wikitech.wikimedia.org/wiki/Event_Platform
You'll need the following jar dependencies:
- Kafka Client:
From: https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/1.1.0
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/1.1.0/kafka-clients-1.1.0.jar
- Spark SQL Kafka:
From: https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10_2.11/2.4.4
wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.11/2.4.4/spark-sql-kafka-0-10_2.11-2.4.4.jar
- Wikimedia Event Utilties (for Event Platform integration)
https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/
Wikimedia Analytics Refinery (for Spark Schema integration)
Wikimedia Event Utilties is used by refinery-job, so it is included in the fat jar refinery-job-shaded-jar.
This is deployed to all stat boxes at /srv/deployment/analytics/refinery/artifacts/refinery-job-shaded.jar
To start your spark session, pass these jars in with the --jars CLI opt to either pyspark2 or spark-submit2.
E.g.
pyspark2 --jars /srv/deployment/analytics/refinery/artifacts/refinery-job-shaded.jar,$HOME/kafka-clients-1.1.0.jar,$HOME/spark-sql-kafka-0-10_2.11-2.4.4.jar
"""
from pyspark.sql.functions import from_json
from pyspark.sql.types import _parse_datatype_json_string
default_kafka_brokers = [
    'kafka-jumbo1001.eqiad.wmnet:9092',
    'kafka-jumbo1002.eqiad.wmnet:9092',
    'kafka-jumbo1003.eqiad.wmnet:9092',
    'kafka-jumbo1004.eqiad.wmnet:9092',
    'kafka-jumbo1005.eqiad.wmnet:9092',
    'kafka-jumbo1006.eqiad.wmnet:9092',
    'kafka-jumbo1007.eqiad.wmnet:9092',
    'kafka-jumbo1008.eqiad.wmnet:9092',
    'kafka-jumbo1009.eqiad.wmnet:9092',
]
def eventstream_names(spark):
    """
    Returns a list of all available streams declared in Event Platform Stream Configuration.
    https://wikitech.wikimedia.org/wiki/Event_Platform/Stream_Configuration

    :param spark:
        SparkSession
    """
    sc = spark.sparkContext
    event_stream_config_j = sc._jvm.org.wikimedia.eventutilities.core.event.WikimediaDefaults.EVENT_STREAM_CONFIG
    return event_stream_config_j.cachedStreamNames()
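
# For instance (a quick sketch, assuming an interactive pyspark2 session where
# `spark` is already defined), you can peek at a few of the declared streams:
#
#   list(eventstream_names(spark))[:5]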
def eventstream_topics(spark, stream_name):
    """
    Gets the topics that compose the Event Platform stream.

    :param spark:
        SparkSession
    :param stream_name:
        Name of Event Platform stream.
    """
    sc = spark.sparkContext
    event_stream_factory_j = sc._jvm.org.wikimedia.eventutilities.core.event.WikimediaDefaults.EVENT_STREAM_FACTORY
    eventstream_j = event_stream_factory_j.createEventStream(stream_name)
    return eventstream_j.topics()
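
# For instance (a sketch; exact topic names depend on the stream configuration),
# a stream's topics typically include one per datacenter:
#
#   eventstream_topics(spark, 'mediawiki.revision-create')
#   # e.g. ['eqiad.mediawiki.revision-create', 'codfw.mediawiki.revision-create']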
def eventstream_spark_schema(spark, stream_name):
    """
    Looks up the event JSONSchema for the Event Platform stream_name
    and converts it to a Spark StructType schema.

    :param spark:
        SparkSession
    :param stream_name:
        Name of Event Platform stream.
    """
    sc = spark.sparkContext
    event_stream_factory_j = sc._jvm.org.wikimedia.eventutilities.core.event.WikimediaDefaults.EVENT_STREAM_FACTORY
    eventstream_j = event_stream_factory_j.createEventStream(stream_name)

    # Convert the JSONSchema to a Java Spark schema.
    jsonschema_j = eventstream_j.schema()
    SparkJsonSchemaConverter = sc._jvm.org.wikimedia.analytics.refinery.spark.sql.JsonSchemaConverter
    spark_schema_j = SparkJsonSchemaConverter.toSparkSchema(jsonschema_j)

    # Convert the Java Spark StructType to the Python version.
    spark_schema = _parse_datatype_json_string(spark_schema_j.json())
    return spark_schema
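
# For instance, to inspect the converted schema (a sketch, again assuming an
# interactive pyspark2 session):
#
#   schema = eventstream_spark_schema(spark, 'mediawiki.revision-create')
#   print(schema.simpleString())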
def kafka_stream_dataframe(spark, topics, kafka_brokers=default_kafka_brokers):
    """
    Returns a simple string value Kafka streaming DataFrame.

    :param spark:
        SparkSession
    :param topics:
        List of Kafka topics.
    :param kafka_brokers:
        List of Kafka brokers for kafka.bootstrap.servers.
    """
    # Load a basic Kafka string stream.
    return spark \
        .readStream.format('kafka') \
        .option('kafka.bootstrap.servers', ','.join(kafka_brokers)) \
        .option('subscribe', ','.join(topics)) \
        .load()
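
# Note: the DataFrame returned here has Spark's standard Kafka source columns
# (key, value, topic, partition, offset, timestamp, timestampType); the event
# JSON lives in the binary `value` column, which eventstream_dataframe below
# casts to a string and parses with the stream's schema.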
def eventstream_dataframe(spark, stream_name, kafka_brokers=default_kafka_brokers):
    """
    Integration with Wikimedia Event Platform streams and schemas
    to automate getting a Spark Structured Streaming DataFrame.

    Example:
        revision_create_stream = eventstream_dataframe(spark, 'mediawiki.revision-create')

    :param spark:
        SparkSession
    :param stream_name:
        Name of Event Platform stream.
    :param kafka_brokers:
        List of Kafka brokers for kafka.bootstrap.servers.
    """
    # Get the Kafka topics we need to subscribe to.
    topics = eventstream_topics(spark, stream_name)

    # Get the Kafka DataFrame stream; values here are just JSON strings.
    kafka_json_string_stream = kafka_stream_dataframe(spark, topics, kafka_brokers)

    # Get the Spark schema of the event stream.
    spark_schema = eventstream_spark_schema(spark, stream_name)

    # Convert the JSON string structured streaming DataFrame using the Spark schema.
    # The JSON data is in the Kafka message value. We need to parse it as JSON
    # (using the from_json function) with our schema, and then select the
    # subfields into a top level DataFrame.
    return kafka_json_string_stream \
        .selectExpr("CAST(value AS STRING)") \
        .select(from_json("value", spark_schema).alias("data")).select("data.*")
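
# A minimal end-to-end usage sketch, assuming this file is submitted with
# spark-submit2 along with the jars listed in the module docstring. The stream
# name comes from the example above; the console sink and trigger interval are
# illustrative choices, not requirements.
if __name__ == '__main__':
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName('eventstream-example').getOrCreate()

    # Build a structured streaming DataFrame of parsed revision-create events.
    revision_create_stream = eventstream_dataframe(spark, 'mediawiki.revision-create')

    # Print parsed events to the console as micro-batches arrive.
    query = revision_create_stream.writeStream \
        .format('console') \
        .option('truncate', False) \
        .trigger(processingTime='10 seconds') \
        .start()
    query.awaitTermination()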