@oskarryn
Created March 4, 2019 21:01
Util for Spark Structured Streaming: extract data from a Kafka message's value and apply a schema.
def parse_data_from_kafka_message(sdf, schema):
    """Extract data from a Kafka message's value column and apply a schema.

    Parameters
    ----------
    sdf : pyspark.sql.DataFrame
        DataFrame obtained from a Kafka stream, for which sdf.isStreaming is True.
    schema : OrderedDict or pyspark.sql.types.StructType
        An order-preserving dict mapping column names to data types, or a
        pyspark.sql.types.StructType defining the schema. In either case, each
        column's data type is a type from pyspark.sql.types.

    Returns
    -------
    pyspark.sql.DataFrame
        DataFrame with the actual data from the Kafka message.
    """
    from pyspark.sql.types import StructType
    from pyspark.sql.functions import split
    assert sdf.isStreaming, "DataFrame doesn't receive streaming data"
    col = split(sdf['value'], ',')  # split attributes into a nested array in one column
    # now expand col into multiple top-level columns
    if isinstance(schema, dict):
        for idx, (colName, colType) in enumerate(schema.items()):
            sdf = sdf.withColumn(colName, col.getItem(idx).cast(colType))
        return sdf.select(list(schema.keys()))  # select colNames
    elif isinstance(schema, StructType):
        for idx, field in enumerate(schema):
            sdf = sdf.withColumn(field.name, col.getItem(idx).cast(field.dataType))
        return sdf.select([field.name for field in schema])
    else:
        raise TypeError("schema has to be an OrderedDict or pyspark.sql.types.StructType")