Skip to content

Instantly share code, notes, and snippets.

@Menziess
Created March 19, 2024 12:13
Show Gist options
  • Save Menziess/11201a23d4ec197ca57a257927718df6 to your computer and use it in GitHub Desktop.
Save Menziess/11201a23d4ec197ca57a257927718df6 to your computer and use it in GitHub Desktop.
Spark flatten nested structures
from pyspark.sql.functions import *
def flatten_structs(df):
    """Flatten one level of struct columns and drop other complex columns.

    Walks the top-level fields of ``df.schema.jsonValue()``:

    * a field whose JSON type is a plain string is selected unchanged;
    * a field whose JSON type is a ``struct`` is expanded into one output
      column per member, named ``<struct>.<member>``; members whose own
      type is a dict or list (i.e. not a plain type string) are skipped;
    * every other field is skipped.

    Each skipped column is reported on stdout as ``Omitted column <name>``.

    Source names are backtick-quoted part by part before being handed to
    ``col``, so a literal ``.`` inside a column name (such as the
    ``structtype.number1`` columns this function itself produces) is not
    parsed as a nested-field access. This makes the function safe to call
    on its own output.

    Args:
        df: DataFrame to flatten. Only ``df.schema.jsonValue()`` and
            ``df.select`` are used.

    Returns:
        The result of ``df.select`` over the kept columns, in schema order.

    >>> flatten_structs(test_df).show() # doctest: +NORMALIZE_WHITESPACE
    Omitted column rootstructype.nestedstructtype
    Omitted column arraytype
    +---+--------+---------+------------------+------------------+------------------+
    | id| money|timestamp|structtype.number1|structtype.number2|structtype.number3|
    +---+--------+---------+------------------+------------------+------------------+
    | 1|$100.000| 14| 1| 2| 3|
    | 1|$200.000| 15| 3| 2| 1|
    | 1| $10.000| 17| 1| 3| 2|
    | 2| -$100| 17| 3| 1| 2|
    | 2| $100| 14| 2| 1| 3|
    +---+--------+---------+------------------+------------------+------------------+
    """
    # (quoted source expression, output column name) pairs, in schema order.
    selectors = []
    for top in df.schema.jsonValue()['fields']:
        name, dtype = top['name'], top['type']
        if isinstance(dtype, str):
            selectors.append((_quote_identifier(name), name))
        elif isinstance(dtype, dict) and dtype['type'] == 'struct':
            for member in dtype['fields']:
                flat_name = name + '.' + member['name']
                if isinstance(member['type'], (dict, list)):
                    print('Omitted column', flat_name)
                else:
                    # Quote each part separately: the dot between them must
                    # stay outside the backticks to mean "field of struct".
                    source = '.'.join([
                        _quote_identifier(name),
                        _quote_identifier(member['name']),
                    ])
                    selectors.append((source, flat_name))
        else:
            print('Omitted column', name)
    return df.select(*[
        col(source).alias(flat_name)
        for source, flat_name in selectors
    ])


def _quote_identifier(name):
    """Return ``name`` wrapped in backticks so dots in it are taken literally.

    NOTE(review): embedded backticks are doubled, the usual SQL identifier
    escape -- confirm the target Spark version accepts that form.
    """
    return '`' + name.replace('`', '``') + '`'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment