Skip to content

Instantly share code, notes, and snippets.

@ourway
Created March 4, 2025 07:59
Show Gist options
  • Save ourway/d8cc3b03f1b839acd5e5749b60282eb3 to your computer and use it in GitHub Desktop.
Save ourway/d8cc3b03f1b839acd5e5749b60282eb3 to your computer and use it in GitHub Desktop.
import json
class IcebergSchemaFlattener:
    """Flatten columns with nested ``struct<...>`` types into leaf fields.

    Each input column is a dict with a ``"Name"``, a ``"Type"`` string and an
    optional ``"Parameters"`` dict. A column whose type starts with
    ``struct<`` is expanded recursively; every non-struct leaf becomes one
    output dict whose ``"Name"`` is the dot-joined path from the top-level
    column down to the leaf. Other parameterised types (for example
    ``map<...>`` or ``decimal(p,s)``) are kept verbatim as leaf types.
    """

    def __init__(self):
        # Result of the most recent flatten_schema() call.
        self.flattened_fields = []

    def flatten_schema(self, columns):
        """Return the flattened leaf fields for ``columns``.

        Args:
            columns: Iterable of column dicts (``"Name"``, ``"Type"`` and
                optionally ``"Parameters"``).

        Returns:
            A list of dicts with keys ``"Name"``, ``"Type"`` and
            ``"Parameters"``, in the order the leaves are encountered.

        Raises:
            ValueError: If a struct member has no ``name:type`` separator.
        """
        # Rebind to a fresh list so repeated calls on one instance do not
        # accumulate results, and so a previously returned list is not mutated.
        self.flattened_fields = []
        for column in columns:
            self._process_column(column, parent_path=[])
        return self.flattened_fields

    def _process_column(self, column, parent_path):
        """Expand one column: recurse into structs, record anything else."""
        current_path = parent_path + [column["Name"]]
        field_type = column["Type"]
        # "Parameters" may be absent or an explicit null; both mean "none".
        parameters = column.get("Parameters") or {}
        if field_type.startswith("struct<"):
            self._process_struct(field_type, current_path, parameters)
        else:
            self._add_field(current_path, field_type, parameters)

    def _process_struct(self, struct_type, path, parameters):
        """Process every member of a ``struct<...>`` type string under ``path``.

        The parent's ``parameters`` are passed on to each member.
        """
        # Drop the leading "struct<" and the final closing character.
        inner_fields = struct_type[len("struct<"):-1]
        for field_str in self._split_struct_fields(inner_fields):
            name, ftype = self._split_name_type(field_str)
            self._process_column(
                {"Name": name, "Type": ftype, "Parameters": parameters},
                path,
            )

    def _split_struct_fields(self, fields_str):
        """Split a struct body on its top-level commas.

        Commas nested inside ``<...>`` or ``(...)`` do not split, so
        ``"a:int,b:map<string,int>"`` yields two members.
        """
        fields = []
        current = []
        level = 0  # current nesting depth of <...> / (...)
        for c in fields_str:
            if c in {'<', '('}:
                level += 1
            elif c in {'>', ')'}:
                level -= 1
            elif c == ',' and level == 0:
                fields.append(''.join(current).strip())
                current = []
                continue  # the separator itself is not part of any member
            current.append(c)
        if current:
            fields.append(''.join(current).strip())
        return fields

    def _split_name_type(self, field_str):
        """Split ``"name:type"`` at the first colon outside any brackets.

        Returns:
            A ``(name, type)`` tuple with surrounding whitespace stripped.

        Raises:
            ValueError: If no colon is found outside brackets.
        """
        colon_pos = -1
        depth = 0  # open <...> / (...) groups seen so far
        for i, c in enumerate(field_str):
            if c == ':':
                if depth == 0:
                    colon_pos = i
                    break
            elif c in {'<', '('}:
                depth += 1
            elif c in {'>', ')'}:
                # Ignore unmatched closers rather than going negative.
                if depth:
                    depth -= 1
        if colon_pos == -1:
            raise ValueError(f"Invalid field format: {field_str}")
        return field_str[:colon_pos].strip(), field_str[colon_pos + 1:].strip()

    def _add_field(self, path, field_type, parameters):
        """Append one leaf field, named by the dot-joined ``path``."""
        self.flattened_fields.append({
            "Name": ".".join(path),
            "Type": field_type,
            # Copy so leaves that share a parent do not share one dict.
            "Parameters": parameters.copy(),
        })
# Usage example
def flatten_iceberg_schema(schema_file):
    """Load a table definition from a JSON file and flatten its columns.

    Args:
        schema_file: Path to a JSON document that contains a
            ``"StorageDescriptor"`` object with a ``"Columns"`` list.

    Returns:
        The list produced by ``IcebergSchemaFlattener.flatten_schema`` for
        those columns.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
        KeyError: If ``"StorageDescriptor"`` or ``"Columns"`` is missing.
    """
    # Decode as UTF-8 explicitly instead of relying on the platform's locale
    # encoding, which differs between systems.
    with open(schema_file, "r", encoding="utf-8") as f:
        schema_data = json.load(f)
    flattener = IcebergSchemaFlattener()
    return flattener.flatten_schema(schema_data["StorageDescriptor"]["Columns"])
# Example usage
if __name__ == "__main__":
    # Run the demo only when executed as a script, so importing this module
    # neither reads the schema file nor prints anything.
    flattened = flatten_iceberg_schema("schemas/10x_replicated_client_ledger_transaction_event_v001")
    for field in flattened:
        print(field)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment