@donbr
Last active February 21, 2025 06:18
prefect-colab-event-mentions.py
# -*- coding: utf-8 -*-
"""prefect-colab-event-mentions.ipynb
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/1KZRhjRazTGl7tjyG91Y9uJNDyxxcI2CY
"""
# Commented out IPython magic to ensure Python compatibility.
# %pip install -q huggingface_hub[hf_transfer] datasets prefect
import os
import json
from google.colab import userdata
hf_token = userdata.get('HF_TOKEN')
if hf_token is None:
    raise ValueError("Missing HF_TOKEN secret.")
os.environ['HF_TOKEN'] = hf_token
os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1"
import os
from google.colab import userdata
# Set the Prefect Cloud credentials directly as environment variables
os.environ["PREFECT_API_KEY"] = userdata.get('PREFECT_API_KEY')
os.environ["PREFECT_API_URL"] = userdata.get('PREFECT_API_URL')
!prefect cloud login -k $PREFECT_API_KEY
!prefect profile ls
!prefect config view
import nest_asyncio
nest_asyncio.apply()
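# nest_asyncio lets Prefect's asyncio-based internals run inside the event loop that
# Colab/Jupyter already has active; without it, flow runs started from a notebook cell
# may fail with "event loop is already running" style errors.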
import os
import asyncio
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from prefect.blocks.system import Secret, JSON
from prefect.task_runners import ConcurrentTaskRunner
from prefect.concurrency.sync import concurrency
from pathlib import Path
import datetime
from datetime import timedelta
import pandas as pd
from tqdm import tqdm
from huggingface_hub import HfApi, hf_hub_url, list_datasets
import requests
import zipfile
from typing import List, Dict, Optional
# --- Constants ---
# Repository IDs for Hugging Face datasets
EVENT_REPO_ID = "dwb2023/gdelt-event-2025-v2"
MENTIONS_REPO_ID = "dwb2023/gdelt-mentions-2025-v2"
BASE_URL = "http://data.gdeltproject.org/gdeltv2"
# Column definitions for the Events table
EVENT_COLUMNS = [
'GlobalEventID', # Unique identifier
'Day', # Date in YYYYMMDD format
'MonthYear', # Date in YYYYMM format
'Year', # Year
'FractionDate', # Date as YYYY.FFFF
'Actor1Code', # Source actor code
'Actor1Name', # Source actor name
'Actor1CountryCode', # Source actor country
'Actor1KnownGroupCode', # Source actor known group
'Actor1EthnicCode', # Source actor ethnic code
'Actor1Religion1Code', # Source actor religion1 code
'Actor1Religion2Code', # Source actor religion2 code
'Actor1Type1Code', # Source actor type1 code
'Actor1Type2Code', # Source actor type2 code
'Actor1Type3Code', # Source actor type3 code
'Actor2Code', # Target actor code
'Actor2Name', # Target actor name
'Actor2CountryCode', # Target actor country
'Actor2KnownGroupCode', # Target actor known group
'Actor2EthnicCode', # Target actor ethnic code
'Actor2Religion1Code', # Target actor religion1 code
'Actor2Religion2Code', # Target actor religion2 code
'Actor2Type1Code', # Target actor type1 code
'Actor2Type2Code', # Target actor type2 code
'Actor2Type3Code', # Target actor type3 code
'IsRootEvent', # Is this a root event
'EventCode', # CAMEO event code
'EventBaseCode', # CAMEO event base code
'EventRootCode', # CAMEO event root code
'QuadClass', # Quad class (1-4)
'GoldsteinScale', # Goldstein scale (-10 to +10)
'NumMentions', # Number of mentions
'NumSources', # Number of sources
'NumArticles', # Number of articles
'AvgTone', # Average tone
'Actor1Geo_Type', # Actor1 geo type
'Actor1Geo_Fullname', # Actor1 geo full name
'Actor1Geo_CountryCode', # Actor1 geo country code
'Actor1Geo_ADM1Code', # Actor1 geo adm1 code
'Actor1Geo_ADM2Code', # Actor1 geo adm2 code
'Actor1Geo_Lat', # Actor1 geo latitude
'Actor1Geo_Long', # Actor1 geo longitude
'Actor1Geo_FeatureID', # Actor1 geo feature ID
'Actor2Geo_Type', # Actor2 geo type
'Actor2Geo_Fullname', # Actor2 geo full name
'Actor2Geo_CountryCode', # Actor2 geo country code
'Actor2Geo_ADM1Code', # Actor2 geo adm1 code
'Actor2Geo_ADM2Code', # Actor2 geo adm2 code
'Actor2Geo_Lat', # Actor2 geo latitude
'Actor2Geo_Long', # Actor2 geo longitude
'Actor2Geo_FeatureID', # Actor2 geo feature ID
'ActionGeo_Type', # Action geo type
'ActionGeo_Fullname', # Action geo full name
'ActionGeo_CountryCode', # Action geo country code
'ActionGeo_ADM1Code', # Action geo adm1 code
'ActionGeo_ADM2Code', # Action geo adm2 code
'ActionGeo_Lat', # Action geo latitude
'ActionGeo_Long', # Action geo longitude
'ActionGeo_FeatureID', # Action geo feature ID
'DATEADDED', # Date added (YYYYMMDDHHMMSS)
'SOURCEURL' # Source URL
]
# Column definitions for the Mentions table
MENTIONS_COLUMNS = [
'GlobalEventID', # Links to the Events table
'EventTimeDate', # When the event took place
'MentionTimeDate', # When the event was mentioned
'MentionType', # Type of mention source
'MentionSourceName', # Name of the source
'MentionIdentifier', # URL or identifier of the source
'SentenceID', # Sentence number within the document
'Actor1CharOffset', # Character offset of Actor1
'Actor2CharOffset', # Character offset of Actor2
'ActionCharOffset', # Character offset of Action
'InRawText', # Was this found in the raw text
'Confidence', # Confidence score (0-100)
'MentionDocLen', # Document length in chars
'MentionDocTone', # Document tone
'MentionDocTranslationInfo', # Translation information
'Extras' # Extra information
]
# Priority columns for Events (currently identical to EVENT_COLUMNS; all columns are retained)
EVENT_PRIORITY_COLUMNS = list(EVENT_COLUMNS)
# Priority columns for Mentions (currently identical to MENTIONS_COLUMNS; all columns are retained)
MENTIONS_PRIORITY_COLUMNS = list(MENTIONS_COLUMNS)
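# For reference: GDELT 2.0 ships 61 tab-separated fields per event record and 16 per
# mention record, which is what the two column lists above enumerate.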
# --- Tasks ---
@task(retries=3, retry_delay_seconds=30, log_prints=True)
def setup_directories(base_path: Path) -> dict:
    """Create processing directories."""
    logger = get_run_logger()
    try:
        raw_dir = base_path / "gdelt_raw"
        events_dir = base_path / "gdelt_events"
        mentions_dir = base_path / "gdelt_mentions"
        raw_dir.mkdir(parents=True, exist_ok=True)
        events_dir.mkdir(parents=True, exist_ok=True)
        mentions_dir.mkdir(parents=True, exist_ok=True)
        logger.info("Directories created successfully")
        return {"raw": raw_dir, "events": events_dir, "mentions": mentions_dir}
    except Exception as e:
        logger.error(f"Directory creation failed: {str(e)}")
        raise
@task(retries=2, log_prints=True)
def generate_gdelt_urls(start_date: datetime.datetime, end_date: datetime.datetime) -> Dict[datetime.date, List[Dict[str, str]]]:
    """
    Generate a dictionary keyed by date. Each value is a list of URL dictionaries for events and mentions.
    """
    logger = get_run_logger()
    url_groups = {}
    try:
        current_date = start_date.date()
        while current_date <= end_date.date():
            daily_urls = []
            for hour in range(24):
                for minute in [0, 15, 30, 45]:
                    timestamp = f"{current_date.strftime('%Y%m%d')}{hour:02}{minute:02}00"
                    daily_urls.append({
                        "events": f"{BASE_URL}/{timestamp}.export.CSV.zip",
                        "mentions": f"{BASE_URL}/{timestamp}.mentions.CSV.zip"
                    })
            url_groups[current_date] = daily_urls
            current_date += timedelta(days=1)
        logger.info(f"Generated URL groups for dates: {list(url_groups.keys())}")
        return url_groups
    except Exception as e:
        logger.error(f"URL generation failed: {str(e)}")
        raise
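# For reference, each date maps to 96 URL pairs (24 hours x 4 snapshots), shaped like:
# {
#     datetime.date(2025, 2, 18): [
#         {"events": ".../20250218000000.export.CSV.zip",
#          "mentions": ".../20250218000000.mentions.CSV.zip"},
#         ...  # 95 more 15-minute intervals
#     ],
#     ...
# }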
@task(retries=3, retry_delay_seconds=30, log_prints=True)
def download_file(url: str, raw_dir: Path) -> Path:
    """Download a single zipped CSV file from the given URL and extract it."""
    logger = get_run_logger()
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        filename = Path(url).name
        zip_path = raw_dir / filename
        with zip_path.open('wb') as f:
            f.write(response.content)
        logger.info(f"Downloaded {filename}")
        # Extract the CSV from the ZIP archive
        with zipfile.ZipFile(zip_path, 'r') as z:
            csv_names = z.namelist()
            if csv_names:
                extracted_csv = raw_dir / csv_names[0]
                z.extractall(path=raw_dir)
                logger.info(f"Extracted {csv_names[0]}")
                return extracted_csv
            else:
                raise ValueError("Zip file is empty.")
    except Exception as e:
        logger.error(f"Error downloading {url}: {str(e)}")
        raise
@task(retries=2, log_prints=True)
def process_events_files(csv_paths: List[Path], output_dir: Path, date: datetime.date) -> Optional[Path]:
    """
    Combine multiple event CSV files (for one day) into a single DataFrame,
    filter to only the required columns, optimize data types,
    and write out as a single Parquet file. Returns None if no valid input files exist.
    """
    logger = get_run_logger()
    try:
        dfs = []
        for csv_path in csv_paths:
            if csv_path and csv_path.exists():
                df = pd.read_csv(
                    csv_path,
                    sep='\t',
                    names=EVENT_COLUMNS,
                    dtype='string',
                    quoting=3,
                    na_values=[''],
                    encoding='utf-8',
                    encoding_errors='replace'
                )
                dfs.append(df)
            else:
                logger.warning(f"CSV path {csv_path} does not exist, skipping")
        if not dfs:
            logger.warning(f"No valid event CSV files found for {date}")
            return None
        combined_df = pd.concat(dfs, ignore_index=True)
        filtered_df = combined_df[EVENT_PRIORITY_COLUMNS].copy()
        # Convert numeric columns
        numeric_columns = ['GoldsteinScale', 'NumMentions', 'NumSources', 'NumArticles', 'AvgTone',
                           'ActionGeo_Lat', 'ActionGeo_Long']
        for col in numeric_columns:
            if col in filtered_df.columns:
                filtered_df[col] = pd.to_numeric(filtered_df[col], errors='coerce')
        # Convert date columns (kept as numeric YYYYMMDD / YYYYMMDDHHMMSS values)
        if 'Day' in filtered_df.columns:
            filtered_df['Day'] = pd.to_numeric(filtered_df['Day'], errors='coerce')
        if 'DATEADDED' in filtered_df.columns:
            filtered_df['DATEADDED'] = pd.to_numeric(filtered_df['DATEADDED'], errors='coerce')
        # Convert categorical columns
        category_columns = ['EventCode', 'EventBaseCode', 'EventRootCode', 'QuadClass',
                            'Actor1CountryCode', 'Actor2CountryCode', 'ActionGeo_CountryCode']
        for col in category_columns:
            if col in filtered_df.columns:
                filtered_df[col] = filtered_df[col].astype('category')
        output_filename = f"gdelt_events_{date.strftime('%Y%m%d')}.parquet"
        output_path = output_dir / output_filename
        filtered_df.to_parquet(output_path, engine='pyarrow', compression='snappy', index=False)
        logger.info(f"Processed {len(filtered_df)} event records for {date}")
        return output_path
    except Exception as e:
        logger.error(f"Error processing event CSVs for {date}: {str(e)}")
        raise
@task(retries=2, log_prints=True)
def process_mentions_files(csv_paths: List[Path], output_dir: Path, date: datetime.date) -> Optional[Path]:
    """
    Combine multiple mentions CSV files (for one day) into a single DataFrame,
    filter to only the required columns, optimize data types,
    and write out as a single Parquet file. Returns None if no valid input files exist.
    """
    logger = get_run_logger()
    try:
        dfs = []
        for csv_path in csv_paths:
            if csv_path and csv_path.exists():
                df = pd.read_csv(
                    csv_path,
                    sep='\t',
                    names=MENTIONS_COLUMNS,
                    dtype='string',
                    quoting=3,
                    na_values=[''],
                    encoding='utf-8',
                    encoding_errors='replace'
                )
                dfs.append(df)
            else:
                logger.warning(f"CSV path {csv_path} does not exist, skipping")
        if not dfs:
            logger.warning(f"No valid mentions CSV files found for {date}")
            return None
        combined_df = pd.concat(dfs, ignore_index=True)
        filtered_df = combined_df[MENTIONS_PRIORITY_COLUMNS].copy()
        # Convert numeric columns
        numeric_columns = ['Confidence', 'MentionDocLen', 'MentionDocTone']
        for col in numeric_columns:
            if col in filtered_df.columns:
                filtered_df[col] = pd.to_numeric(filtered_df[col], errors='coerce')
        # Convert date columns (kept as numeric YYYYMMDDHHMMSS values)
        date_columns = ['EventTimeDate', 'MentionTimeDate']
        for col in date_columns:
            if col in filtered_df.columns:
                filtered_df[col] = pd.to_numeric(filtered_df[col], errors='coerce')
        # Convert categorical columns
        category_columns = ['MentionType', 'MentionSourceName']
        for col in category_columns:
            if col in filtered_df.columns:
                filtered_df[col] = filtered_df[col].astype('category')
        output_filename = f"gdelt_mentions_{date.strftime('%Y%m%d')}.parquet"
        output_path = output_dir / output_filename
        filtered_df.to_parquet(output_path, engine='pyarrow', compression='snappy', index=False)
        logger.info(f"Processed {len(filtered_df)} mention records for {date}")
        return output_path
    except Exception as e:
        logger.error(f"Error processing mentions CSVs for {date}: {str(e)}")
        raise
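# Optional helper (editor's sketch, not called by the flow): a quick way to sanity-check a
# Parquet file produced by either processing task above. The function name and behaviour
# are illustrative, not part of the original pipeline.
def preview_parquet(parquet_path: Path, n_rows: int = 5) -> pd.DataFrame:
    """Load a generated Parquet file, print its dtypes and row count, and return the first rows."""
    df = pd.read_parquet(parquet_path)
    print(df.dtypes)
    print(f"{len(df)} rows in {parquet_path.name}")
    return df.head(n_rows)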
@task(retries=3, retry_delay_seconds=30, log_prints=True)
def upload_to_hf(file_path: Path, repo_id: str, token: str) -> bool:
    """Upload a Parquet file to the Hugging Face Hub under a global concurrency limit."""
    logger = get_run_logger()
    if not file_path or not file_path.exists():
        logger.warning(f"File path {file_path} does not exist, skipping upload")
        return False
    try:
        with concurrency("hf_uploads", occupy=1):
            # Enable the optimized HF Transfer backend
            os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1"
            api = HfApi()
            api.upload_file(
                path_or_fileobj=str(file_path),
                path_in_repo=file_path.name,
                repo_id=repo_id,
                repo_type="dataset",
                token=token,
            )
        logger.info(f"Uploaded {file_path.name} to {repo_id}")
        return True
    except Exception as e:
        logger.error(f"Upload failed for {file_path.name}: {str(e)}")
        raise
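# Note: concurrency("hf_uploads", occupy=1) coordinates against a global concurrency limit
# named "hf_uploads" managed in the Prefect server/Cloud workspace. It should be configured
# there (e.g. via the Prefect UI) with a limit of 1 so only one Hub upload runs at a time;
# behaviour for a missing limit varies by Prefect version.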
@task(retries=3, retry_delay_seconds=120, log_prints=True)
def create_hf_repo(repo_id: str, token: str) -> bool:
    """
    Validate that the Hugging Face dataset repository exists; create it if not.
    """
    logger = get_run_logger()
    try:
        api = HfApi()
        datasets = [ds.id for ds in list_datasets(token=token)]
        if repo_id in datasets:
            logger.info(f"Dataset repository '{repo_id}' already exists.")
            return True
        # Create the repository if it doesn't exist
        api.create_repo(repo_id=repo_id, repo_type="dataset", token=token, private=False)
        logger.info(f"Successfully created dataset repository: {repo_id}")
        return True
    except Exception as e:
        logger.error(f"Failed to create or validate dataset repo '{repo_id}': {str(e)}")
        raise RuntimeError(f"Repository validation/creation failed for '{repo_id}'") from e
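# Lighter-weight alternative (editor's sketch, assuming a recent huggingface_hub release):
# list_datasets() with no filter iterates over every public dataset on the Hub, so a
# direct existence check can be much faster.
def hf_repo_exists(repo_id: str, token: str) -> bool:
    """Return True if the dataset repo exists, without scanning list_datasets()."""
    return HfApi(token=token).repo_exists(repo_id=repo_id, repo_type="dataset")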
@flow(name="Process Single Day Events and Mentions", log_prints=True)
def process_single_day(
date: datetime.date, url_groups: List[Dict[str, str]], directories: dict, hf_token: str
) -> bool:
"""
Process one day's events and mentions data by:
1. Downloading all CSV files concurrently
2. Processing events and mentions files separately
3. Writing out Parquet files
4. Uploading the files to Hugging Face Hub
"""
logger = get_run_logger()
try:
# Download files
event_csvs = []
mentions_csvs = []
for url_group in url_groups:
event_csv = download_file(url_group["events"], directories["raw"])
mentions_csv = download_file(url_group["mentions"], directories["raw"])
if event_csv:
event_csvs.append(event_csv)
if mentions_csv:
mentions_csvs.append(mentions_csv)
# Process events files
events_parquet = process_events_files(event_csvs, directories["events"], date)
# Process mentions files
mentions_parquet = process_mentions_files(mentions_csvs, directories["mentions"], date)
# Upload to Hugging Face
if events_parquet:
upload_to_hf(events_parquet, EVENT_REPO_ID, hf_token)
if mentions_parquet:
upload_to_hf(mentions_parquet, MENTIONS_REPO_ID, hf_token)
logger.info(f"Completed processing for {date}")
return True
except Exception as e:
logger.error(f"Day {date} failed: {str(e)}")
raise
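# The downloads above run sequentially because the tasks are called directly. If concurrent
# downloads are wanted, Prefect tasks can be submitted to the flow's task runner instead --
# a hedged sketch of the download section only (same task, just submitted):
#
#     event_futures = [download_file.submit(g["events"], directories["raw"]) for g in url_groups]
#     mentions_futures = [download_file.submit(g["mentions"], directories["raw"]) for g in url_groups]
#     event_csvs = [f.result() for f in event_futures]
#     mentions_csvs = [f.result() for f in mentions_futures]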
@flow(
    name="Process GDELT Events and Mentions",
    task_runner=ConcurrentTaskRunner(),  # Concurrent task execution within the flow
    log_prints=True
)
def process_gdelt_events_mentions(base_path: Path = Path("data")):
    """
    Main ETL flow:
    1. Load parameters and credentials
    2. Validate (or create) the Hugging Face repositories
    3. Set up directories
    4. Generate URL groups by date
    5. Process each day via the per-day subflow
    """
    logger = get_run_logger()
    # Load parameters from a JSON block
    json_block = JSON.load("gdelt-etl-param-2025")
    params = json_block.value
    start_date = datetime.datetime.fromisoformat(params.get("start_date", "2025-02-18T00:00:00"))
    end_date = datetime.datetime.fromisoformat(params.get("end_date", "2025-02-20T00:00:00"))
    # Load the Hugging Face token from a Secret block
    secret_block = Secret.load("huggingface-token")
    hf_token = secret_block.get()
    # Validate or create the repositories
    create_hf_repo(EVENT_REPO_ID, hf_token)
    create_hf_repo(MENTIONS_REPO_ID, hf_token)
    directories = setup_directories(base_path)
    url_groups = generate_gdelt_urls(start_date, end_date)
    # Process each day; subflows called directly run one after another and return their
    # results immediately, so a failed day is logged and the remaining days still run.
    for date, urls in url_groups.items():
        try:
            process_single_day(date, urls, directories, hf_token)
        except Exception as e:
            logger.error(f"Day {date} failed: {str(e)}")
# --- Entry Point ---
if __name__ == "__main__":
process_gdelt_events_mentions()
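# To run this on a schedule instead of as a one-off Colab run, the flow could be served
# from a long-running process (sketch; assumes Prefect >= 2.10 and an environment that
# stays up, which a Colab notebook generally does not):
#
#     process_gdelt_events_mentions.serve(name="gdelt-daily", cron="0 6 * * *")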