grants.gov
import xml.etree.ElementTree as ET
from datetime import datetime
from typing import List, Optional, Dict, Any
from pydantic import BaseModel, Field, validator
import os
import pandas as pd
from google.cloud import bigquery, storage
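# NOTE: the model below uses the Pydantic v1 API (`validator`, class-based
# Config). A rough equivalent on Pydantic v2, if you migrate (sketch only):
#
#   from pydantic import field_validator, ConfigDict
#
#   class NormalizedGrant(BaseModel):
#       model_config = ConfigDict(populate_by_name=True)
#
#       @field_validator('close_date', 'posted_date', mode='before')
#       @classmethod
#       def format_date(cls, v): ...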
# --- 1. Define the Target Schema using Pydantic ---
# This class defines the structure of our final, clean output object.
# Pydantic will automatically validate the data and handle type conversions.
class NormalizedGrant(BaseModel):
    composite_id: str
    source: str
    source_id: str
    title: Optional[str] = None
    agency: Optional[str] = None
    description: Optional[str] = None
    url: str
    close_date: Optional[datetime] = None
    amount: Optional[int] = Field(None, alias="AwardCeiling")
    eligibility: Optional[str] = None
    posted_date: Optional[datetime] = None
    opportunity_category: Optional[str] = None
    funding_category: Optional[str] = None
    closing_date_explanation: Optional[str] = None
    additional_eligibility_info: Optional[str] = None

    class Config:
        # Allows a field with an `alias` (e.g., `amount` / "AwardCeiling")
        # to be populated by either its alias or its field name.
        allow_population_by_field_name = True

    @validator('close_date', 'posted_date', pre=True)
    def format_date(cls, v):
        """
        Custom validator to parse the MMDDYYYY date format from the XML.
        It runs before other validations (`pre=True`).
        """
        if v is None:
            return None
        try:
            # Grants.gov uses MMDDYYYY format
            return datetime.strptime(v, '%m%d%Y')
        except (ValueError, TypeError):
            return None  # Return None if the date is invalid or empty
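    # Conceptual examples of the validator's behavior (values are made up):
    #   '10152025' -> datetime(2025, 10, 15)
    #   ''         -> None  (strptime raises ValueError, caught above)
    #   None       -> None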
# --- 2. Create the Parser and Transformer Class ---
class GrantsGovParser:
    """
    A class to handle extracting, parsing, and transforming grant data
    from a Grants.gov XML file into a normalized format.
    """
    def _get_child_text(self, element: ET.Element, tag_name: str) -> Optional[str]:
        """Safely finds a child element and returns its text, or None."""
        child = element.find(tag_name)
        if child is not None and child.text:
            return child.text.strip()
        return None
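    # e.g., for a made-up snippet <Opp><AgencyName>  NIH </AgencyName></Opp>,
    # _get_child_text(opp, "AgencyName") returns "NIH" (stripped), while a
    # missing or empty child tag returns None.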
    def _get_first_available_field(self, element: ET.Element, tag_names: List[str]) -> Optional[str]:
        """
        Returns the first available, non-empty text from a list of possible
        tags. (This implements the fallback lookup referred to elsewhere as
        `_get_field_optimized`.)
        """
        for tag in tag_names:
            text = self._get_child_text(element, tag)
            if text:
                return text
        return None
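    # Fallback order matters: for `funding_category` below, a populated
    # <CategoryOfFundingActivity> wins, and <CategoryExplanation> is only
    # consulted when the first tag is absent or empty.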
    def transform_opportunity(self, element: ET.Element) -> Optional[Dict[str, Any]]:
        """
        Transforms a single XML <OpportunitySynopsisDetail_1_0> element
        into a normalized Python dictionary.
        """
        source_id = self._get_child_text(element, "OpportunityID")
        if not source_id:
            # Every valid record must have an ID.
            return None

        # Extract data using the helper methods
        title = self._get_child_text(element, "OpportunityTitle")
        agency = self._get_child_text(element, "AgencyName")
        description = self._get_child_text(element, "Description")
        posted_date_str = self._get_child_text(element, "PostDate")
        close_date_str = self._get_child_text(element, "CloseDate")
        award_ceiling = self._get_child_text(element, "AwardCeiling")

        # Construct the dictionary for Pydantic validation
        grant_data = {
            "composite_id": f"grants.gov-xml:{source_id}",
            "source": "grants.gov-xml",
            "source_id": source_id,
            "title": title,
            "agency": agency,
            "description": description,
            "url": f"https://www.grants.gov/web/grants/view-opportunity.html?oppId={source_id}",
            "close_date": close_date_str,
            "posted_date": posted_date_str,
            "AwardCeiling": int(award_ceiling) if award_ceiling and award_ceiling.isdigit() else None,
            "eligibility": self._get_first_available_field(element, [
                "EligibleApplicants"
            ]),
            "opportunity_category": self._get_first_available_field(element, [
                "OpportunityCategory",
                "OpportunityCategoryExplanation"
            ]),
            "funding_category": self._get_first_available_field(element, [
                "CategoryOfFundingActivity",
                "CategoryExplanation"
            ]),
            "closing_date_explanation": self._get_first_available_field(element, [
                "CloseDateExplanation"
            ]),
            "additional_eligibility_info": self._get_first_available_field(element, [
                "AdditionalInformationOnEligibility"
            ])
        }

        # Validate and structure the data using the Pydantic model
        validated_grant = NormalizedGrant(**grant_data)

        # Return the data as a dictionary
        return validated_grant.dict(by_alias=False)
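    # Illustrative round trip (made-up values):
    #   el = ET.fromstring(
    #       "<OpportunitySynopsisDetail_1_0>"
    #       "<OpportunityID>12345</OpportunityID>"
    #       "<OpportunityTitle>Example</OpportunityTitle>"
    #       "<AwardCeiling>500000</AwardCeiling>"
    #       "</OpportunitySynopsisDetail_1_0>")
    #   GrantsGovParser().transform_opportunity(el)
    #   # -> {'composite_id': 'grants.gov-xml:12345', 'source_id': '12345',
    #   #     'title': 'Example', 'amount': 500000, 'close_date': None, ...}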
    def parse_and_transform(self, xml_file_path: str) -> List[Dict[str, Any]]:
        """
        Parses an entire XML file and transforms all grant opportunities.
        """
        try:
            tree = ET.parse(xml_file_path)
            root = tree.getroot()
        except ET.ParseError as e:
            print(f"Error parsing XML file: {e}")
            return []

        transformed_grants = []
        # The XML structure has all opportunities as direct children of the root.
        for opportunity_element in root:
            transformed_data = self.transform_opportunity(opportunity_element)
            if transformed_data:
                transformed_grants.append(transformed_data)
        return transformed_grants
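    # Two caveats worth flagging:
    # 1) If your extract declares a default XML namespace, bare tag names like
    #    "OpportunityID" will not match in element.find(); you would need to
    #    strip namespaces or use fully qualified tags.
    # 2) ET.parse loads the whole file into memory. For the large full daily
    #    extract, a hedged incremental sketch using only the stdlib:
    #
    #       for _, el in ET.iterparse(xml_file_path):
    #           if el.tag.endswith("OpportunitySynopsisDetail_1_0"):
    #               record = self.transform_opportunity(el)
    #               el.clear()  # release the element's memory as we go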
    def load_to_bigquery(self, data: List[Dict[str, Any]], gcs_bucket_name: str, gcs_blob_name: str, bq_table_id: str):
        """
        Loads the transformed data into BigQuery via a GCS staging file.

        Args:
            data: A list of normalized grant dictionaries.
            gcs_bucket_name: The GCS bucket to use for staging.
            gcs_blob_name: The name for the staging file in GCS (e.g., 'staging/grants.parquet').
            bq_table_id: The full BigQuery table ID (e.g., 'my-project.my_dataset.grants').
        """
        if not data:
            print("No data to load. Aborting.")
            return

        # 1. Convert data to a Pandas DataFrame
        df = pd.DataFrame(data)
        # Ensure datetime columns are in the correct format for BigQuery
        for col in ['close_date', 'posted_date']:
            df[col] = pd.to_datetime(df[col], utc=True)

        # 2. Save the DataFrame to a local Parquet file
        local_parquet_file = "grants.parquet"
        df.to_parquet(local_parquet_file, index=False)
        print(f"Successfully converted data to '{local_parquet_file}'.")

        # 3. Upload the Parquet file to GCS
        storage_client = storage.Client()
        bucket = storage_client.bucket(gcs_bucket_name)
        blob = bucket.blob(gcs_blob_name)
        blob.upload_from_filename(local_parquet_file)
        gcs_uri = f"gs://{gcs_bucket_name}/{gcs_blob_name}"
        print(f"Successfully uploaded Parquet file to {gcs_uri}.")

        # 4. Trigger the BigQuery load job
        bq_client = bigquery.Client()
        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.PARQUET,
            # Overwrites the table on each run; suits full daily extracts.
            write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
            autodetect=True,  # BigQuery can infer the schema from Parquet
        )
        load_job = bq_client.load_table_from_uri(gcs_uri, bq_table_id, job_config=job_config)
        print(f"Starting BigQuery load job {load_job.job_id}...")
        load_job.result()  # Waits for the job to complete.
        print(f"Job finished. Loaded {load_job.output_rows} rows into {bq_table_id}.")

        # 5. Clean up the local file
        os.remove(local_parquet_file)
        print(f"Cleaned up local file: '{local_parquet_file}'.")
# --- 3. Example Usage ---
if __name__ == "__main__":
    import json

    # --- Configuration ---
    # Replace with your actual GCP project details
    GCS_BUCKET = "your-gcs-bucket-name"
    GCS_STAGING_FILE = "staging/grants_gov_data.parquet"
    BIGQUERY_TABLE_ID = "your-gcp-project-id.your_bigquery_dataset.grants_gov"

    # --- Execution ---
    parser = GrantsGovParser()
    xml_file = "sample_grants.xml"

    print(f"1. Parsing and transforming data from '{xml_file}'...")
    normalized_data = parser.parse_and_transform(xml_file)

    if normalized_data:
        print(f"Successfully transformed {len(normalized_data)} grant opportunities.")

        # You can still inspect the data before loading:
        # print("\n--- Sample Transformed Grant ---")
        # print(json.dumps(normalized_data[0], indent=2, default=str))

        print("\n2. Loading data to BigQuery...")
        if GCS_BUCKET == "your-gcs-bucket-name":
            print("SKIPPING LOAD: Please configure your GCS_BUCKET and BIGQUERY_TABLE_ID.")
        else:
            parser.load_to_bigquery(normalized_data, GCS_BUCKET, GCS_STAGING_FILE, BIGQUERY_TABLE_ID)
    else:
        print("No data was transformed. Please check the XML file and its format.")