@nmukerje
Last active February 20, 2024 07:26
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Flatten arrays of structs and structs
def flatten(df):
    # compute Complex Fields (Lists and Structs) in Schema
    complex_fields = dict([(field.name, field.dataType)
                           for field in df.schema.fields
                           if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
    while len(complex_fields) != 0:
        col_name = list(complex_fields.keys())[0]
        print("Processing :" + col_name + " Type : " + str(type(complex_fields[col_name])))

        # if StructType then convert all sub elements to columns,
        # i.e. flatten structs
        if type(complex_fields[col_name]) == StructType:
            expanded = [col(col_name + '.' + k).alias(col_name + '_' + k)
                        for k in [n.name for n in complex_fields[col_name]]]
            df = df.select("*", *expanded).drop(col_name)

        # if ArrayType then add the Array Elements as Rows using the explode function,
        # i.e. explode Arrays
        elif type(complex_fields[col_name]) == ArrayType:
            df = df.withColumn(col_name, explode_outer(col_name))

        # recompute remaining Complex Fields in Schema
        complex_fields = dict([(field.name, field.dataType)
                               for field in df.schema.fields
                               if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
    return df

df = flatten(df)
df.printSchema()
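
For anyone who wants to try this end to end, here is a minimal, self-contained sketch; the nested_df name and its schema are made up for illustration, and an active SparkSession named spark is assumed:

from pyspark.sql import Row

# Hypothetical nested input: one struct column plus one array-of-structs column.
nested_df = spark.createDataFrame([
    Row(id=1,
        user=Row(name="Ada", age=36),
        tags=[Row(k="team", v="ml"), Row(k="role", v="dev")])
])

flat_df = flatten(nested_df)
flat_df.printSchema()
# Expected shape: id, user_name, user_age, tags_k, tags_v,
# with one output row per element of `tags`.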
@bjornjorgensen

Oh, when I use this function I disable line 12 (the print statement) by putting a # in front of it.

Otherwise there's an error, 'str' object is not callable: https://itsmycode.com/python-typeerror-str-object-is-not-callable-solution/

@ThiagoPositeli

Thanks @bjornjorgensen for the help.
I just killed the cluster and created another one, and the function started running again with the same files 🤣

@Mavericks334

Mavericks334 commented Aug 23, 2022

Hi,
I get the error below. Any suggestions on how to avoid it? I have nodes that contain multiple nested nodes, going up to 3 or 4 levels deep.

root
|-- code: string (nullable = true)
|-- rule_id: string (nullable = true)
|-- from: date (nullable = true)
|-- _to: date (nullable = true)
|-- type: string (nullable = true)
|-- definition: string (nullable = true)
|-- description: string (nullable = true)
|-- created_on: timestamp (nullable = true)
|-- creator: string (nullable = true)
|-- modified_on: timestamp (nullable = true)
|-- modifier: string (nullable = true)

IndexError                                Traceback (most recent call last)
<command-4235716148475136> in <module>
----> 1 flatten_df = flatten(ar)
      2 flatten_df.show()

<command-4320286398733830> in flatten(df)
     10     ])
     11 
---> 12     qualify = list(complex_fields.keys())[0] + "_"
     13 
     14     while len(complex_fields) != 0:

IndexError: list index out of range
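
A note on this one: the printed schema contains only primitive columns, so the complex_fields dict is empty and indexing [0] before the loop raises IndexError (the qualify line in the traceback comes from a modified variant of flatten, not the gist above). A minimal guard sketch, assuming ar is the DataFrame from the traceback:

from pyspark.sql.types import ArrayType, StructType

def has_complex_fields(df):
    # True if any top-level column is an array or a struct.
    return any(isinstance(f.dataType, (ArrayType, StructType))
               for f in df.schema.fields)

# Only flatten when there is something to flatten; a fully flat schema
# makes list(complex_fields.keys())[0] fail with IndexError.
flatten_df = flatten(ar) if has_complex_fields(ar) else ar
flatten_df.show()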

@ttdidier

Line 21: # if ArrayType then add the Array Elements as Rows using the explode function
Is there a way we can add the array elements as columns rather than rows? I tried to use posexplode to later pivot the table, without success.

I tried df = df.selectExpr("*", posexplode_outer(col_name).alias("position", col_name)) but I get the error
"TypeError: Column is not iterable"

@bjornjorgensen

I have updated this function and added a fix for MapType.
I have also created a JIRA for this.

@franciscodara

Help me, please.
When running the code above, everything worked, but I need to convert data that arrives in timestamp format to a date. Can someone help me?

Output:
_v | 0
action | To create
data_updatedAt | 2023-01-26T15:10:...
date
$date | 1674745838876 <<<<<<<

I need:
_v | 0
action | To create
data_updatedAt | 2023-01-26T15:10:...
date
$date | 2023-01-26T15:10:... <<<<<<<
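
A sketch of one way to do that conversion, assuming the flattened column ends up named date_$date and that the value 1674745838876 is epoch milliseconds:

from pyspark.sql.functions import col, to_date

# Numeric-to-timestamp casts are interpreted as seconds since the epoch,
# so divide the millisecond value by 1000 first.
df = df.withColumn("date_ts", (col("date_$date") / 1000).cast("timestamp"))
df = df.withColumn("date_only", to_date("date_ts"))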

@CMonte2

CMonte2 commented Mar 9, 2023

Thanks a lot for your work, it works great.

@anayyar82

Hello, I tried to use the MapType handling in Spark Streaming, but it's not working due to an issue in the code.

This is the line that causes the problem in Spark Streaming:

        keys = list(map(lambda row: row[0], keys_df.collect()))

Please let me know the best option to resolve it in Spark Structured Streaming.
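
For context, collect() is an action and cannot run against a streaming DataFrame, so any approach that first gathers all map keys to the driver breaks under Structured Streaming. A streaming-safe sketch is to explode the map into key/value rows instead of turning keys into columns (map_col is a placeholder for the MapType column name):

from pyspark.sql.functions import explode_outer

# explode_outer on a MapType column emits one row per map entry,
# with two new columns for the key and the value; no collect() needed.
df = df.select("*", explode_outer("map_col").alias("key", "value")).drop("map_col")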

@prafulacharya

This flatten() function fails when there is a nested array inside an array. It failed to flatten this:

"user_mentions": [
  {
    "screen_name": "AshJone15461246",
    "name": "Ash Jones",
    "id": 1589564629369462800,
    "id_str": "1589564629369462784",
    "indices": [0, 16]
  },
  {
    "screen_name": "BariAWilliams",
    "name": "Bärí A. Williams, Esq.",
    "id": 4639656854,
    "id_str": "4639656854",
    "indices": [17, 31]
  },
  {
    "screen_name": "bjorn_hefnoll",
    "name": "Björn",
    "id": 1374096417954881500,
    "id_str": "1374096417954881548",
    "indices": [32, 46]
  },
  {
    "screen_name": "SpencerAlthouse",
    "name": "Spencer Althouse",
    "id": 38307346,
    "id_str": "38307346",
    "indices": [47, 63]
  }
]
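
A minimal reproduction sketch of that shape, in case it helps pin the failure down; the field subset and DataFrame name are made up, and whether it actually fails may depend on the Spark version:

from pyspark.sql import Row

# An array of structs where each struct itself contains an array (`indices`).
tweet_df = spark.createDataFrame([
    Row(user_mentions=[
        Row(screen_name="AshJone15461246", name="Ash Jones", indices=[0, 16]),
        Row(screen_name="BariAWilliams", name="Bärí A. Williams, Esq.", indices=[17, 31]),
    ])
])

flatten(tweet_df).printSchema()
# In principle the loop first explodes user_mentions, then expands the
# struct, then explodes user_mentions_indices on a later pass.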
