Last active
May 9, 2022 18:33
-
-
Save ottomata/26e35c39ff4025d73238fba87c4e8e68 to your computer and use it in GitHub Desktop.
Event Platform Spark Structured Streaming DataFrame
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Example pyspark code to integrate with Wikimedia Event Platform to automate | |
getting a Spark Structured Streaming DataFrame using event streams and event JSONSchemas. | |
See also: https://wikitech.wikimedia.org/wiki/Event_Platform | |
You'll need the following jar dependencies: | |
- Kafka Client: | |
From: https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/1.1.0 | |
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/1.1.0/kafka-clients-1.1.0.jar | |
- Spark SQL Kafka: | |
From: https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10_2.11/2.4.4 | |
wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.11/2.4.4/spark-sql-kafka-0-10_2.11-2.4.4.jar | |
- Wikimedia Event Utilties (for Event Platform integration) | |
https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/ | |
Wikimedia Analytics Refinery (for Spark Schema integration) | |
Wikimedia Event Utilties is used by refinery-job, so it is included in the fat jar refinery-job-shaded-jar. | |
This is deployed to all stat boxes at /srv/deployment/analytics/refinery/artifacts/refinery-job-shaded.jar | |
To start your spark session, pass these jars in with the --jars CLI opt to either pyspark2 or spark-submit2. | |
E.g. | |
pyspark2 --jars /srv/deployment/analytics/refinery/artifacts/refinery-job-shaded.jar,$HOME/kafka-clients-1.1.0.jar,$HOME/spark-sql-kafka-0-10_2.11-2.4.4.jar | |
""" | |
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import _parse_datatype_json_string
# Default Kafka bootstrap brokers: the nine kafka-jumbo hosts in eqiad
# (kafka-jumbo1001 through kafka-jumbo1009), all on port 9092.
default_kafka_brokers = [
    'kafka-jumbo10{:02d}.eqiad.wmnet:9092'.format(host_num)
    for host_num in range(1, 10)
]
def eventstream_names(spark=None):
    """
    Returns a list of all available streams declared in Event Platform Stream Configuration.
    https://wikitech.wikimedia.org/wiki/Event_Platform/Stream_Configuration

    :param spark:
        SparkSession. Defaults to the active (or a newly created) session.
        (The original implementation referenced an undefined local ``sc`` and
        only worked when a pyspark-shell global ``sc`` happened to exist.)
    """
    if spark is None:
        # getOrCreate() returns the already-running session in a pyspark shell.
        spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext
    # Stream config lookups are done via the Wikimedia Event Utilities Java library
    # through the Py4J JVM gateway.
    event_stream_config_j = sc._jvm.org.wikimedia.eventutilities.core.event.WikimediaDefaults.EVENT_STREAM_CONFIG
    return event_stream_config_j.cachedStreamNames()
def eventstream_topics(stream_name, spark=None):
    """
    Gets the Kafka topics that compose the Event Platform stream.

    :param stream_name:
        Name of Event Platform stream.
    :param spark:
        SparkSession. Defaults to the active (or a newly created) session.
        (The original implementation relied on a pyspark-shell global ``spark``;
        sibling functions take the session explicitly.)
    """
    if spark is None:
        # getOrCreate() returns the already-running session in a pyspark shell.
        spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext
    # Resolve the stream via the Wikimedia Event Utilities Java library (Py4J gateway).
    event_stream_factory_j = sc._jvm.org.wikimedia.eventutilities.core.event.WikimediaDefaults.EVENT_STREAM_FACTORY
    eventstream_j = event_stream_factory_j.createEventStream(stream_name)
    return eventstream_j.topics()
def eventstream_spark_schema(spark, stream_name):
    """
    Looks up the event JSONSchema for the Event Platform stream_name
    and converts it to a Spark StructType schema.

    :param spark:
        SparkSession
    :param stream_name:
        Name of Event Platform stream.
    """
    jvm = spark.sparkContext._jvm
    # Resolve the stream's JSONSchema via Wikimedia Event Utilities (Py4J gateway).
    factory = jvm.org.wikimedia.eventutilities.core.event.WikimediaDefaults.EVENT_STREAM_FACTORY
    json_schema_j = factory.createEventStream(stream_name).schema()
    # Refinery's JsonSchemaConverter turns the JSONSchema into a Java Spark StructType.
    converter = jvm.org.wikimedia.analytics.refinery.spark.sql.JsonSchemaConverter
    struct_type_j = converter.toSparkSchema(json_schema_j)
    # Round-trip through the schema's JSON form to get the Python StructType.
    return _parse_datatype_json_string(struct_type_j.json())
def kafka_stream_dataframe(topics, kafka_brokers=default_kafka_brokers, spark=None):
    """
    Returns a simple string value kafka streaming DataFrame.

    :param topics:
        List of Kafka topics
    :param kafka_brokers:
        List of Kafka brokers for kafka.bootstrap.servers.
    :param spark:
        SparkSession. Defaults to the active (or a newly created) session.
        (The original implementation relied on a pyspark-shell global ``spark``.)
    """
    if spark is None:
        # getOrCreate() returns the already-running session in a pyspark shell.
        spark = SparkSession.builder.getOrCreate()
    # Load a basic kafka string stream; values are raw Kafka message bytes.
    return spark \
        .readStream.format("kafka") \
        .option('kafka.bootstrap.servers', ','.join(kafka_brokers)) \
        .option('subscribe', ','.join(topics)) \
        .load()
def eventstream_dataframe(spark, stream_name, kafka_brokers=default_kafka_brokers):
    """
    Integration with Wikimedia Event Platform streams and schemas
    to automate getting a Spark Structured Streaming DataFrame.

    Example:
        revision_create_stream = eventstream_dataframe(spark, 'mediawiki.revision-create')

    :param spark:
        SparkSession
    :param stream_name:
        Name of Event Platform stream.
    :param kafka_brokers:
        List of Kafka brokers for kafka.bootstrap.servers.
    """
    # (Removed: the original created an unused EventStream via the factory here;
    # the helpers below do all stream-config lookups themselves.)

    # Get the Kafka topics we need to subscribe to:
    topics = eventstream_topics(stream_name)
    # Get the Kafka dataframe stream; values here are just JSON strings.
    kafka_json_string_stream = kafka_stream_dataframe(topics, kafka_brokers)
    # Get the Spark schema of the event stream
    spark_schema = eventstream_spark_schema(spark, stream_name)
    # The JSON data is in the Kafka message value.
    # Read it as JSON (from_json) using our schema, then promote the
    # parsed subfields to top-level DataFrame columns.
    return kafka_json_string_stream \
        .selectExpr("CAST(value AS STRING)") \
        .select(from_json("value", spark_schema).alias("data")) \
        .select("data.*")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment