GDELT 2025 Prefect ETL process: downloads GDELT 2.0 Global Knowledge Graph (GKG) 15-minute files for a configurable date range, combines and filters each day's files into a single Parquet file, and uploads the daily files to the dwb2023/gdelt-gkg-2025-v2 dataset on the Hugging Face Hub.
import os
from prefect import flow, task, get_run_logger
from prefect.blocks.system import Secret, JSON
from prefect.task_runners import ConcurrentTaskRunner
from prefect.concurrency.sync import concurrency
from pathlib import Path
import datetime
from datetime import timedelta
import pandas as pd
from huggingface_hub import HfApi, list_datasets
import requests
import zipfile
from typing import List, Dict
# --- Constants ---
# Target Hugging Face dataset repo and GDELT 2.0 GKG base URL.
# Uploads are throttled via the "hf_uploads" global concurrency limit used in upload_to_hf.
REPO_ID = "dwb2023/gdelt-gkg-2025-v2"
BASE_URL = "http://data.gdeltproject.org/gdeltv2"
# Complete column list for a GDELT 2.0 GKG record
GKG_COLUMNS = [
    'GKGRECORDID',  # Unique identifier
    'DATE',  # Publication date
    'SourceCollectionIdentifier',  # Source type
    'SourceCommonName',  # Source name
    'DocumentIdentifier',  # Document URL/ID
    'V1Counts',  # Counts of various types
    'V2.1Counts',  # Enhanced counts with positions
    'V1Themes',  # Theme tags
    'V2EnhancedThemes',  # Themes with positions
    'V1Locations',  # Location mentions
    'V2EnhancedLocations',  # Locations with positions
    'V1Persons',  # Person names
    'V2EnhancedPersons',  # Persons with positions
    'V1Organizations',  # Organization names
    'V2EnhancedOrganizations',  # Organizations with positions
    'V1.5Tone',  # Emotional dimensions
    'V2.1EnhancedDates',  # Date mentions
    'V2GCAM',  # Global Content Analysis Measures
    'V2.1SharingImage',  # Publisher-selected image
    'V2.1RelatedImages',  # Article images
    'V2.1SocialImageEmbeds',  # Social media images
    'V2.1SocialVideoEmbeds',  # Social media videos
    'V2.1Quotations',  # Quote extractions
    'V2.1AllNames',  # Named entities
    'V2.1Amounts',  # Numeric amounts
    'V2.1TranslationInfo',  # Translation metadata
    'V2ExtrasXML',  # Additional XML data
]
# Priority columns retained in the processed Parquet output
PRIORITY_COLUMNS = [
    'GKGRECORDID',  # Unique identifier
    'DATE',  # Publication date
    'SourceCollectionIdentifier',  # Source type
    'SourceCommonName',  # Source name
    'DocumentIdentifier',  # Document URL/ID
    'V1Counts',  # Numeric mentions
    'V2.1Counts',  # Enhanced counts
    'V1Themes',  # Theme tags
    'V2EnhancedThemes',  # Enhanced themes
    'V1Locations',  # Geographic data
    'V2EnhancedLocations',  # Enhanced locations
    'V1Persons',  # Person mentions
    'V2EnhancedPersons',  # Enhanced persons
    'V1Organizations',  # Organization mentions
    'V2EnhancedOrganizations',  # Enhanced organizations
    'V1.5Tone',  # Sentiment scores
    'V2.1EnhancedDates',  # Date mentions
    'V2GCAM',  # Global Content Analysis Measures
    'V2.1SharingImage',  # Publisher-selected image
    'V2.1Quotations',  # Direct quotes
    'V2.1AllNames',  # All named entities
    'V2.1Amounts',  # Numeric data
]
# --- Tasks ---
@task(retries=3, retry_delay_seconds=30, log_prints=True)
def setup_directories(base_path: Path) -> dict:
    """Create the raw and processed data directories."""
    logger = get_run_logger()
    try:
        raw_dir = base_path / "gdelt_raw"
        processed_dir = base_path / "gdelt_processed"
        raw_dir.mkdir(parents=True, exist_ok=True)
        processed_dir.mkdir(parents=True, exist_ok=True)
        logger.info("Directories created successfully")
        return {"raw": raw_dir, "processed": processed_dir}
    except Exception as e:
        logger.error(f"Directory creation failed: {str(e)}")
        raise
@task(retries=2, log_prints=True)
def generate_gdelt_urls(start_date: datetime.datetime, end_date: datetime.datetime) -> Dict[datetime.date, List[str]]:
    """
    Generate a dictionary keyed by date. Each value is a list of URLs (one per 15-minute interval).
    """
    logger = get_run_logger()
    url_groups = {}
    try:
        current_date = start_date.date()
        while current_date <= end_date.date():
            urls = [
                f"{BASE_URL}/{current_date.strftime('%Y%m%d')}{hour:02}{minute:02}00.gkg.csv.zip"
                for hour in range(24)
                for minute in [0, 15, 30, 45]
            ]
            url_groups[current_date] = urls
            current_date += timedelta(days=1)
        logger.info(f"Generated URL groups for dates: {list(url_groups.keys())}")
        return url_groups
    except Exception as e:
        logger.error(f"URL generation failed: {str(e)}")
        raise
@task(retries=3, retry_delay_seconds=30, log_prints=True)
def download_file(url: str, raw_dir: Path) -> Path:
    """Download a single zipped GKG CSV file from the given URL and extract it."""
    logger = get_run_logger()
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        filename = Path(url).name
        zip_path = raw_dir / filename
        with zip_path.open('wb') as f:
            f.write(response.content)
        logger.info(f"Downloaded {filename}")
        # Extract the CSV from the ZIP archive.
        with zipfile.ZipFile(zip_path, 'r') as z:
            # The archive is expected to contain a single CSV file.
            csv_names = z.namelist()
            if csv_names:
                extracted_csv = raw_dir / csv_names[0]
                z.extractall(path=raw_dir)
                logger.info(f"Extracted {csv_names[0]}")
                return extracted_csv
            else:
                raise ValueError("Zip file is empty.")
    except Exception as e:
        logger.error(f"Error downloading {url}: {str(e)}")
        raise
@task(retries=2, log_prints=True)
def convert_and_filter_combined(csv_paths: List[Path], processed_dir: Path, date: datetime.date) -> Path:
    """
    Combine multiple CSV files (for one day) into a single DataFrame,
    filter to only the priority columns, parse the date field,
    and write the result out as a single Parquet file.
    """
    logger = get_run_logger()
    try:
        dfs = []
        for csv_path in csv_paths:
            df = pd.read_csv(
                csv_path,
                sep='\t',
                names=GKG_COLUMNS,
                dtype='string',
                quoting=3,
                na_values=[''],
                encoding='utf-8',
                encoding_errors='replace'
            )
            dfs.append(df)
        combined_df = pd.concat(dfs, ignore_index=True)
        filtered_df = combined_df[PRIORITY_COLUMNS].copy()
        # Convert the DATE field (YYYYMMDDHHMMSS) to a proper datetime.
        if 'DATE' in filtered_df.columns:
            filtered_df['DATE'] = pd.to_datetime(
                filtered_df['DATE'], format='%Y%m%d%H%M%S', errors='coerce'
            )
        output_filename = f"gdelt_gkg_{date.strftime('%Y%m%d')}.parquet"
        output_path = processed_dir / output_filename
        filtered_df.to_parquet(output_path, engine='pyarrow', compression='snappy', index=False)
        logger.info(f"Converted and filtered data for {date} into {output_filename}")
        return output_path
    except Exception as e:
        logger.error(f"Error processing CSVs for {date}: {str(e)}")
        raise
@task(retries=3, retry_delay_seconds=30, log_prints=True)
def upload_to_hf(file_path: Path, token: str) -> bool:
    """Upload a daily Parquet file, throttled by a global concurrency limit."""
    logger = get_run_logger()
    try:
        with concurrency("hf_uploads", occupy=1):
            # Enable the optimized HF Transfer backend (requires the hf_transfer package).
            os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1"
            api = HfApi()
            api.upload_file(
                path_or_fileobj=str(file_path),
                path_in_repo=file_path.name,
                repo_id=REPO_ID,
                repo_type="dataset",
                token=token,
            )
            logger.info(f"Uploaded {file_path.name}")
            return True
    except Exception as e:
        logger.error(f"Upload failed for {file_path.name}: {str(e)}")
        raise
@task(retries=3, retry_delay_seconds=120, log_prints=True)
def create_hf_repo(token: str) -> bool:
    """
    Validate that the Hugging Face dataset repository exists; create it if not.
    """
    logger = get_run_logger()
    try:
        api = HfApi()
        # Limit the listing to the repo owner rather than iterating every dataset on the Hub.
        owner = REPO_ID.split("/")[0]
        existing = [ds.id for ds in list_datasets(author=owner, token=token)]
        if REPO_ID in existing:
            logger.info(f"Dataset repository '{REPO_ID}' already exists.")
            return True
        # Create the repository if it doesn't exist.
        api.create_repo(repo_id=REPO_ID, repo_type="dataset", token=token, private=False)
        logger.info(f"Successfully created dataset repository: {REPO_ID}")
        return True
    except Exception as e:
        logger.error(f"Failed to create or validate dataset repo '{REPO_ID}': {str(e)}")
        raise RuntimeError(f"Repository validation/creation failed for '{REPO_ID}'") from e
@flow(name="Process Single Day", log_prints=True)
def process_single_day(
    date: datetime.date, urls: List[str], directories: dict, hf_token: str
) -> bool:
    """
    Process one day's data by:
    1. Downloading all CSV files concurrently.
    2. Merging and filtering the CSVs.
    3. Writing out a single daily Parquet file.
    4. Uploading the file to the Hugging Face Hub.
    """
    logger = get_run_logger()
    try:
        # Submit downloads to the task runner so they run concurrently.
        download_futures = [download_file.submit(url, directories["raw"]) for url in urls]
        csv_paths = [future.result() for future in download_futures]
        daily_parquet = convert_and_filter_combined(csv_paths, directories["processed"], date)
        # Uploads are throttled by the "hf_uploads" global concurrency limit.
        upload_to_hf(daily_parquet, hf_token)
        logger.info(f"Completed {date}")
        return True
    except Exception as e:
        logger.error(f"Day {date} failed: {str(e)}")
        raise
@flow(
    name="Process Date Range",
    task_runner=ConcurrentTaskRunner(),  # Concurrent task execution within this flow
    log_prints=True
)
def process_date_range(base_path: Path = Path("data")):
    """
    Main ETL flow:
    1. Load parameters and credentials.
    2. Validate (or create) the Hugging Face repository.
    3. Set up directories.
    4. Generate URL groups by date.
    5. Process each day with the daily subflow.
    """
    logger = get_run_logger()
    # Load parameters from a JSON block.
    json_block = JSON.load("jdelt-etl-param-2025")
    params = json_block.value
    start_date = datetime.datetime.fromisoformat(params.get("start_date", "2025-02-03T00:00:00"))
    end_date = datetime.datetime.fromisoformat(params.get("end_date", "2025-02-10T00:00:00"))
    # Load the Hugging Face token from a Secret block.
    secret_block = Secret.load("huggingface-token")
    hf_token = secret_block.get()
    # Validate or create the repository.
    create_hf_repo(hf_token)
    directories = setup_directories(base_path)
    url_groups = generate_gdelt_urls(start_date, end_date)
    # Run each day's subflow; a direct subflow call blocks until that day completes,
    # so failures are logged per day without stopping the rest of the range.
    for date, urls in url_groups.items():
        try:
            process_single_day(date, urls, directories, hf_token)
        except Exception as e:
            logger.error(f"Day {date} failed: {str(e)}")
# --- Entry Point ---
if __name__ == "__main__":
    process_date_range.serve(
        name="gdelt-etl-2025-v1",
        tags=["gdelt", "etl", "production", "2025"],
    )
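
Setup note: the flow loads two Prefect blocks by name (a JSON block "jdelt-etl-param-2025" for the date range and a Secret block "huggingface-token" for the Hub token), and upload_to_hf occupies a global concurrency limit named "hf_uploads", which must exist in the Prefect workspace (it can be created from the Prefect UI or CLI). The snippet below is a minimal one-time registration sketch, assuming Prefect 2.x; the token string is a placeholder.

# One-time block registration sketch (assumes Prefect 2.x; token value is a placeholder).
from prefect.blocks.system import JSON, Secret

# Date-range parameters read by process_date_range.
JSON(value={
    "start_date": "2025-02-03T00:00:00",
    "end_date": "2025-02-10T00:00:00",
}).save("jdelt-etl-param-2025", overwrite=True)

# Hugging Face write token used for repo creation and uploads.
Secret(value="hf_xxxxxxxxxxxxxxxx").save("huggingface-token", overwrite=True)

Once the blocks exist, running the script starts a long-lived serve() process that registers the gdelt-etl-2025-v1 deployment; individual runs can then be triggered from the Prefect UI or CLI.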