Skip to content

Instantly share code, notes, and snippets.

@mesquita
Created November 25, 2025 01:15
Show Gist options
  • Select an option

  • Save mesquita/aeda3920583d8318ae54145eda51559c to your computer and use it in GitHub Desktop.

Select an option

Save mesquita/aeda3920583d8318ae54145eda51559c to your computer and use it in GitHub Desktop.
s3_to_pandas
"""Main module for processing JSON files from S3 and creating flattened DataFrames."""
import logging
from typing import Optional
import awswrangler as wr
import boto3
import pandas as pd
def setup_logging(level: str = "INFO") -> None:
    """Configure root logging with a timestamped message format.

    Args:
        level: Logging level name (case-insensitive), e.g. "INFO" or "DEBUG".
    """
    # Resolve the textual level to its numeric constant (raises AttributeError
    # for unknown names, same as passing a bad name to getattr directly).
    numeric_level = getattr(logging, level.upper())
    logging.basicConfig(
        level=numeric_level,
        format="%(asctime)s - %(levelname)s - %(message)s",
    )
def flatten_model_result_df(df: pd.DataFrame) -> pd.DataFrame:
    """Flatten the ``model_result`` column into separate columns.

    Each ``model_result`` value is expected to be a two-element list:
    a dict of result fields (flattened into ``model_<key>`` columns)
    followed by a token count (stored in a new ``token_count`` column).
    Rows whose value is None or malformed are left with None in the
    derived columns.

    Args:
        df: DataFrame that may contain a ``model_result`` column.

    Returns:
        A new DataFrame with ``model_result`` replaced by the flattened
        columns. The input is returned unchanged (same object) when it
        is empty or has no ``model_result`` column.
    """
    if df.empty or "model_result" not in df.columns:
        return df

    # Work on a copy so the caller's DataFrame is never mutated.
    result_df = df.copy()
    result_df["token_count"] = None

    # Iterate the single column directly instead of iterrows(): avoids
    # materializing a full row Series per row when only one field is read.
    for idx, model_result in df["model_result"].items():
        if model_result is None:
            continue
        if not isinstance(model_result, list) or len(model_result) < 2:
            # Lazy %-args: the message is only formatted if the record is emitted.
            logging.warning("Invalid model_result format in row %s", idx)
            continue

        # First element: dict of result fields; second: token count.
        result_dict = model_result[0] if isinstance(model_result[0], dict) else {}
        # The length-2 check above guarantees model_result[1] exists,
        # so no defensive ternary is needed here.
        result_df.at[idx, "token_count"] = model_result[1]

        for key, value in result_dict.items():
            col_name = f"model_{key}"
            if col_name not in result_df.columns:
                # Initialize so rows lacking this key end up with None.
                result_df[col_name] = None
            result_df.at[idx, col_name] = value

    # The raw nested column is fully consumed; drop it from the output.
    return result_df.drop("model_result", axis=1)
def process_s3_json_files(
    bucket: str,
    prefix: str,
    aws_profile: Optional[str] = None,
    region: str = "us-east-1",
) -> pd.DataFrame:
    """Process JSON files from S3 and create a flattened DataFrame.

    Lists every object under ``prefix``, reads each ``.json`` file into a
    DataFrame, flattens its ``model_result`` column, and concatenates the
    results. Individual unreadable files are skipped (best-effort).

    Args:
        bucket: S3 bucket name.
        prefix: S3 prefix to filter files.
        aws_profile: AWS profile name (optional).
        region: AWS region name.

    Returns:
        A pandas DataFrame with flattened data from all readable files.

    Raises:
        ValueError: If no files exist under the prefix, or none could be
            processed successfully.
    """
    session = boto3.Session(profile_name=aws_profile, region_name=region)
    s3_client = session.client("s3")
    logging.info("Processing files from s3://%s/%s", bucket, prefix)

    # Use a paginator: a single list_objects_v2 call returns at most 1000
    # keys, which would silently drop files under large prefixes.
    try:
        paginator = s3_client.get_paginator("list_objects_v2")
        keys = [
            obj["Key"]
            for page in paginator.paginate(Bucket=bucket, Prefix=prefix)
            for obj in page.get("Contents", [])
        ]
    except Exception as e:
        logging.error("Failed to list objects in bucket %s: %s", bucket, e)
        raise

    if not keys:
        raise ValueError(f"No files found in s3://{bucket}/{prefix}")

    dfs = []
    for key in keys:
        if not key.endswith(".json"):
            continue
        path = f"s3://{bucket}/{key}"
        logging.info("Processing file: %s", key)
        try:
            df_temp = wr.s3.read_json(path=path, boto3_session=session)
            if df_temp.empty:
                logging.warning("Empty DataFrame from %s", key)
                continue
            df_temp = flatten_model_result_df(df_temp)
            dfs.append(df_temp)
        except Exception as e:
            # Best-effort: log and skip this file, keep processing the rest.
            logging.error("Error processing %s: %s", key, e)
            continue

    if not dfs:
        raise ValueError("No valid JSON files processed successfully")

    result_df = pd.concat(dfs, ignore_index=True)
    logging.info(
        "Successfully processed %d files, resulting in %d rows",
        len(dfs),
        len(result_df),
    )
    return result_df
def main() -> None:
    """Demonstrate usage of the S3 → DataFrame pipeline with example settings."""
    setup_logging()

    # Placeholder configuration — replace with real values before running.
    config = {
        "bucket": "your-bucket-name",
        "prefix": "your/prefix/",
        "aws_profile": None,  # or "your-profile-name"
    }

    try:
        df = process_s3_json_files(
            bucket=config["bucket"],
            prefix=config["prefix"],
            aws_profile=config["aws_profile"],
        )
        print(f"DataFrame shape: {df.shape}")
        print("\nDataFrame columns:")
        print(df.columns.tolist())
        print("\nFirst few rows:")
        print(df.head())
    except Exception as e:
        logging.error(f"Failed to process files: {e}")
        raise
# Script entry point: only run when executed directly, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment