Created
November 25, 2025 01:15
-
-
Save mesquita/aeda3920583d8318ae54145eda51559c to your computer and use it in GitHub Desktop.
s3_to_pandas
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """Main module for processing JSON files from S3 and creating flattened DataFrames.""" | |
| import logging | |
| from typing import Optional | |
| import awswrangler as wr | |
| import boto3 | |
| import pandas as pd | |
def setup_logging(level: str = "INFO") -> None:
    """Configure logging for the application."""
    # Resolve the textual level name ("info", "DEBUG", ...) to its numeric value.
    numeric_level = getattr(logging, level.upper())
    log_format = "%(asctime)s - %(levelname)s - %(message)s"
    logging.basicConfig(level=numeric_level, format=log_format)
def flatten_model_result_df(df: pd.DataFrame) -> pd.DataFrame:
    """
    Flatten the model_result field into separate columns for entire DataFrame.

    Each valid ``model_result`` cell is a two-element list: a dict of model
    outputs (flattened into ``model_<key>`` columns) followed by a token
    count (stored in ``token_count``). Rows with ``None`` or malformed
    values are left untouched (malformed ones are logged).

    Args:
        df: A pandas DataFrame containing the data with model_result column.

    Returns:
        A pandas DataFrame with flattened model_result fields; the input is
        returned unchanged if it is empty or has no model_result column.
    """
    if df.empty or "model_result" not in df.columns:
        return df

    # Create a copy to avoid modifying the caller's DataFrame.
    result_df = df.copy()
    result_df["token_count"] = None

    # Iterate only the column we need; Series.items() avoids the per-row
    # Series construction cost of DataFrame.iterrows().
    for idx, model_result in df["model_result"].items():
        if model_result is None:
            continue
        if not isinstance(model_result, list) or len(model_result) < 2:
            # Lazy %-args: the message is only built if the record is emitted.
            logging.warning("Invalid model_result format in row %s", idx)
            continue

        # First element: dict of model outputs; second element: token count.
        # The length guard above already ensures index 1 exists, so the
        # original redundant `len(model_result) > 1` re-check is dropped.
        result_dict = model_result[0] if isinstance(model_result[0], dict) else {}
        result_df.at[idx, "token_count"] = model_result[1]

        # Flatten dict entries into model_<key> columns, creating each
        # column lazily the first time its key appears.
        for key, value in result_dict.items():
            col_name = f"model_{key}"
            if col_name not in result_df.columns:
                result_df[col_name] = None
            result_df.at[idx, col_name] = value

    # The raw nested column is no longer needed once flattened.
    return result_df.drop("model_result", axis=1)
def process_s3_json_files(
    bucket: str,
    prefix: str,
    aws_profile: Optional[str] = None,
    region: str = "us-east-1",
) -> pd.DataFrame:
    """
    Process JSON files from S3 and create a flattened DataFrame.

    Lists every object under the prefix (paginated, so buckets with more
    than 1000 matching keys are fully covered), reads each ``.json`` file,
    flattens its model_result column, and concatenates the results.

    Args:
        bucket: S3 bucket name.
        prefix: S3 prefix to filter files.
        aws_profile: AWS profile name (optional).
        region: AWS region name.

    Returns:
        A pandas DataFrame with flattened data.

    Raises:
        ValueError: If no valid JSON files are found.
    """
    # Create boto3 session
    session = boto3.Session(profile_name=aws_profile, region_name=region)
    s3_client = session.client("s3")

    logging.info("Processing files from s3://%s/%s", bucket, prefix)

    # Use the paginator: a single list_objects_v2 call returns at most 1000
    # keys, which would silently drop files in larger buckets.
    try:
        paginator = s3_client.get_paginator("list_objects_v2")
        keys = [
            obj["Key"]
            for page in paginator.paginate(Bucket=bucket, Prefix=prefix)
            for obj in page.get("Contents", [])
        ]
    except Exception as e:
        logging.error("Failed to list objects in bucket %s: %s", bucket, e)
        raise

    if not keys:
        raise ValueError(f"No files found in s3://{bucket}/{prefix}")

    dfs = []
    for key in keys:
        # Only JSON payloads are relevant; skip manifests, markers, etc.
        if not key.endswith(".json"):
            continue

        path = f"s3://{bucket}/{key}"
        logging.info("Processing file: %s", key)
        try:
            df_temp = wr.s3.read_json(path=path, boto3_session=session)
            if df_temp.empty:
                logging.warning("Empty DataFrame from %s", key)
                continue
            # Flatten model_result before accumulating.
            dfs.append(flatten_model_result_df(df_temp))
        except Exception as e:
            # Best-effort: a single bad file must not abort the whole run.
            logging.error("Error processing %s: %s", key, e)
            continue

    if not dfs:
        raise ValueError("No valid JSON files processed successfully")

    # Concatenate all per-file DataFrames into one result.
    result_df = pd.concat(dfs, ignore_index=True)
    logging.info(
        "Successfully processed %s files, resulting in %s rows",
        len(dfs),
        len(result_df),
    )
    return result_df
def main() -> None:
    """Main function to demonstrate usage."""
    setup_logging()

    # Example usage - replace with actual values
    bucket = "your-bucket-name"
    prefix = "your/prefix/"
    aws_profile = None  # or "your-profile-name"

    try:
        frame = process_s3_json_files(
            bucket=bucket,
            prefix=prefix,
            aws_profile=aws_profile,
        )
        print(f"DataFrame shape: {frame.shape}")
        print("\nDataFrame columns:")
        print(frame.columns.tolist())
        print("\nFirst few rows:")
        print(frame.head())
    except Exception as e:
        logging.error(f"Failed to process files: {e}")
        raise


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment