prefect-colab-event-mentions.py
# -*- coding: utf-8 -*-
"""prefect-colab-event-mentions.ipynb

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1KZRhjRazTGl7tjyG91Y9uJNDyxxcI2CY
"""

# Commented out IPython magic to ensure Python compatibility.
# %pip install -q huggingface_hub[hf_transfer] datasets prefect
import os
from google.colab import userdata

hf_token = userdata.get('HF_TOKEN')
if hf_token is None:
    raise ValueError("Missing HF_TOKEN secret.")
os.environ['HF_TOKEN'] = hf_token
os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1"
import os
from google.colab import userdata

# Option 1: Set directly as environment variables
prefect_api_key = userdata.get('PREFECT_API_KEY')
prefect_api_url = userdata.get('PREFECT_API_URL')
if prefect_api_key is None or prefect_api_url is None:
    raise ValueError("Missing PREFECT_API_KEY or PREFECT_API_URL secret.")
os.environ["PREFECT_API_KEY"] = prefect_api_key
os.environ["PREFECT_API_URL"] = prefect_api_url

!prefect cloud login -k $PREFECT_API_KEY

!prefect profile ls

!prefect config view
import nest_asyncio
nest_asyncio.apply()

import os
import datetime
from datetime import timedelta
from pathlib import Path
from typing import List, Dict, Optional
import zipfile

import pandas as pd
import requests
from huggingface_hub import HfApi, list_datasets
from prefect import flow, task, get_run_logger
from prefect.blocks.system import Secret, JSON
from prefect.concurrency.sync import concurrency
from prefect.task_runners import ConcurrentTaskRunner
# --- Constants ---
# Repository IDs for Hugging Face datasets
EVENT_REPO_ID = "dwb2023/gdelt-event-2025-v2"
MENTIONS_REPO_ID = "dwb2023/gdelt-mentions-2025-v2"
BASE_URL = "http://data.gdeltproject.org/gdeltv2"
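
# Illustrative only: generate_gdelt_urls (below) appends a YYYYMMDDHHMMSS timestamp
# for each 15-minute interval to BASE_URL, producing file URLs such as:
#   http://data.gdeltproject.org/gdeltv2/20250218000000.export.CSV.zip
#   http://data.gdeltproject.org/gdeltv2/20250218000000.mentions.CSV.zip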
# Column definitions for the Events table
EVENT_COLUMNS = [
    'GlobalEventID',          # Unique identifier
    'Day',                    # Date in YYYYMMDD format
    'MonthYear',              # Date in YYYYMM format
    'Year',                   # Year
    'FractionDate',           # Date as YYYY.FFFF
    'Actor1Code',             # Source actor code
    'Actor1Name',             # Source actor name
    'Actor1CountryCode',      # Source actor country
    'Actor1KnownGroupCode',   # Source actor known group
    'Actor1EthnicCode',       # Source actor ethnic code
    'Actor1Religion1Code',    # Source actor religion1 code
    'Actor1Religion2Code',    # Source actor religion2 code
    'Actor1Type1Code',        # Source actor type1 code
    'Actor1Type2Code',        # Source actor type2 code
    'Actor1Type3Code',        # Source actor type3 code
    'Actor2Code',             # Target actor code
    'Actor2Name',             # Target actor name
    'Actor2CountryCode',      # Target actor country
    'Actor2KnownGroupCode',   # Target actor known group
    'Actor2EthnicCode',       # Target actor ethnic code
    'Actor2Religion1Code',    # Target actor religion1 code
    'Actor2Religion2Code',    # Target actor religion2 code
    'Actor2Type1Code',        # Target actor type1 code
    'Actor2Type2Code',        # Target actor type2 code
    'Actor2Type3Code',        # Target actor type3 code
    'IsRootEvent',            # Is this a root event
    'EventCode',              # CAMEO event code
    'EventBaseCode',          # CAMEO event base code
    'EventRootCode',          # CAMEO event root code
    'QuadClass',              # Quad class (1-4)
    'GoldsteinScale',         # Goldstein scale (-10 to +10)
    'NumMentions',            # Number of mentions
    'NumSources',             # Number of sources
    'NumArticles',            # Number of articles
    'AvgTone',                # Average tone
    'Actor1Geo_Type',         # Actor1 geo type
    'Actor1Geo_Fullname',     # Actor1 geo full name
    'Actor1Geo_CountryCode',  # Actor1 geo country code
    'Actor1Geo_ADM1Code',     # Actor1 geo adm1 code
    'Actor1Geo_ADM2Code',     # Actor1 geo adm2 code
    'Actor1Geo_Lat',          # Actor1 geo latitude
    'Actor1Geo_Long',         # Actor1 geo longitude
    'Actor1Geo_FeatureID',    # Actor1 geo feature ID
    'Actor2Geo_Type',         # Actor2 geo type
    'Actor2Geo_Fullname',     # Actor2 geo full name
    'Actor2Geo_CountryCode',  # Actor2 geo country code
    'Actor2Geo_ADM1Code',     # Actor2 geo adm1 code
    'Actor2Geo_ADM2Code',     # Actor2 geo adm2 code
    'Actor2Geo_Lat',          # Actor2 geo latitude
    'Actor2Geo_Long',         # Actor2 geo longitude
    'Actor2Geo_FeatureID',    # Actor2 geo feature ID
    'ActionGeo_Type',         # Action geo type
    'ActionGeo_Fullname',     # Action geo full name
    'ActionGeo_CountryCode',  # Action geo country code
    'ActionGeo_ADM1Code',     # Action geo adm1 code
    'ActionGeo_ADM2Code',     # Action geo adm2 code
    'ActionGeo_Lat',          # Action geo latitude
    'ActionGeo_Long',         # Action geo longitude
    'ActionGeo_FeatureID',    # Action geo feature ID
    'DATEADDED',              # Date added (YYYYMMDDHHMMSS)
    'SOURCEURL',              # Source URL
]
# Column definitions for the Mentions table
MENTIONS_COLUMNS = [
    'GlobalEventID',              # Links to the Events table
    'EventTimeDate',              # When the event took place
    'MentionTimeDate',            # When the event was mentioned
    'MentionType',                # Type of mention source
    'MentionSourceName',          # Name of the source
    'MentionIdentifier',          # URL or identifier of the source
    'SentenceID',                 # Sentence number within the document
    'Actor1CharOffset',           # Character offset of Actor1
    'Actor2CharOffset',           # Character offset of Actor2
    'ActionCharOffset',           # Character offset of Action
    'InRawText',                  # Was this found in the raw text
    'Confidence',                 # Confidence score (0-100)
    'MentionDocLen',              # Document length in chars
    'MentionDocTone',             # Document tone
    'MentionDocTranslationInfo',  # Translation information
    'Extras',                     # Extra information
]
# Priority columns for Events (currently identical to the full Events schema)
EVENT_PRIORITY_COLUMNS = list(EVENT_COLUMNS)
# Priority columns for Mentions (currently identical to the full Mentions schema)
MENTIONS_PRIORITY_COLUMNS = list(MENTIONS_COLUMNS)
# --- Tasks ---
@task(retries=3, retry_delay_seconds=30, log_prints=True)
def setup_directories(base_path: Path) -> dict:
    """Create processing directories."""
    logger = get_run_logger()
    try:
        raw_dir = base_path / "gdelt_raw"
        events_dir = base_path / "gdelt_events"
        mentions_dir = base_path / "gdelt_mentions"
        raw_dir.mkdir(parents=True, exist_ok=True)
        events_dir.mkdir(parents=True, exist_ok=True)
        mentions_dir.mkdir(parents=True, exist_ok=True)
        logger.info("Directories created successfully")
        return {"raw": raw_dir, "events": events_dir, "mentions": mentions_dir}
    except Exception as e:
        logger.error(f"Directory creation failed: {str(e)}")
        raise
@task(retries=2, log_prints=True)
def generate_gdelt_urls(start_date: datetime.datetime, end_date: datetime.datetime) -> Dict[datetime.date, List[Dict[str, str]]]:
    """
    Generate a dictionary keyed by date. Each value is a list of URL dictionaries for events and mentions.
    """
    logger = get_run_logger()
    url_groups = {}
    try:
        current_date = start_date.date()
        while current_date <= end_date.date():
            daily_urls = []
            for hour in range(24):
                for minute in [0, 15, 30, 45]:
                    timestamp = f"{current_date.strftime('%Y%m%d')}{hour:02}{minute:02}00"
                    daily_urls.append({
                        "events": f"{BASE_URL}/{timestamp}.export.CSV.zip",
                        "mentions": f"{BASE_URL}/{timestamp}.mentions.CSV.zip"
                    })
            url_groups[current_date] = daily_urls
            current_date += timedelta(days=1)
        logger.info(f"Generated URL groups for dates: {list(url_groups.keys())}")
        return url_groups
    except Exception as e:
        logger.error(f"URL generation failed: {str(e)}")
        raise
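
# Shape of the returned mapping (sketch; actual dates depend on the requested range):
# one key per calendar day, each holding 96 URL pairs (24 hours x four 15-minute slots):
#
#     {
#         datetime.date(2025, 2, 18): [
#             {"events": ".../gdeltv2/20250218000000.export.CSV.zip",
#              "mentions": ".../gdeltv2/20250218000000.mentions.CSV.zip"},
#             ...
#         ],
#         ...
#     }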
@task(retries=3, retry_delay_seconds=30, log_prints=True)
def download_file(url: str, raw_dir: Path) -> Path:
    """Download a single CSV (zip) file from the given URL."""
    logger = get_run_logger()
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        filename = Path(url).name
        zip_path = raw_dir / filename
        with zip_path.open('wb') as f:
            f.write(response.content)
        logger.info(f"Downloaded {filename}")
        # Extract the CSV from the ZIP archive
        with zipfile.ZipFile(zip_path, 'r') as z:
            csv_names = z.namelist()
            if csv_names:
                extracted_csv = raw_dir / csv_names[0]
                z.extractall(path=raw_dir)
                logger.info(f"Extracted {csv_names[0]}")
                return extracted_csv
            else:
                raise ValueError("Zip file is empty.")
    except Exception as e:
        logger.error(f"Error downloading {url}: {str(e)}")
        raise
@task(retries=2, log_prints=True)
def process_events_files(csv_paths: List[Path], output_dir: Path, date: datetime.date) -> Optional[Path]:
    """
    Combine multiple event CSV files (for one day) into a single DataFrame,
    filter to only the required columns, optimize data types,
    and write out as a single Parquet file.
    """
    logger = get_run_logger()
    try:
        dfs = []
        for csv_path in csv_paths:
            if csv_path and csv_path.exists():
                df = pd.read_csv(
                    csv_path,
                    sep='\t',
                    names=EVENT_COLUMNS,
                    dtype='string',
                    quoting=3,
                    na_values=[''],
                    encoding='utf-8',
                    encoding_errors='replace'
                )
                dfs.append(df)
            else:
                logger.warning(f"CSV path {csv_path} does not exist, skipping")
        if not dfs:
            logger.warning(f"No valid event CSV files found for {date}")
            return None
        combined_df = pd.concat(dfs, ignore_index=True)
        filtered_df = combined_df[EVENT_PRIORITY_COLUMNS].copy()
        # Convert numeric columns
        numeric_columns = ['GoldsteinScale', 'NumMentions', 'NumSources', 'NumArticles', 'AvgTone',
                           'ActionGeo_Lat', 'ActionGeo_Long']
        for col in numeric_columns:
            if col in filtered_df.columns:
                filtered_df[col] = pd.to_numeric(filtered_df[col], errors='coerce')
        # Convert date columns
        if 'Day' in filtered_df.columns:
            filtered_df['Day'] = pd.to_numeric(filtered_df['Day'], errors='coerce')
        if 'DATEADDED' in filtered_df.columns:
            filtered_df['DATEADDED'] = pd.to_numeric(filtered_df['DATEADDED'], errors='coerce')
        # Convert categories
        category_columns = ['EventCode', 'EventBaseCode', 'EventRootCode', 'QuadClass',
                            'Actor1CountryCode', 'Actor2CountryCode', 'ActionGeo_CountryCode']
        for col in category_columns:
            if col in filtered_df.columns:
                filtered_df[col] = filtered_df[col].astype('category')
        output_filename = f"gdelt_events_{date.strftime('%Y%m%d')}.parquet"
        output_path = output_dir / output_filename
        filtered_df.to_parquet(output_path, engine='pyarrow', compression='snappy', index=False)
        logger.info(f"Processed {len(filtered_df)} event records for {date}")
        return output_path
    except Exception as e:
        logger.error(f"Error processing event CSVs for {date}: {str(e)}")
        raise
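
# Minimal read-back sketch (assumes the default base_path "data" and a hypothetical
# 2025-02-18 run; adjust the path to a Parquet file the flow actually produced):
#
#     import pandas as pd
#     events_df = pd.read_parquet("data/gdelt_events/gdelt_events_20250218.parquet")
#     print(events_df[["GlobalEventID", "EventCode", "GoldsteinScale", "AvgTone"]].head())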
@task(retries=2, log_prints=True)
def process_mentions_files(csv_paths: List[Path], output_dir: Path, date: datetime.date) -> Optional[Path]:
    """
    Combine multiple mentions CSV files (for one day) into a single DataFrame,
    filter to only the required columns, optimize data types,
    and write out as a single Parquet file.
    """
    logger = get_run_logger()
    try:
        dfs = []
        for csv_path in csv_paths:
            if csv_path and csv_path.exists():
                df = pd.read_csv(
                    csv_path,
                    sep='\t',
                    names=MENTIONS_COLUMNS,
                    dtype='string',
                    quoting=3,
                    na_values=[''],
                    encoding='utf-8',
                    encoding_errors='replace'
                )
                dfs.append(df)
            else:
                logger.warning(f"CSV path {csv_path} does not exist, skipping")
        if not dfs:
            logger.warning(f"No valid mentions CSV files found for {date}")
            return None
        combined_df = pd.concat(dfs, ignore_index=True)
        filtered_df = combined_df[MENTIONS_PRIORITY_COLUMNS].copy()
        # Convert numeric columns
        numeric_columns = ['Confidence', 'MentionDocLen', 'MentionDocTone']
        for col in numeric_columns:
            if col in filtered_df.columns:
                filtered_df[col] = pd.to_numeric(filtered_df[col], errors='coerce')
        # Convert date columns
        date_columns = ['EventTimeDate', 'MentionTimeDate']
        for col in date_columns:
            if col in filtered_df.columns:
                filtered_df[col] = pd.to_numeric(filtered_df[col], errors='coerce')
        # Convert categories
        category_columns = ['MentionType', 'MentionSourceName']
        for col in category_columns:
            if col in filtered_df.columns:
                filtered_df[col] = filtered_df[col].astype('category')
        output_filename = f"gdelt_mentions_{date.strftime('%Y%m%d')}.parquet"
        output_path = output_dir / output_filename
        filtered_df.to_parquet(output_path, engine='pyarrow', compression='snappy', index=False)
        logger.info(f"Processed {len(filtered_df)} mention records for {date}")
        return output_path
    except Exception as e:
        logger.error(f"Error processing mentions CSVs for {date}: {str(e)}")
        raise
@task(retries=3, retry_delay_seconds=30, log_prints=True)
def upload_to_hf(file_path: Path, repo_id: str, token: str) -> bool:
    """Upload task with global concurrency limit."""
    logger = get_run_logger()
    if not file_path or not file_path.exists():
        logger.warning(f"File path {file_path} does not exist, skipping upload")
        return False
    try:
        with concurrency("hf_uploads", occupy=1):
            # Enable the optimized HF Transfer backend
            os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1"
            api = HfApi()
            api.upload_file(
                path_or_fileobj=str(file_path),
                path_in_repo=file_path.name,
                repo_id=repo_id,
                repo_type="dataset",
                token=token,
            )
            logger.info(f"Uploaded {file_path.name} to {repo_id}")
        return True
    except Exception as e:
        logger.error(f"Upload failed for {file_path.name}: {str(e)}")
        raise
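
# Note: the concurrency("hf_uploads", occupy=1) context above refers to a global
# concurrency limit named "hf_uploads" in the Prefect workspace. Depending on your
# Prefect version, that limit may need to be created up front (for example via the
# Prefect Cloud UI) so uploads are throttled as intended.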
@task(retries=3, retry_delay_seconds=120, log_prints=True)
def create_hf_repo(repo_id: str, token: str) -> bool:
    """
    Validate that the Hugging Face dataset repository exists; create it if not.
    """
    logger = get_run_logger()
    try:
        api = HfApi()
        # Limit the listing to the repo owner's datasets rather than the whole Hub
        author = repo_id.split("/")[0]
        existing = [ds.id for ds in list_datasets(author=author, token=token)]
        if repo_id in existing:
            logger.info(f"Dataset repository '{repo_id}' already exists.")
            return True
        # Create the repository if it doesn't exist
        api.create_repo(repo_id=repo_id, repo_type="dataset", token=token, private=False)
        logger.info(f"Successfully created dataset repository: {repo_id}")
        return True
    except Exception as e:
        logger.error(f"Failed to create or validate dataset repo '{repo_id}': {str(e)}")
        raise RuntimeError(f"Repository validation/creation failed for '{repo_id}'") from e
@flow(name="Process Single Day Events and Mentions", log_prints=True)
def process_single_day(
    date: datetime.date, url_groups: List[Dict[str, str]], directories: dict, hf_token: str
) -> bool:
    """
    Process one day's events and mentions data by:
    1. Downloading each 15-minute events and mentions CSV file
    2. Processing events and mentions files separately
    3. Writing out Parquet files
    4. Uploading the files to Hugging Face Hub
    """
    logger = get_run_logger()
    try:
        # Download files
        event_csvs = []
        mentions_csvs = []
        for url_group in url_groups:
            event_csv = download_file(url_group["events"], directories["raw"])
            mentions_csv = download_file(url_group["mentions"], directories["raw"])
            if event_csv:
                event_csvs.append(event_csv)
            if mentions_csv:
                mentions_csvs.append(mentions_csv)
        # Process events files
        events_parquet = process_events_files(event_csvs, directories["events"], date)
        # Process mentions files
        mentions_parquet = process_mentions_files(mentions_csvs, directories["mentions"], date)
        # Upload to Hugging Face
        if events_parquet:
            upload_to_hf(events_parquet, EVENT_REPO_ID, hf_token)
        if mentions_parquet:
            upload_to_hf(mentions_parquet, MENTIONS_REPO_ID, hf_token)
        logger.info(f"Completed processing for {date}")
        return True
    except Exception as e:
        logger.error(f"Day {date} failed: {str(e)}")
        raise
@flow(
    name="Process GDELT Events and Mentions",
    task_runner=ConcurrentTaskRunner(),
    log_prints=True
)
def process_gdelt_events_mentions(base_path: Path = Path("data")):
    """
    Main ETL flow:
    1. Load parameters and credentials
    2. Validate (or create) the Hugging Face repositories
    3. Set up directories
    4. Generate URL groups by date
    5. Process each day as a subflow
    """
    logger = get_run_logger()
    # Load parameters from a JSON block
    json_block = JSON.load("gdelt-etl-param-2025")
    params = json_block.value
    start_date = datetime.datetime.fromisoformat(params.get("start_date", "2025-02-18T00:00:00"))
    end_date = datetime.datetime.fromisoformat(params.get("end_date", "2025-02-20T00:00:00"))
    # Load the Hugging Face token from a Secret block
    secret_block = Secret.load("huggingface-token")
    hf_token = secret_block.get()
    # Validate or create the repositories
    create_hf_repo(EVENT_REPO_ID, hf_token)
    create_hf_repo(MENTIONS_REPO_ID, hf_token)
    directories = setup_directories(base_path)
    url_groups = generate_gdelt_urls(start_date, end_date)
    # Process each day as a subflow; subflows called directly run one after another,
    # and a failed day is logged without aborting the remaining days.
    for date, urls in url_groups.items():
        try:
            process_single_day(date, urls, directories, hf_token)
        except Exception as e:
            logger.error(f"Failed day {date}: {str(e)}")
# --- Entry Point ---
if __name__ == "__main__":
    process_gdelt_events_mentions()
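
# One-time setup sketch for the Prefect blocks this flow loads (run separately, e.g.
# in a scratch cell). The block names match the flow; the token value and dates below
# are placeholders:
#
#     from prefect.blocks.system import JSON, Secret
#     JSON(value={"start_date": "2025-02-18T00:00:00",
#                 "end_date": "2025-02-20T00:00:00"}).save("gdelt-etl-param-2025", overwrite=True)
#     Secret(value="hf_xxx").save("huggingface-token", overwrite=True)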