@donbr
Created February 11, 2025 20:59
GDELT 2025 Prefect ETL process
import os
import datetime
from datetime import timedelta
from pathlib import Path
from typing import Dict, List
import zipfile

import pandas as pd
import requests
from huggingface_hub import HfApi, list_datasets
from prefect import flow, task, get_run_logger
from prefect.blocks.system import Secret, JSON
from prefect.concurrency.sync import concurrency
from prefect.task_runners import ConcurrentTaskRunner

# --- Constants ---
# Note: the "hf_uploads" global concurrency limit used by the upload task below is
# configured in Prefect itself; it caps concurrent Hugging Face uploads and is not set here.
REPO_ID = "dwb2023/gdelt-gkg-2025-v2"
BASE_URL = "http://data.gdeltproject.org/gdeltv2"

# Complete Column List
GKG_COLUMNS = [
    'GKGRECORDID',                 # Unique identifier
    'DATE',                        # Publication date
    'SourceCollectionIdentifier',  # Source type
    'SourceCommonName',            # Source name
    'DocumentIdentifier',          # Document URL/ID
    'V1Counts',                    # Counts of various types
    'V2.1Counts',                  # Enhanced counts with positions
    'V1Themes',                    # Theme tags
    'V2EnhancedThemes',            # Themes with positions
    'V1Locations',                 # Location mentions
    'V2EnhancedLocations',         # Locations with positions
    'V1Persons',                   # Person names
    'V2EnhancedPersons',           # Persons with positions
    'V1Organizations',             # Organization names
    'V2EnhancedOrganizations',     # Organizations with positions
    'V1.5Tone',                    # Emotional dimensions
    'V2.1EnhancedDates',           # Date mentions
    'V2GCAM',                      # Global Content Analysis Measures
    'V2.1SharingImage',            # Publisher selected image
    'V2.1RelatedImages',           # Article images
    'V2.1SocialImageEmbeds',       # Social media images
    'V2.1SocialVideoEmbeds',       # Social media videos
    'V2.1Quotations',              # Quote extractions
    'V2.1AllNames',                # Named entities
    'V2.1Amounts',                 # Numeric amounts
    'V2.1TranslationInfo',         # Translation metadata
    'V2ExtrasXML'                  # Additional XML data
]
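# The 27 names above are assigned positionally: convert_and_filter_combined passes
# names=GKG_COLUMNS to pd.read_csv, so their order must match the tab-delimited GKG 2.1 feed.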
# Priority Columns
PRIORITY_COLUMNS = [
    'GKGRECORDID',                 # Unique identifier
    'DATE',                        # Publication date
    'SourceCollectionIdentifier',  # Source type
    'SourceCommonName',            # Source name
    'DocumentIdentifier',          # Document URL/ID
    'V1Counts',                    # Numeric mentions
    'V2.1Counts',                  # Enhanced counts
    'V1Themes',                    # Theme tags
    'V2EnhancedThemes',            # Enhanced themes
    'V1Locations',                 # Geographic data
    'V2EnhancedLocations',         # Enhanced locations
    'V1Persons',                   # Person mentions
    'V2EnhancedPersons',           # Enhanced persons
    'V1Organizations',             # Organization mentions
    'V2EnhancedOrganizations',     # Enhanced organizations
    'V1.5Tone',                    # Sentiment scores
    'V2.1EnhancedDates',           # Date mentions
    'V2GCAM',                      # Global Content Analysis Measures
    'V2.1SharingImage',            # Publisher selected image
    'V2.1Quotations',              # Direct quotes
    'V2.1AllNames',                # All named entities
    'V2.1Amounts'                  # Numeric data
]
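# Sanity-check sketch (added here, not in the original gist): the column filter in
# convert_and_filter_combined only works if every priority column exists in the full
# column list, so fail fast at import time if the two lists drift apart.
assert set(PRIORITY_COLUMNS) <= set(GKG_COLUMNS), "PRIORITY_COLUMNS must be a subset of GKG_COLUMNS"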
# --- Tasks ---
@task(retries=3, retry_delay_seconds=30, log_prints=True)
def setup_directories(base_path: Path) -> dict:
    """Create processing directories."""
    logger = get_run_logger()
    try:
        raw_dir = base_path / "gdelt_raw"
        processed_dir = base_path / "gdelt_processed"
        raw_dir.mkdir(parents=True, exist_ok=True)
        processed_dir.mkdir(parents=True, exist_ok=True)
        logger.info("Directories created successfully")
        return {"raw": raw_dir, "processed": processed_dir}
    except Exception as e:
        logger.error(f"Directory creation failed: {str(e)}")
        raise
@task(retries=2, log_prints=True)
def generate_gdelt_urls(start_date: datetime.datetime, end_date: datetime.datetime) -> Dict[datetime.date, List[str]]:
    """
    Generate a dictionary keyed by date. Each value is a list of URLs (one per 15-minute interval).
    """
    logger = get_run_logger()
    url_groups = {}
    try:
        current_date = start_date.date()
        while current_date <= end_date.date():
            urls = [
                f"{BASE_URL}/{current_date.strftime('%Y%m%d')}{hour:02}{minute:02}00.gkg.csv.zip"
                for hour in range(24)
                for minute in [0, 15, 30, 45]
            ]
            url_groups[current_date] = urls
            current_date += timedelta(days=1)
        logger.info(f"Generated URL groups for dates: {list(url_groups.keys())}")
        return url_groups
    except Exception as e:
        logger.error(f"URL generation failed: {str(e)}")
        raise
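# Illustrative output: each date maps to 96 URLs (24 hours x four 15-minute snapshots);
# for 2025-02-03 the first generated URL is
#   http://data.gdeltproject.org/gdeltv2/20250203000000.gkg.csv.zip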
@task(retries=3, retry_delay_seconds=30, log_prints=True)
def download_file(url: str, raw_dir: Path) -> Path:
    """Download a single CSV (zip) file from the given URL and return the extracted CSV path."""
    logger = get_run_logger()
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        filename = Path(url).name
        zip_path = raw_dir / filename
        with zip_path.open('wb') as f:
            f.write(response.content)
        logger.info(f"Downloaded {filename}")
        # Extract the CSV from the ZIP archive (each archive is assumed to hold one CSV).
        with zipfile.ZipFile(zip_path, 'r') as z:
            csv_names = z.namelist()
            if csv_names:
                extracted_csv = raw_dir / csv_names[0]
                z.extractall(path=raw_dir)
                logger.info(f"Extracted {csv_names[0]}")
                return extracted_csv
            else:
                raise ValueError("Zip file is empty.")
    except Exception as e:
        logger.error(f"Error downloading {url}: {str(e)}")
        raise
@task(retries=2, log_prints=True)
def convert_and_filter_combined(csv_paths: List[Path], processed_dir: Path, date: datetime.date) -> Path:
    """
    Combine multiple CSV files (for one day) into a single DataFrame,
    filter to only the required columns, optimize data types,
    and write out as a single Parquet file.
    """
    logger = get_run_logger()
    try:
        dfs = []
        for csv_path in csv_paths:
            df = pd.read_csv(
                csv_path,
                sep='\t',
                names=GKG_COLUMNS,
                dtype='string',
                quoting=3,
                na_values=[''],
                encoding='utf-8',
                encoding_errors='replace'
            )
            dfs.append(df)
        combined_df = pd.concat(dfs, ignore_index=True)
        filtered_df = combined_df[PRIORITY_COLUMNS].copy()
        # Parse the DATE field (YYYYMMDDHHMMSS) into a proper datetime.
        if 'DATE' in filtered_df.columns:
            filtered_df['DATE'] = pd.to_datetime(
                filtered_df['DATE'], format='%Y%m%d%H%M%S', errors='coerce'
            )
        output_filename = f"gdelt_gkg_{date.strftime('%Y%m%d')}.parquet"
        output_path = processed_dir / output_filename
        filtered_df.to_parquet(output_path, engine='pyarrow', compression='snappy', index=False)
        logger.info(f"Converted and filtered data for {date} into {output_filename}")
        return output_path
    except Exception as e:
        logger.error(f"Error processing CSVs for {date}: {str(e)}")
        raise
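# Verification sketch (illustrative filename, matching the pattern produced above): the daily
# Parquet can be read back to confirm the schema survived the round trip, e.g.
#   df = pd.read_parquet("data/gdelt_processed/gdelt_gkg_20250203.parquet")
#   assert list(df.columns) == PRIORITY_COLUMNS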
@task(retries=3, retry_delay_seconds=30, log_prints=True)
def upload_to_hf(file_path: Path, token: str) -> bool:
    """Upload task, throttled by the "hf_uploads" global concurrency limit."""
    logger = get_run_logger()
    try:
        with concurrency("hf_uploads", occupy=1):
            # Request the optimized HF Transfer backend. Note: this requires the hf_transfer
            # package and may need to be set before huggingface_hub is imported to take effect.
            os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1"
            api = HfApi()
            api.upload_file(
                path_or_fileobj=str(file_path),
                path_in_repo=file_path.name,
                repo_id=REPO_ID,
                repo_type="dataset",
                token=token,
            )
        logger.info(f"Uploaded {file_path.name}")
        return True
    except Exception as e:
        logger.error(f"Upload failed for {file_path.name}: {str(e)}")
        raise
@task(retries=3, retry_delay_seconds=120, log_prints=True)
def create_hf_repo(token: str) -> bool:
    """
    Validate that the Hugging Face dataset repository exists; create it if not.
    """
    logger = get_run_logger()
    try:
        api = HfApi()
        datasets = [ds.id for ds in list_datasets(token=token)]
        if REPO_ID in datasets:
            logger.info(f"Dataset repository '{REPO_ID}' already exists.")
            return True
        # Create the repository if it doesn't exist.
        api.create_repo(repo_id=REPO_ID, repo_type="dataset", token=token, private=False)
        logger.info(f"Successfully created dataset repository: {REPO_ID}")
        return True
    except Exception as e:
        logger.error(f"Failed to create or validate dataset repo '{REPO_ID}': {str(e)}")
        raise RuntimeError(f"Repository validation/creation failed for '{REPO_ID}'") from e
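# Alternative sketch: enumerating Hub datasets via list_datasets() scans far more repos than
# needed; huggingface_hub's create_repo accepts exist_ok=True (and HfApi.repo_exists() can
# check a single repo), so the check above could be reduced to:
#   HfApi().create_repo(repo_id=REPO_ID, repo_type="dataset", token=token, private=False, exist_ok=True)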
@flow(name="Process Single Day", log_prints=True)
def process_single_day(
date: datetime.date, urls: List[str], directories: dict, hf_token: str
) -> bool:
"""
Process one day's data by:
1. Downloading all CSV files concurrently.
2. Merging, filtering, and optimizing the CSVs.
3. Writing out a single daily Parquet file.
4. Uploading the file to the Hugging Face Hub.
"""
logger = get_run_logger()
try:
# Download and process data (unlimited concurrency)
csv_paths = [download_file(url, directories["raw"]) for url in urls]
daily_parquet = convert_and_filter_combined(csv_paths, directories["processed"], date)
# Upload with global concurrency limit
upload_to_hf(daily_parquet, hf_token) # <-- Throttled to 2 concurrent
logger.info(f"Completed {date}")
return True
except Exception as e:
logger.error(f"Day {date} failed: {str(e)}")
raise
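# Concurrency sketch (an assumption, not part of the original gist): the downloads above run
# one at a time because the task is called directly. They could be fanned out to the flow's
# task runner instead, e.g.:
#   futures = [download_file.submit(url, directories["raw"]) for url in urls]
#   csv_paths = [f.result() for f in futures]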
@flow(
    name="Process Date Range",
    task_runner=ConcurrentTaskRunner(),  # applies to tasks submitted with .submit()
    log_prints=True
)
def process_date_range(base_path: Path = Path("data")):
    """
    Main ETL flow:
      1. Load parameters and credentials.
      2. Validate (or create) the Hugging Face repository.
      3. Set up directories.
      4. Generate URL groups by date.
      5. Process each day, recording failures without aborting the run.
    """
    logger = get_run_logger()
    # Load parameters from a JSON block.
    json_block = JSON.load("jdelt-etl-param-2025")
    params = json_block.value
    start_date = datetime.datetime.fromisoformat(params.get("start_date", "2025-02-03T00:00:00"))
    end_date = datetime.datetime.fromisoformat(params.get("end_date", "2025-02-10T00:00:00"))
    # Load the Hugging Face token from a Secret block.
    secret_block = Secret.load("huggingface-token")
    hf_token = secret_block.get()
    # Validate or create the repository.
    create_hf_repo(hf_token)
    directories = setup_directories(base_path)
    url_groups = generate_gdelt_urls(start_date, end_date)
    # Process each day. Direct subflow calls run sequentially and return their results,
    # so failures are caught per day rather than waited on as futures.
    for date, urls in url_groups.items():
        try:
            process_single_day(date, urls, directories, hf_token)
        except Exception as e:
            logger.error(f"Day {date} failed: {str(e)}")


# --- Entry Point ---
if __name__ == "__main__":
    process_date_range.serve(
        name="gdelt-etl-2025-v1",
        tags=["gdelt", "etl", "production", "2025"],
    )
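# One-time setup sketch (block names match what the flow loads; the values are placeholders):
#   from prefect.blocks.system import Secret, JSON
#   Secret(value="hf_...").save("huggingface-token")
#   JSON(value={"start_date": "2025-02-03T00:00:00",
#               "end_date": "2025-02-10T00:00:00"}).save("jdelt-etl-param-2025")
# The upload task additionally assumes a Prefect global concurrency limit named "hf_uploads"
# has been created (via the Prefect UI or CLI); its limit value determines how many uploads
# run at once.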