```python
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Flatten arrays of structs and structs
def flatten(df):
    # compute Complex Fields (Lists and Structs) in Schema
    complex_fields = dict([(field.name, field.dataType)
                           for field in df.schema.fields
                           if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
    while len(complex_fields) != 0:
        col_name = list(complex_fields.keys())[0]
        print("Processing :" + col_name + " Type : " + str(type(complex_fields[col_name])))

        # if StructType then convert all sub elements to columns,
        # i.e. flatten structs
        if type(complex_fields[col_name]) == StructType:
            expanded = [col(col_name + '.' + k).alias(col_name + '_' + k)
                        for k in [n.name for n in complex_fields[col_name]]]
            df = df.select("*", *expanded).drop(col_name)

        # if ArrayType then add the Array Elements as Rows using the explode function,
        # i.e. explode Arrays
        elif type(complex_fields[col_name]) == ArrayType:
            df = df.withColumn(col_name, explode_outer(col_name))

        # recompute remaining Complex Fields in Schema
        complex_fields = dict([(field.name, field.dataType)
                               for field in df.schema.fields
                               if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
    return df

df = flatten(df)
df.printSchema()
```
I have updated this function and added a fix for MapType. I have also created a JIRA for this.
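For anyone who needs that fix before the update lands: I don't know the poster's exact patch, but here is a minimal sketch of the usual approach, written as a standalone helper (inside flatten() it would become a third elif branch on MapType). It collects the map's distinct keys and promotes each one to a column:

```python
from pyspark.sql.functions import col, explode_outer, map_keys

def flatten_map_column(df, col_name):
    # collect the map's distinct keys (note: this collect() is what
    # later breaks under Structured Streaming)
    keys_df = df.select(explode_outer(map_keys(col(col_name)))).distinct()
    keys = list(map(lambda row: row[0], keys_df.collect()))
    # promote each key to its own column, e.g. mycol_key1, mycol_key2, ...
    expanded = [col(col_name).getItem(k).alias(col_name + '_' + k) for k in keys]
    return df.select("*", *expanded).drop(col_name)
```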
Help me: when I ran the code above everything worked, but I need to convert the data that comes in timestamp format into a date. Can someone help me?
Output:
_v | 0
action | To create
data_updatedAt | 2023-01-26T15:10:...
date$date | 1674745838876 <<<<<<<
I need:
_v | 0
action | To create
data_updatedAt | 2023-01-26T15:10:...
date$date | 2023-01-26T15:10:... <<<<<<<
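Not the author, but one way to do this, assuming `date$date` holds epoch milliseconds as in your output: divide by 1000 and cast.

```python
from pyspark.sql.functions import col, to_date

# 1674745838876 is epoch milliseconds; /1000 gives seconds, which
# cast("timestamp") understands
df = df.withColumn("date$date", (col("date$date") / 1000).cast("timestamp"))

# or, if you want a plain date rather than a timestamp:
df = df.withColumn("date$date", to_date((col("date$date") / 1000).cast("timestamp")))
```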
Thanks a lot for your work, it works great.
Hello, I tried to use the MapType fix in Spark Streaming, but it fails due to an issue in the code.
Below is the line causing the issue in Spark Streaming:
keys = list(map(lambda row: row[0], keys_df.collect()))
Please let me know the best way to resolve this in Spark Structured Streaming.
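The root cause is that collect() cannot run on a streaming DataFrame, so the MapType key lookup fails. One common workaround (a sketch, not the only option) is foreachBatch, where each micro-batch arrives as a static DataFrame and collect() is allowed again. The sink, path, and `stream_df` below are placeholders; adjust to your job:

```python
def process_batch(batch_df, batch_id):
    # batch_df is a static DataFrame here, so flatten() and its
    # collect() call work as they do in batch mode
    flatten(batch_df).write.mode("append").parquet("/tmp/flattened")

query = (stream_df.writeStream
         .foreachBatch(process_batch)
         .start())
```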
This flatten() function fails when there is a nested array inside an array. It failed to flatten this:

```json
"user_mentions": [
  {
    "screen_name": "AshJone15461246",
    "name": "Ash Jones",
    "id": 1589564629369462800,
    "id_str": "1589564629369462784",
    "indices": [0, 16]
  },
  {
    "screen_name": "BariAWilliams",
    "name": "Bärí A. Williams, Esq.",
    "id": 4639656854,
    "id_str": "4639656854",
    "indices": [17, 31]
  },
  {
    "screen_name": "bjorn_hefnoll",
    "name": "Björn",
    "id": 1374096417954881500,
    "id_str": "1374096417954881548",
    "indices": [32, 46]
  },
  {
    "screen_name": "SpencerAlthouse",
    "name": "Spencer Althouse",
    "id": 38307346,
    "id_str": "38307346",
    "indices": [47, 63]
  }
]
```
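In case it helps while that bug is open, here is a manual sketch of the two passes that shape needs, assuming the sample above is bound to a column named `user_mentions`. Each nesting level needs its own explode pass, which is what the while loop is meant to supply:

```python
from pyspark.sql.functions import col, explode_outer

# pass 1: array<struct> -> one row per mention; user_mentions becomes a struct
df = df.withColumn("user_mentions", explode_outer("user_mentions"))

# pass 2: the struct's inner 'indices' array -> one row per offset
df = df.withColumn("user_mentions_indices", explode_outer(col("user_mentions.indices")))
```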
Regarding the comment "# if ArrayType then add the Array Elements as Rows using the explode function":
Is there a way we can add Array Elements as columns rather than rows?
I tried to use posexplode so I could later pivot the table, without success.
I tried df = df.selectExpr("*", posexplode_outer(col_name).alias("position", col_name)) but I get the error
"TypeError: Column is not iterable".