Repository URL: https://huggingface.co/spaces/dwb2023/insight
Type: markdown Size: 304 bytes Created: 2025-02-10 01:37:02 UTC Modified: 2025-02-10 01:37:02 UTC
---
title: Insight
emoji: 🔮
colorFrom: indigo
colorTo: blue
sdk: streamlit
sdk_version: 1.42.0
app_file: app.py
pinned: false
license: cc-by-4.0
short_description: using knowledge graphs for insight
---
Check out the configuration reference at https://huggingface.co/docs/hub/spaces-config-reference
Type: python Size: 11781 bytes Created: 2025-02-10 01:37:02 UTC Modified: 2025-02-10 01:37:02 UTC
import streamlit as st
import duckdb
import networkx as nx
from pyvis.network import Network
import pandas as pd
from datetime import datetime
import tempfile
import json
import html
# Configure default settings
st.set_page_config(page_title="GDELT Graph Analyzer", layout="wide")
# Color mapping for node types with added descriptions
NODE_TYPES = {
'event': {
'color': '#1f77b4',
'description': 'GDELT Events'
},
'person': {
'color': '#2ca02c',
'description': 'Named Persons'
},
'organization': {
'color': '#ffa500',
'description': 'Organizations'
},
'location': {
'color': '#ff0000',
'description': 'Geographic Locations'
},
'theme': {
'color': '#800080',
'description': 'Event Themes'
}
}
@st.cache_data(show_spinner="Loading GDELT data...")
def load_gdelt_data(limit=10, tone_threshold=-5.0):
"""Load data from Hugging Face dataset with caching"""
con = duckdb.connect(database=':memory:')
# Create view of the dataset
con.execute("""
CREATE VIEW negative_tone AS (
SELECT *
FROM read_parquet('hf://datasets/dwb2023/gdelt-gkg-march2020-v2@~parquet/default/negative_tone/*.parquet')
);
""")
# Single query with limit
query = f"""
SELECT *
FROM negative_tone
WHERE SourceCollectionIdentifier IS NOT NULL
AND DATE IS NOT NULL
AND SourceCommonName IS NOT NULL
AND DocumentIdentifier IS NOT NULL
AND V1Counts IS NOT NULL
AND V1Themes IS NOT NULL
AND V1Locations IS NOT NULL
AND V1Persons IS NOT NULL
AND V1Organizations IS NOT NULL
AND V2GCAM IS NOT NULL
AND "V2.1Quotations" IS NOT NULL
AND tone <= {tone_threshold}
LIMIT {limit};
"""
df = con.execute(query).fetchdf()
con.close()
return df
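# Hypothetical smoke test (kept as a comment so Streamlit does not execute it on
# every rerun): the loader can be exercised from a plain Python shell, e.g.
#
#     df = load_gdelt_data(limit=3, tone_threshold=-6.0)
#     print(df[["GKGRECORDID", "SourceCommonName", "tone"]])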
class StreamlitGraphBuilder:
"""Adapted graph builder for Streamlit visualization"""
def __init__(self):
self.G = nx.Graph()
def process_row(self, row):
"""Process a single row of data"""
event_id = row["GKGRECORDID"]
event_props = {
"type": "event", # already in lowercase
"date": row["DATE"],
"source": row["SourceCommonName"],
"document": row["DocumentIdentifier"],
"tone": row["tone"],
# Store display name in its original format if needed.
"name": row["SourceCommonName"]
}
self.G.add_node(event_id, **event_props)
# Use lowercase node types for consistency in lookups.
entity_types = {
"V2EnhancedPersons": ("person", "MENTIONED_IN"),
"V2EnhancedOrganizations": ("organization", "MENTIONED_IN"),
"V2EnhancedLocations": ("location", "LOCATED_IN"),
"V2EnhancedThemes": ("theme", "CATEGORIZED_AS"),
"V2.1AllNames": ("name", "MENTIONED_IN"),
"V2.1Counts": ("count", "MENTIONED_IN"),
"V2.1Amounts": ("amount", "MENTIONED_IN"),
}
for col, (node_type, rel_type) in entity_types.items():
if pd.notna(row[col]):
                # The display value (which may be in its original proper case) is preserved in the "name" attribute.
entities = [e.strip() for e in row[col].split(';') if e.strip()]
for entity in entities:
self.G.add_node(entity, type=node_type, name=entity)
self.G.add_edge(entity, event_id,
relationship=rel_type,
date=row["DATE"])
def create_legend_html():
"""Create HTML for the visualization legend"""
legend_html = """
<div style="
position: absolute;
top: 10px;
right: 10px;
background-color: rgba(255, 255, 255, 0.9);
padding: 10px;
border-radius: 5px;
border: 1px solid #ddd;
z-index: 1000;
">
<h3 style="margin: 0 0 10px 0;">Legend</h3>
"""
for node_type, info in NODE_TYPES.items():
legend_html += f"""
<div style="margin: 5px 0;">
<span style="
display: inline-block;
width: 12px;
height: 12px;
background-color: {info['color']};
border-radius: 50%;
margin-right: 5px;
"></span>
<span>{info['description']}</span>
</div>
"""
legend_html += "</div>"
return legend_html
def visualize_with_pyvis(G, physics=True):
"""Create interactive PyVis visualization with legend"""
net = Network(height="600px", width="100%", notebook=False, directed=False)
net.from_nx(G)
# Configure node appearance
for node in net.nodes:
# Since we've standardized types at insertion, lookup is straightforward.
node_type = node.get("type", "unknown")
node["color"] = NODE_TYPES.get(node_type, {}).get('color', "#cccccc")
node["size"] = 20 if node_type == "event" else 15
# Format node tooltip with line breaks
node["title"] = "\n".join([f"{k}: {v}" for k, v in node.items() if k != "id"])
# Configure edge appearance
for edge in net.edges:
edge["title"] = edge.get("relationship", "")
# Configure physics
if physics:
net.show_buttons(filter_=['physics'])
else:
net.toggle_physics(False)
# Save to temporary file and read back HTML content
with tempfile.NamedTemporaryFile(delete=False, suffix=".html") as f:
net.save_graph(f.name)
f.seek(0)
html_content = f.read().decode("utf-8")
# Insert legend before closing body tag
legend = create_legend_html()
html_content = html_content.replace('</body>', f'{legend}</body>')
return html_content
def display_event_details(df, event_id):
"""Display detailed information for a selected event"""
event = df[df['GKGRECORDID'] == event_id].iloc[0]
# Basic Information
st.subheader("Event Information")
col1, col2 = st.columns(2)
with col1:
st.write("**ID:**", event['GKGRECORDID'])
st.write("**Source:**", event['SourceCommonName'])
st.write("**Date:**", event['DATE'])
with col2:
st.write("**Tone Score:**", f"{event['tone']:.2f}")
st.write("**Source Type:**", event['SourceCollectionIdentifier'])
# Document Link
st.write("**Source Document:**")
st.markdown(f"[{event['DocumentIdentifier']}]({event['DocumentIdentifier']})")
# Entities Tabs
tabs = st.tabs(["Persons", "Organizations", "Locations", "Themes", "Quotations"])
with tabs[0]:
if pd.notna(event['V2EnhancedPersons']):
persons = [p.strip() for p in event['V2EnhancedPersons'].split(';')]
for person in persons:
st.write(f"- {person}")
with tabs[1]:
if pd.notna(event['V2EnhancedOrganizations']):
orgs = [o.strip() for o in event['V2EnhancedOrganizations'].split(';')]
for org in orgs:
st.write(f"- {org}")
with tabs[2]:
if pd.notna(event['V2EnhancedLocations']):
locs = [l.strip() for l in event['V2EnhancedLocations'].split(';')]
for loc in locs:
st.write(f"- {loc}")
with tabs[3]:
if pd.notna(event['V2EnhancedThemes']):
themes = [t.strip() for t in event['V2EnhancedThemes'].split(';')]
for theme in themes:
st.write(f"- {theme}")
with tabs[4]:
if pd.notna(event['V2.1Quotations']):
quotes = [q.strip() for q in event['V2.1Quotations'].split('#')]
for quote in quotes:
if quote:
st.write(quote)
def main():
st.title("GDELT Knowledge Graph Explorer")
st.markdown("Analyze negative sentiment events from GDELT Global Knowledge Graph")
# Sidebar controls
with st.sidebar:
st.header("Controls")
limit = st.slider("Max records to load", 1, 25, 10)
tone_threshold = st.slider("Max tone score", -10.0, -5.0, -5.0)
show_physics = st.checkbox("Enable physics", value=True)
# Load data
df = load_gdelt_data(limit=limit, tone_threshold=tone_threshold)
# Create tabs for main content
tab1, tab2 = st.tabs(["Network View", "Event Details"])
with tab1:
# Build graph
builder = StreamlitGraphBuilder()
with st.spinner("Building knowledge graph..."):
for _, row in df.iterrows():
builder.process_row(row)
G = builder.G
# Metrics columns
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Total Nodes", G.number_of_nodes())
with col2:
st.metric("Total Edges", G.number_of_edges())
with col3:
st.metric("Negative Events", sum(1 for _, attr in G.nodes(data=True)
if attr.get("type") == "event"))
# Visualization
st.header("Interactive Network")
html_graph = visualize_with_pyvis(G, physics=show_physics)
st.components.v1.html(html_graph, height=600, scrolling=True)
# Analysis section
st.header("Graph Analysis")
# Centrality measures
with st.expander("Centrality Analysis"):
degree_centrality = nx.degree_centrality(G)
top_nodes = sorted(degree_centrality.items(),
key=lambda x: x[1], reverse=True)[:5]
st.write("Most Connected Nodes:")
for node, centrality in top_nodes:
node_type = G.nodes[node].get("type", "unknown")
st.write(f"- `{node[:30]}` ({node_type}): {centrality:.3f}")
# Community detection
with st.expander("Community Detection"):
try:
communities = nx.community.louvain_communities(G)
st.write(f"Detected {len(communities)} communities")
community_sizes = sorted([len(c) for c in communities], reverse=True)
st.bar_chart(community_sizes)
except Exception as e:
st.error(f"Community detection failed: {str(e)}")
# Data export
with st.expander("Export Options"):
col1, col2 = st.columns(2)
with col1:
graphml_string = "".join(nx.generate_graphml(G))
st.download_button(
label="Download GraphML",
data=graphml_string.encode('utf-8'),
file_name=f"gdelt_graph_{datetime.now().isoformat()}.graphml",
mime="application/xml"
)
with col2:
json_string = json.dumps(nx.node_link_data(G, edges="edges"))
st.download_button(
label="Download JSON",
data=json_string.encode('utf-8'),
file_name=f"gdelt_graph_{datetime.now().isoformat()}.json",
mime="application/json"
)
with tab2:
# Event selection for details view
event_ids = df['GKGRECORDID'].tolist()
selected_event = st.selectbox(
"Select an event to view details",
event_ids,
format_func=lambda x: f"{x} - {df[df['GKGRECORDID']==x].iloc[0]['SourceCommonName']}"
)
if selected_event:
display_event_details(df, selected_event)
if __name__ == "__main__":
main()
Type: python Size: 3524 bytes Created: 2025-02-10 01:37:02 UTC Modified: 2025-02-10 01:37:02 UTC
"""
Main Streamlit application for GDELT visualization
"""
import streamlit as st
from st_link_analysis import st_link_analysis
import pandas as pd
import os
from data_access import get_gdelt_data, filter_dataframe, GDELT_CATEGORIES
from graph_builder import StLinkBuilder
from graph_config import (
NODE_STYLES, EDGE_STYLES, LAYOUT_OPTIONS,
DEFAULT_GRAPH_HEIGHT, DEFAULT_LAYOUT, GRID_COLUMNS
)
# Page configuration
st.set_page_config(layout="wide")
def render_data_filters():
"""Render sidebar filters for data selection"""
st.sidebar.header("GDELT Records")
source_filter = st.sidebar.text_input("Filter by Source:")
date_filter = st.sidebar.text_input("Filter by Date:")
tone_min = st.sidebar.number_input("Minimum Tone:", value=-10.0)
tone_max = st.sidebar.number_input("Maximum Tone:", value=10.0)
return source_filter, date_filter, tone_min, tone_max
def render_data_grid(df, filters):
"""Render filterable data grid"""
source_filter, date_filter, tone_min, tone_max = filters
display_df = filter_dataframe(df, source_filter, date_filter, tone_min, tone_max)
# Dynamically assign a compatible column type based on the underlying dtype
column_config = {}
for col, config in GRID_COLUMNS.items():
new_config = {k: v for k, v in config.items() if k != "format"}
if col in display_df.columns and pd.api.types.is_numeric_dtype(display_df[col]):
# Use NumberColumn for numeric types
column_config[col] = st.column_config.NumberColumn(col, **new_config)
else:
# Use TextColumn for non-numeric types
column_config[col] = st.column_config.TextColumn(col, **new_config)
return st.sidebar.data_editor(
display_df,
hide_index=True,
column_config=column_config,
height=400
)
def render_graph(record):
"""Render graph visualization for selected record"""
stlink_builder = StLinkBuilder()
graph_data = stlink_builder.build_graph(pd.DataFrame([record]))
return st_link_analysis(
elements=graph_data,
layout=DEFAULT_LAYOUT,
node_styles=NODE_STYLES,
edge_styles=EDGE_STYLES,
height=DEFAULT_GRAPH_HEIGHT
)
def render_raw_data(record):
"""Render raw GDELT data in accordion sections"""
st.header("Raw GDELT Data")
for category, fields in GDELT_CATEGORIES.items():
with st.expander(f"{category}"):
for field in fields:
if field in record:
st.subheader(field)
if pd.notna(record[field]):
st.text(record[field])
else:
st.text("No data")
st.divider()
def main():
# Load data
df = get_gdelt_data(limit=25)
# Render filters and data grid
filters = render_data_filters()
selected_index = render_data_grid(df, filters)
# Process selected record
if len(selected_index) > 0:
selected_id = selected_index.iloc[0]['ID']
selected_record = df[df['GKGRECORDID'] == selected_id].iloc[0]
# Show graph
st.header(f"Event Graph: {selected_record['SourceCommonName']}")
returned_value = render_graph(selected_record)
# Show raw data
render_raw_data(selected_record)
else:
st.info("Select a record from the left sidebar to view its graph and details.")
if __name__ == "__main__":
main()
Type: python Size: 2252 bytes Created: 2025-02-10 01:37:02 UTC Modified: 2025-02-10 01:37:02 UTC
"""
Data access module for GDELT data retrieval and filtering
"""
import duckdb
import pandas as pd
def get_gdelt_data(limit=25):
"""Get data from DuckDB with specified limit"""
con = duckdb.connect(database=':memory:')
# Create view of the dataset
con.execute("""
CREATE VIEW negative_tone AS (SELECT * FROM read_parquet('hf://datasets/dwb2023/gdelt-gkg-march2020-v2@~parquet/default/negative_tone/*.parquet'));
""")
# Single query with limit
query = f"""
SELECT *
FROM negative_tone
WHERE SourceCollectionIdentifier IS NOT NULL
AND DATE IS NOT NULL
AND SourceCommonName IS NOT NULL
AND DocumentIdentifier IS NOT NULL
AND V1Counts IS NOT NULL
AND V1Themes IS NOT NULL
AND V1Locations IS NOT NULL
AND V1Persons IS NOT NULL
AND V1Organizations IS NOT NULL
AND V2GCAM IS NOT NULL
AND "V2.1Quotations" IS NOT NULL
LIMIT {limit};
"""
results_df = con.execute(query).fetchdf()
con.close()
return results_df
def filter_dataframe(df, source_filter=None, date_filter=None, tone_min=None, tone_max=None):
"""Filter dataframe based on provided criteria"""
display_df = df[['GKGRECORDID', 'DATE', 'SourceCommonName', 'tone']].copy()
display_df.columns = ['ID', 'Date', 'Source', 'Tone']
if source_filter:
display_df = display_df[display_df['Source'].str.contains(source_filter, case=False, na=False)]
if date_filter:
display_df = display_df[display_df['Date'].str.contains(date_filter, na=False)]
if tone_min is not None and tone_max is not None:
display_df = display_df[
(display_df['Tone'] >= tone_min) &
(display_df['Tone'] <= tone_max)
]
return display_df
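# Hypothetical example: narrow the grid to a single source with strongly negative tone.
#
#     df = get_gdelt_data(limit=25)
#     view = filter_dataframe(df, source_filter="cnn", tone_min=-10.0, tone_max=-5.0)
#     print(view.head())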
# Constants for raw data categories
GDELT_CATEGORIES = {
"Persons": ["V2EnhancedPersons", "V1Persons"],
"Organizations": ["V2EnhancedOrganizations", "V1Organizations"],
"Locations": ["V2EnhancedLocations", "V1Locations"],
"Themes": ["V2EnhancedThemes", "V1Themes"],
"Names": ["V2.1AllNames"],
"Counts": ["V2.1Counts", "V1Counts"],
"Amounts": ["V2.1Amounts"],
"Other Metadata": ["GKGRECORDID", "DATE", "SourceCommonName", "DocumentIdentifier", "tone"]
}
Type: python Size: 6262 bytes Created: 2025-02-10 01:37:02 UTC Modified: 2025-02-10 01:37:02 UTC
"""
Graph builder module for converting GDELT data to graph formats
"""
import pandas as pd
import networkx as nx
import json
import logging

# Optional dependency: only needed by Neo4jBuilder below (not listed in
# requirements.txt); guarded so the module still imports without the driver.
try:
    from neo4j import GraphDatabase
except ImportError:
    GraphDatabase = None
class GraphBuilder:
"""Base class for building graph from GDELT data"""
def process_entities(self, row):
"""Process entities from a row and return nodes and relationships"""
nodes = []
relationships = []
event_id = row["GKGRECORDID"]
event_date = row["DATE"]
event_source = row["SourceCommonName"]
event_document_id = row["DocumentIdentifier"]
event_quotations = row["V2.1Quotations"] if pd.notna(row["V2.1Quotations"]) else ""
event_tone = float(row["tone"]) if pd.notna(row["tone"]) else 0.0
# Add event node
nodes.append({
"id": event_id,
"type": "event",
"properties": {
"date": event_date,
"source": event_source,
"document": event_document_id,
"quotations": event_quotations,
"tone": event_tone
}
})
# Process each entity type
entity_mappings = {
"V2EnhancedPersons": ("Person", "MENTIONED_IN"),
"V2EnhancedOrganizations": ("Organization", "MENTIONED_IN"),
"V2EnhancedLocations": ("Location", "LOCATED_IN"),
"V2EnhancedThemes": ("Theme", "CATEGORIZED_AS"),
"V2.1AllNames": ("Name", "MENTIONED_IN"),
"V2.1Counts": ("Count", "MENTIONED_IN"),
"V2.1Amounts": ("Amount", "MENTIONED_IN"),
}
for field, (label, relationship) in entity_mappings.items():
if pd.notna(row[field]):
entities = [e.strip() for e in row[field].split(';') if e.strip()]
for entity in entities:
nodes.append({
"id": entity,
"type": label.lower(),
"properties": {"name": entity}
})
relationships.append({
"from": entity,
"to": event_id,
"type": relationship,
"properties": {"created_at": event_date}
})
return nodes, relationships
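# Shape of the intermediate format returned by process_entities (illustrative):
#
#     nodes         -> [{"id": ..., "type": "event" | "person" | ..., "properties": {...}}, ...]
#     relationships -> [{"from": <entity id>, "to": <event id>, "type": "MENTIONED_IN" | ...,
#                        "properties": {"created_at": <DATE>}}, ...]
#
# The builders below differ only in how they materialize this common format.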
class NetworkXBuilder(GraphBuilder):
"""Builder for NetworkX graphs"""
def build_graph(self, df):
G = nx.Graph()
for _, row in df.iterrows():
nodes, relationships = self.process_entities(row)
# Add nodes
for node in nodes:
G.add_node(node["id"],
type=node["type"],
**node["properties"])
# Add relationships
for rel in relationships:
G.add_edge(rel["from"],
rel["to"],
relationship=rel["type"],
**rel["properties"])
return G
# TODO: Retest Neo4jBuilder based on updates to GraphBuilder
class Neo4jBuilder(GraphBuilder):
def __init__(self, uri, user, password):
self.driver = GraphDatabase.driver(uri, auth=(user, password))
self.logger = logging.getLogger(__name__)
def close(self):
self.driver.close()
def build_graph(self, df):
with self.driver.session() as session:
for _, row in df.iterrows():
nodes, relationships = self.process_entities(row)
# Create nodes and relationships in Neo4j
try:
session.execute_write(self._create_graph_elements,
nodes, relationships)
except Exception as e:
self.logger.error(f"Error processing row {row['GKGRECORDID']}: {str(e)}")
def _create_graph_elements(self, tx, nodes, relationships):
# Create nodes
for node in nodes:
query = f"""
MERGE (n:{node['type']} {{id: $id}})
SET n += $properties
"""
tx.run(query, id=node["id"], properties=node["properties"])
# Create relationships
for rel in relationships:
query = f"""
MATCH (a {{id: $from_id}})
MATCH (b {{id: $to_id}})
MERGE (a)-[r:{rel['type']}]->(b)
SET r += $properties
"""
tx.run(query,
from_id=rel["from"],
to_id=rel["to"],
properties=rel["properties"])
class StLinkBuilder(GraphBuilder):
"""Builder for st-link-analysis compatible graphs"""
def build_graph(self, df):
"""Build graph in st-link-analysis format"""
all_nodes = []
all_edges = []
edge_counter = 0
# Track nodes we've already added to avoid duplicates
added_nodes = set()
for _, row in df.iterrows():
nodes, relationships = self.process_entities(row)
# Process nodes
for node in nodes:
if node["id"] not in added_nodes:
stlink_node = {
"data": {
"id": str(node["id"]),
"label": node["type"].upper(),
**node["properties"]
}
}
all_nodes.append(stlink_node)
added_nodes.add(node["id"])
# Process relationships/edges
for rel in relationships:
edge_counter += 1
stlink_edge = {
"data": {
"id": f"e{edge_counter}",
"source": str(rel["from"]),
"target": str(rel["to"]),
"label": rel["type"],
**rel["properties"]
}
}
all_edges.append(stlink_edge)
return {
"nodes": all_nodes,
"edges": all_edges
}
def write_json(self, graph_data, filename):
"""Write graph to JSON file"""
with open(filename, 'w') as f:
json.dump(graph_data, f, indent=2)
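# Hypothetical usage, mirroring how the Streamlit app calls StLinkBuilder for a
# single selected record:
#
#     builder = StLinkBuilder()
#     elements = builder.build_graph(pd.DataFrame([record]))  # record: one GDELT GKG row
#     builder.write_json(elements, "event_graph.json")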
Type: python Size: 1158 bytes Created: 2025-02-10 01:37:02 UTC Modified: 2025-02-10 01:37:02 UTC
"""
Configuration module for graph visualization styles
"""
from st_link_analysis import NodeStyle, EdgeStyle
# Node styles configuration
NODE_STYLES = [
NodeStyle("EVENT", "#FF7F3E", "name", "description"),
NodeStyle("PERSON", "#4CAF50", "name", "person"),
NodeStyle("ORGANIZATION", "#9C27B0", "name", "business"),
NodeStyle("LOCATION", "#2196F3", "name", "place"),
NodeStyle("THEME", "#FFC107", "name", "sell"),
NodeStyle("COUNT", "#795548", "name", "numbers"),
NodeStyle("AMOUNT", "#607D8B", "name", "money")
]
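# Note: these labels must match the upper-cased node "label" values emitted by
# StLinkBuilder (node["type"].upper()), e.g. "EVENT", "PERSON"; no style is
# defined here for the "NAME" type produced from V2.1AllNames.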
# Edge styles configuration
EDGE_STYLES = [
EdgeStyle("MENTIONED_IN", caption="label", directed=True),
EdgeStyle("LOCATED_IN", caption="label", directed=True),
EdgeStyle("CATEGORIZED_AS", caption="label", directed=True)
]
# Layout options
LAYOUT_OPTIONS = ["cose", "circle", "grid", "breadthfirst", "concentric"]
# Default graph display settings
DEFAULT_GRAPH_HEIGHT = 500
DEFAULT_LAYOUT = "cose"
# Column configuration for data grid
GRID_COLUMNS = {
"ID": {"width": "medium"},
"Date": {"width": "small"},
"Source": {"width": "medium"},
"Tone": {"width": "small", "format": "%.2f"}
}
Type: txt Size: 87 bytes Created: 2025-02-10 01:37:02 UTC Modified: 2025-02-10 01:37:02 UTC
streamlit
duckdb
networkx
pandas
pyvis
huggingface_hub
python-dateutil
st-link-analysis
Type: python Size: 11919 bytes Created: 2025-02-10 01:37:02 UTC Modified: 2025-02-10 01:37:02 UTC
import os
import asyncio
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from prefect.blocks.system import Secret, JSON
from prefect.task_runners import ConcurrentTaskRunner
from prefect.concurrency.sync import concurrency
from pathlib import Path
import datetime
from datetime import timedelta
import pandas as pd
from tqdm import tqdm
from huggingface_hub import HfApi, hf_hub_url, list_datasets
import requests
import zipfile
from typing import List, Dict, Optional
# --- Constants ---
# Note: Hugging Face uploads are throttled at runtime via the "hf_uploads" global
# concurrency limit (configured in Prefect), not by a constant defined here.
REPO_ID = "dwb2023/gdelt-gkg-march2020-v2"
BASE_URL = "http://data.gdeltproject.org/gdeltv2"
# Complete Column List
GKG_COLUMNS = [
'GKGRECORDID', # Unique identifier
'DATE', # Publication date
'SourceCollectionIdentifier', # Source type
'SourceCommonName', # Source name
'DocumentIdentifier', # Document URL/ID
'V1Counts', # Counts of various types
'V2.1Counts', # Enhanced counts with positions
'V1Themes', # Theme tags
'V2EnhancedThemes', # Themes with positions
'V1Locations', # Location mentions
'V2EnhancedLocations', # Locations with positions
'V1Persons', # Person names
'V2EnhancedPersons', # Persons with positions
'V1Organizations', # Organization names
'V2EnhancedOrganizations', # Organizations with positions
'V1.5Tone', # Emotional dimensions
'V2.1EnhancedDates', # Date mentions
'V2GCAM', # Global Content Analysis Measures
'V2.1SharingImage', # Publisher selected image
'V2.1RelatedImages', # Article images
'V2.1SocialImageEmbeds', # Social media images
'V2.1SocialVideoEmbeds', # Social media videos
'V2.1Quotations', # Quote extractions
'V2.1AllNames', # Named entities
'V2.1Amounts', # Numeric amounts
'V2.1TranslationInfo', # Translation metadata
'V2ExtrasXML' # Additional XML data
]
# Priority Columns
PRIORITY_COLUMNS = [
'GKGRECORDID', # Unique identifier
'DATE', # Publication date
'SourceCollectionIdentifier', # Source type
'SourceCommonName', # Source name
'DocumentIdentifier', # Document URL/ID
'V1Counts', # Numeric mentions
'V2.1Counts', # Enhanced counts
'V1Themes', # Theme tags
'V2EnhancedThemes', # Enhanced themes
'V1Locations', # Geographic data
'V2EnhancedLocations', # Enhanced locations
'V1Persons', # Person mentions
'V2EnhancedPersons', # Enhanced persons
'V1Organizations', # Organization mentions
'V2EnhancedOrganizations', # Enhanced organizations
'V1.5Tone', # Sentiment scores
'V2.1EnhancedDates', # Date mentions
'V2GCAM', # Enhanced sentiment
'V2.1Quotations', # Direct quotes
'V2.1AllNames', # All named entities
'V2.1Amounts' # Numeric data
]
# --- Tasks ---
@task(retries=3, retry_delay_seconds=30, log_prints=True)
def setup_directories(base_path: Path) -> dict:
"""Create processing directories."""
logger = get_run_logger()
try:
raw_dir = base_path / "gdelt_raw"
processed_dir = base_path / "gdelt_processed"
raw_dir.mkdir(parents=True, exist_ok=True)
processed_dir.mkdir(parents=True, exist_ok=True)
logger.info("Directories created successfully")
return {"raw": raw_dir, "processed": processed_dir}
except Exception as e:
logger.error(f"Directory creation failed: {str(e)}")
raise
@task(retries=2, log_prints=True)
def generate_gdelt_urls(start_date: datetime.datetime, end_date: datetime.datetime) -> Dict[datetime.date, List[str]]:
"""
Generate a dictionary keyed by date. Each value is a list of URLs (one per 15-minute interval).
"""
logger = get_run_logger()
url_groups = {}
try:
current_date = start_date.date()
while current_date <= end_date.date():
urls = [
f"{BASE_URL}/{current_date.strftime('%Y%m%d')}{hour:02}{minute:02}00.gkg.csv.zip"
for hour in range(24)
for minute in [0, 15, 30, 45]
]
url_groups[current_date] = urls
current_date += timedelta(days=1)
logger.info(f"Generated URL groups for dates: {list(url_groups.keys())}")
return url_groups
except Exception as e:
logger.error(f"URL generation failed: {str(e)}")
raise
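# For example, 2020-03-16 at 00:15 UTC maps to:
#   http://data.gdeltproject.org/gdeltv2/20200316001500.gkg.csv.zip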
@task(retries=3, retry_delay_seconds=30, log_prints=True)
def download_file(url: str, raw_dir: Path) -> Path:
"""Download a single CSV (zip) file from the given URL."""
logger = get_run_logger()
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
filename = Path(url).name
zip_path = raw_dir / filename
with zip_path.open('wb') as f:
f.write(response.content)
logger.info(f"Downloaded {filename}")
# Optionally, extract the CSV from the ZIP archive.
with zipfile.ZipFile(zip_path, 'r') as z:
# Assuming the zip contains one CSV file.
csv_names = z.namelist()
if csv_names:
extracted_csv = raw_dir / csv_names[0]
z.extractall(path=raw_dir)
logger.info(f"Extracted {csv_names[0]}")
return extracted_csv
else:
raise ValueError("Zip file is empty.")
except Exception as e:
logger.error(f"Error downloading {url}: {str(e)}")
raise
@task(retries=2, log_prints=True)
def convert_and_filter_combined(csv_paths: List[Path], processed_dir: Path, date: datetime.date) -> Path:
"""
Combine multiple CSV files (for one day) into a single DataFrame,
filter to only the required columns, optimize data types,
and write out as a single Parquet file.
"""
logger = get_run_logger()
try:
dfs = []
for csv_path in csv_paths:
df = pd.read_csv(
csv_path,
sep='\t',
names=GKG_COLUMNS,
dtype='string',
quoting=3,
na_values=[''],
encoding='utf-8',
encoding_errors='replace'
)
dfs.append(df)
combined_df = pd.concat(dfs, ignore_index=True)
filtered_df = combined_df[PRIORITY_COLUMNS].copy()
        # Convert the date field to datetime if present. Note: 'V2.1DATE' is not in
        # PRIORITY_COLUMNS, so this branch is currently a no-op and DATE remains a string.
if 'V2.1DATE' in filtered_df.columns:
filtered_df['V2.1DATE'] = pd.to_datetime(
filtered_df['V2.1DATE'], format='%Y%m%d%H%M%S', errors='coerce'
)
output_filename = f"gdelt_gkg_{date.strftime('%Y%m%d')}.parquet"
output_path = processed_dir / output_filename
filtered_df.to_parquet(output_path, engine='pyarrow', compression='snappy', index=False)
logger.info(f"Converted and filtered data for {date} into {output_filename}")
return output_path
except Exception as e:
logger.error(f"Error processing CSVs for {date}: {str(e)}")
raise
@task(retries=3, retry_delay_seconds=30, log_prints=True)
def upload_to_hf(file_path: Path, token: str) -> bool:
"""Upload task with global concurrency limit."""
logger = get_run_logger()
try:
with concurrency("hf_uploads", occupy=1):
            # Enable the optimized HF Transfer backend (requires the hf_transfer package to be installed).
os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1"
api = HfApi()
api.upload_file(
path_or_fileobj=str(file_path),
path_in_repo=file_path.name,
repo_id=REPO_ID,
repo_type="dataset",
token=token,
)
logger.info(f"Uploaded {file_path.name}")
return True
except Exception as e:
logger.error(f"Upload failed for {file_path.name}: {str(e)}")
raise
@task(retries=3, retry_delay_seconds=120, log_prints=True)
def create_hf_repo(token: str) -> bool:
"""
Validate that the Hugging Face dataset repository exists; create it if not.
"""
logger = get_run_logger()
try:
api = HfApi()
datasets = [ds.id for ds in list_datasets(token=token)]
if REPO_ID in datasets:
logger.info(f"Dataset repository '{REPO_ID}' already exists.")
return True
# Create the repository if it doesn't exist.
api.create_repo(repo_id=REPO_ID, repo_type="dataset", token=token, private=False)
logger.info(f"Successfully created dataset repository: {REPO_ID}")
return True
except Exception as e:
logger.error(f"Failed to create or validate dataset repo '{REPO_ID}': {str(e)}")
raise RuntimeError(f"Repository validation/creation failed for '{REPO_ID}'") from e
@flow(name="Process Single Day", log_prints=True)
def process_single_day(
date: datetime.date, urls: List[str], directories: dict, hf_token: str
) -> bool:
"""
Process one day's data by:
1. Downloading all CSV files concurrently.
2. Merging, filtering, and optimizing the CSVs.
3. Writing out a single daily Parquet file.
4. Uploading the file to the Hugging Face Hub.
"""
logger = get_run_logger()
try:
        # Download each interval's file; direct task calls run sequentially within this subflow
csv_paths = [download_file(url, directories["raw"]) for url in urls]
daily_parquet = convert_and_filter_combined(csv_paths, directories["processed"], date)
        # Upload; throttled by the "hf_uploads" global concurrency limit inside the task
        upload_to_hf(daily_parquet, hf_token)
logger.info(f"Completed {date}")
return True
except Exception as e:
logger.error(f"Day {date} failed: {str(e)}")
raise
@flow(
name="Process Date Range",
    task_runner=ConcurrentTaskRunner(),  # concurrent execution for tasks submitted within this flow
log_prints=True
)
def process_date_range(base_path: Path = Path("data")):
"""
Main ETL flow:
1. Load parameters and credentials.
2. Validate (or create) the Hugging Face repository.
3. Setup directories.
4. Generate URL groups by date.
    5. Process each day as a subflow.
"""
logger = get_run_logger()
# Load parameters from a JSON block.
json_block = JSON.load("gdelt-etl-parameters")
params = json_block.value
start_date = datetime.datetime.fromisoformat(params.get("start_date", "2020-03-16T00:00:00"))
end_date = datetime.datetime.fromisoformat(params.get("end_date", "2020-03-22T00:00:00"))
# Load the Hugging Face token from a Secret block.
secret_block = Secret.load("huggingface-token")
hf_token = secret_block.get()
# Validate or create the repository.
create_hf_repo(hf_token)
directories = setup_directories(base_path)
url_groups = generate_gdelt_urls(start_date, end_date)
    # Process each day as a subflow. Direct subflow calls run sequentially and
    # return their results (not futures), so handle failures per day here.
    for date, urls in url_groups.items():
        try:
            process_single_day(date, urls, directories, hf_token)
        except Exception as e:
            logger.error(f"Day {date} failed: {str(e)}")
# --- Entry Point ---
if __name__ == "__main__":
process_date_range.serve(
name="gdelt-etl-production-v2",
tags=["gdelt", "etl", "production"],
)
Type: markdown Size: 6314 bytes Created: 2025-02-10 01:37:02 UTC Modified: 2025-02-10 01:37:02 UTC
---
license: cc-by-4.0
tags:
- text
- news
- global
- knowledge-graph
- geopolitics
dataset_info:
features:
- name: GKGRECORDID
dtype: string
- name: DATE
dtype: string
- name: SourceCollectionIdentifier
dtype: string
- name: SourceCommonName
dtype: string
- name: DocumentIdentifier
dtype: string
- name: V1Counts
dtype: string
- name: V2.1Counts
dtype: string
- name: V1Themes
dtype: string
- name: V2EnhancedThemes
dtype: string
- name: V1Locations
dtype: string
- name: V2EnhancedLocations
dtype: string
- name: V1Persons
dtype: string
- name: V2EnhancedPersons
dtype: string
- name: V1Organizations
dtype: string
- name: V2EnhancedOrganizations
dtype: string
- name: V1.5Tone
dtype: string
- name: V2GCAM
dtype: string
- name: V2.1EnhancedDates
dtype: string
- name: V2.1Quotations
dtype: string
- name: V2.1AllNames
dtype: string
- name: V2.1Amounts
dtype: string
- name: tone
dtype: float64
splits:
- name: train
num_bytes: 3331097194
num_examples: 281215
- name: negative_tone
num_bytes: 3331097194
num_examples: 281215
download_size: 2229048020
dataset_size: 6662194388
configs:
- config_name: default
data_files:
- split: train
path: data/train-*
- split: negative_tone
path: data/negative_tone-*
---
# Dataset Card for dwb2023/gdelt-gkg-march2020-v2
## Dataset Details
### Dataset Description
This dataset contains GDELT Global Knowledge Graph (GKG) data covering March 10-22, 2020, during the early phase of the COVID-19 pandemic. It captures global event interactions, actor relationships, and contextual narratives to support temporal, spatial, and thematic analysis.
- **Curated by:** dwb2023
### Dataset Sources
- **Repository:** [http://data.gdeltproject.org/gdeltv2](http://data.gdeltproject.org/gdeltv2)
- **GKG Documentation:** [GDELT 2.0 Overview](https://blog.gdeltproject.org/gdelt-2-0-our-global-world-in-realtime/), [GDELT GKG Codebook](http://data.gdeltproject.org/documentation/GDELT-Global_Knowledge_Graph_Codebook-V2.1.pdf)
## Uses
### Direct Use
This dataset is suitable for:
- Temporal analysis of global events
- Relationship mapping of key actors in supply chain and logistics
- Sentiment and thematic analysis of COVID-19 pandemic narratives
### Out-of-Scope Use
- Not designed for real-time monitoring due to its historical and static nature
- Not intended for medical diagnosis or predictive health modeling
## Dataset Structure
### Features and Relationships
This dataset focuses on a subset of features from the source GDELT dataset.
| Name | Type | Aspect | Description |
|------|------|---------|-------------|
| DATE | string | Metadata | Publication date of the article/document |
| SourceCollectionIdentifier | string | Metadata | Unique identifier for the source collection |
| SourceCommonName | string | Metadata | Common/display name of the source |
| DocumentIdentifier | string | Metadata | Unique URL/identifier of the document |
| V1Counts | string | Metrics | Original count mentions of numeric values |
| V2.1Counts | string | Metrics | Enhanced numeric pattern extraction |
| V1Themes | string | Classification | Original thematic categorization |
| V2EnhancedThemes | string | Classification | Expanded theme taxonomy and classification |
| V1Locations | string | Entities | Original geographic mentions |
| V2EnhancedLocations | string | Entities | Enhanced location extraction with coordinates |
| V1Persons | string | Entities | Original person name mentions |
| V2EnhancedPersons | string | Entities | Enhanced person name extraction |
| V1Organizations | string | Entities | Original organization mentions |
| V2EnhancedOrganizations | string | Entities | Enhanced organization name extraction |
| V1.5Tone | string | Sentiment | Original emotional tone scoring |
| V2GCAM | string | Sentiment | Global Content Analysis Measures |
| V2.1EnhancedDates | string | Temporal | Temporal reference extraction |
| V2.1Quotations | string | Content | Direct quote extraction |
| V2.1AllNames | string | Entities | Comprehensive named entity extraction |
| V2.1Amounts | string | Metrics | Quantity and measurement extraction |
### Aspects Overview
- **Metadata**: Core document information
- **Metrics**: Numerical measurements and counts
- **Classification**: Categorical and thematic analysis
- **Entities**: Named entity recognition (locations, persons, organizations)
- **Sentiment**: Emotional and tone analysis
- **Temporal**: Time-related information
- **Content**: Direct content extraction
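
The `negative_tone` split can be queried directly from the Hub without downloading the full dataset. The snippet below is a minimal sketch using DuckDB over the auto-converted Parquet files, mirroring the query pattern used by the companion `insight` Space; treat it as illustrative rather than canonical.

```python
import duckdb

# Minimal sketch: pull a small sample of strongly negative records straight
# from the auto-converted Parquet files on the Hugging Face Hub.
con = duckdb.connect()
sample = con.execute("""
    SELECT GKGRECORDID, DATE, SourceCommonName, tone
    FROM read_parquet('hf://datasets/dwb2023/gdelt-gkg-march2020-v2@~parquet/default/negative_tone/*.parquet')
    WHERE tone <= -5.0
    LIMIT 10
""").fetchdf()
print(sample)
con.close()
```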
## Dataset Creation
### Curation Rationale
This dataset was curated to capture the rapidly evolving global narrative during the early phase of the COVID-19 pandemic, focusing specifically on March 10–22, 2020. By zeroing in on this critical period, it offers a granular perspective on how geopolitical events, actor relationships, and thematic discussions shifted amid the escalating pandemic. The enhanced GKG features further enable advanced entity, sentiment, and thematic analysis, making it a valuable resource for studying the socio-political and economic impacts of COVID-19 during a pivotal point in global history.
### Curation Approach
A targeted subset of GDELT’s columns was selected to streamline analysis on key entities (locations, persons, organizations), thematic tags, and sentiment scores—core components of many knowledge-graph and text analytics workflows. This approach balances comprehensive coverage with manageable data size and performance. The ETL pipeline used to produce these transformations is documented here:
[https://gist.github.com/donbr/e2af2bbe441f90b8664539a25957a6c0](https://gist.github.com/donbr/e2af2bbe441f90b8664539a25957a6c0).
## Citation
When using this dataset, please cite both the dataset and the original GDELT project:
```bibtex
@misc{gdelt-gkg-march2020,
title = {GDELT Global Knowledge Graph March 2020 Dataset},
author = {dwb2023},
year = {2025},
publisher = {Hugging Face},
url = {https://huggingface.co/datasets/dwb2023/gdelt-gkg-march2020-v2}
}
```

For questions and comments about this dataset card, please contact dwb2023 through the Hugging Face platform.