Created
March 4, 2019 21:01
-
-
Save oskarryn/44935b206dad7497fef631c711a5cf87 to your computer and use it in GitHub Desktop.
Utility for Spark Structured Streaming: extract data from a Kafka message's value and apply a schema.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def parse_data_from_kafka_message(sdf, schema):
    """Extract data from a Kafka message's value and apply a schema.

    The ``value`` column is split on commas; the i-th piece becomes the i-th
    column described by ``schema`` and is cast to that column's data type.

    Parameters
    ----------
    sdf : pyspark.sql.DataFrame
        DataFrame obtained from a Kafka stream, for which ``sdf.isStreaming``
        is True. It must expose a ``value`` column.
    schema : dict (e.g. OrderedDict) or pyspark.sql.types.StructType
        Order-preserving mapping of column names to data types, or a
        StructType whose fields define the names and data types.
        The position of each entry decides which comma-separated piece of
        ``value`` it receives, so the ordering must match the message layout.

    Returns
    -------
    pyspark.sql.DataFrame
        DataFrame holding only the columns named in ``schema``, in schema
        order.

    Raises
    ------
    AssertionError
        If ``sdf`` is not a streaming DataFrame.
    TypeError
        If ``schema`` is neither a dict nor a StructType.
    """
    from pyspark.sql.functions import split
    from pyspark.sql.types import StructType

    # Raised explicitly instead of via ``assert`` so the check is not stripped
    # under ``python -O``; the exception type and message are kept so existing
    # callers see the same failure.
    if not sdf.isStreaming:
        raise AssertionError("DataFrame doesn't receive a streaming data")

    # Normalise both accepted schema forms to ordered (name, data type) pairs.
    if isinstance(schema, dict):
        fields = list(schema.items())
    elif isinstance(schema, StructType):
        fields = [(field.name, field.dataType) for field in schema]
    else:
        raise TypeError("schema has to be OrderedDict or pyspark.sql.types.StructType")

    # Split the attributes into a nested array held in a single Column.
    parts = split(sdf['value'], ',')

    # Expand the array into top-level columns with one projection. A single
    # select avoids stacking one plan node per column (as repeated withColumn
    # calls do) and cannot overwrite the source ``value`` column part-way
    # through when a schema field happens to share its name.
    return sdf.select([
        parts.getItem(idx).cast(col_type).alias(col_name)
        for idx, (col_name, col_type) in enumerate(fields)
    ])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment