Skip to content

Instantly share code, notes, and snippets.

@donbr
Created February 10, 2025 01:40
Show Gist options
  • Save donbr/1a6796beae73ee07ac24c067b50a1fe3 to your computer and use it in GitHub Desktop.
Save donbr/1a6796beae73ee07ac24c067b50a1fe3 to your computer and use it in GitHub Desktop.
A Path to Insight?: End to End data flow from GDELT to Knowledge Graphs

File: README.md

Type: markdown Size: 304 bytes Created: 2025-02-10 01:37:02 UTC Modified: 2025-02-10 01:37:02 UTC

Content:

---
title: Insight
emoji: 🔮
colorFrom: indigo
colorTo: blue
sdk: streamlit
sdk_version: 1.42.0
app_file: app.py
pinned: false
license: cc-by-4.0
short_description: using knowledge graphs for insight
---

Check out the configuration reference at https://huggingface.co/docs/hub/spaces-config-reference

File: app.py

Type: python Size: 11781 bytes Created: 2025-02-10 01:37:02 UTC Modified: 2025-02-10 01:37:02 UTC

Content:

import streamlit as st
import duckdb
import networkx as nx
from pyvis.network import Network
import pandas as pd
from datetime import datetime
import tempfile
import json
import html

# Configure default settings
st.set_page_config(page_title="GDELT Graph Analyzer", layout="wide")

# Color mapping for node types with added descriptions
NODE_TYPES = {
    'event': {
        'color': '#1f77b4',
        'description': 'GDELT Events'
    },
    'person': {
        'color': '#2ca02c',
        'description': 'Named Persons'
    },
    'organization': {
        'color': '#ffa500',
        'description': 'Organizations'
    },
    'location': {
        'color': '#ff0000',
        'description': 'Geographic Locations'
    },
    'theme': {
        'color': '#800080',
        'description': 'Event Themes'
    }
}

@st.cache_data(show_spinner="Loading GDELT data...")
def load_gdelt_data(limit=10, tone_threshold=-5.0):
    """Load data from Hugging Face dataset with caching"""
    con = duckdb.connect(database=':memory:')
    
    # Create view of the dataset
    con.execute("""
      CREATE VIEW negative_tone AS (
          SELECT * 
          FROM read_parquet('hf://datasets/dwb2023/gdelt-gkg-march2020-v2@~parquet/default/negative_tone/*.parquet')
      );
    """)

    # Single query with limit
    query = f"""
    SELECT *
    FROM negative_tone
    WHERE SourceCollectionIdentifier IS NOT NULL
      AND DATE IS NOT NULL
      AND SourceCommonName IS NOT NULL
      AND DocumentIdentifier IS NOT NULL
      AND V1Counts IS NOT NULL
      AND V1Themes IS NOT NULL
      AND V1Locations IS NOT NULL
      AND V1Persons IS NOT NULL
      AND V1Organizations IS NOT NULL
      AND V2GCAM IS NOT NULL
      AND "V2.1Quotations" IS NOT NULL
      AND tone <= {tone_threshold}
    LIMIT {limit};
    """
    
    df = con.execute(query).fetchdf()
    con.close()
    return df

class StreamlitGraphBuilder:
    """Adapted graph builder for Streamlit visualization"""
    def __init__(self):
        self.G = nx.Graph()
        
    def process_row(self, row):
        """Process a single row of data"""
        event_id = row["GKGRECORDID"]
        event_props = {
            "type": "event",  # already in lowercase
            "date": row["DATE"],
            "source": row["SourceCommonName"],
            "document": row["DocumentIdentifier"],
            "tone": row["tone"],
            # Store display name in its original format if needed.
            "name": row["SourceCommonName"]
        }
        
        self.G.add_node(event_id, **event_props)
        
        # Use lowercase node types for consistency in lookups.
        entity_types = {
            "V2EnhancedPersons": ("person", "MENTIONED_IN"),
            "V2EnhancedOrganizations": ("organization", "MENTIONED_IN"),
            "V2EnhancedLocations": ("location", "LOCATED_IN"),
            "V2EnhancedThemes": ("theme", "CATEGORIZED_AS"),
            "V2.1AllNames": ("name", "MENTIONED_IN"),
            "V2.1Counts": ("count", "MENTIONED_IN"),
            "V2.1Amounts": ("amount", "MENTIONED_IN"),
        }
        
        for col, (node_type, rel_type) in entity_types.items():
            if pd.notna(row[col]):
                # The actual display value (which may be in Parent Case) is preserved in the "name" attribute.
                entities = [e.strip() for e in row[col].split(';') if e.strip()]
                for entity in entities:
                    self.G.add_node(entity, type=node_type, name=entity)
                    self.G.add_edge(entity, event_id, 
                                    relationship=rel_type, 
                                    date=row["DATE"])

def create_legend_html():
    """Create HTML for the visualization legend"""
    legend_html = """
    <div style="
        position: absolute;
        top: 10px;
        right: 10px;
        background-color: rgba(255, 255, 255, 0.9);
        padding: 10px;
        border-radius: 5px;
        border: 1px solid #ddd;
        z-index: 1000;
    ">
        <h3 style="margin: 0 0 10px 0;">Legend</h3>
    """
    
    for node_type, info in NODE_TYPES.items():
        legend_html += f"""
        <div style="margin: 5px 0;">
            <span style="
                display: inline-block;
                width: 12px;
                height: 12px;
                background-color: {info['color']};
                border-radius: 50%;
                margin-right: 5px;
            "></span>
            <span>{info['description']}</span>
        </div>
        """
    
    legend_html += "</div>"
    return legend_html

def visualize_with_pyvis(G, physics=True):
    """Create interactive PyVis visualization with legend"""
    net = Network(height="600px", width="100%", notebook=False, directed=False)
    net.from_nx(G)
    
    # Configure node appearance
    for node in net.nodes:
        # Since we've standardized types at insertion, lookup is straightforward.
        node_type = node.get("type", "unknown")
        node["color"] = NODE_TYPES.get(node_type, {}).get('color', "#cccccc")
        node["size"] = 20 if node_type == "event" else 15
        # Format node tooltip with line breaks
        node["title"] = "\n".join([f"{k}: {v}" for k, v in node.items() if k != "id"])

    # Configure edge appearance
    for edge in net.edges:
        edge["title"] = edge.get("relationship", "")
    
    # Configure physics
    if physics:
        net.show_buttons(filter_=['physics'])
    else:
        net.toggle_physics(False)
    
    # Save to temporary file and read back HTML content
    with tempfile.NamedTemporaryFile(delete=False, suffix=".html") as f:
        net.save_graph(f.name)
        f.seek(0)
        html_content = f.read().decode("utf-8")
        
    # Insert legend before closing body tag
    legend = create_legend_html()
    html_content = html_content.replace('</body>', f'{legend}</body>')
    
    return html_content

def display_event_details(df, event_id):
    """Display detailed information for a selected event"""
    event = df[df['GKGRECORDID'] == event_id].iloc[0]
    
    # Basic Information
    st.subheader("Event Information")
    col1, col2 = st.columns(2)
    with col1:
        st.write("**ID:**", event['GKGRECORDID'])
        st.write("**Source:**", event['SourceCommonName'])
        st.write("**Date:**", event['DATE'])
    with col2:
        st.write("**Tone Score:**", f"{event['tone']:.2f}")
        st.write("**Source Type:**", event['SourceCollectionIdentifier'])
    
    # Document Link
    st.write("**Source Document:**")
    st.markdown(f"[{event['DocumentIdentifier']}]({event['DocumentIdentifier']})")
    
    # Entities Tabs
    tabs = st.tabs(["Persons", "Organizations", "Locations", "Themes", "Quotations"])
    
    with tabs[0]:
        if pd.notna(event['V2EnhancedPersons']):
            persons = [p.strip() for p in event['V2EnhancedPersons'].split(';')]
            for person in persons:
                st.write(f"- {person}")
    
    with tabs[1]:
        if pd.notna(event['V2EnhancedOrganizations']):
            orgs = [o.strip() for o in event['V2EnhancedOrganizations'].split(';')]
            for org in orgs:
                st.write(f"- {org}")
    
    with tabs[2]:
        if pd.notna(event['V2EnhancedLocations']):
            locs = [l.strip() for l in event['V2EnhancedLocations'].split(';')]
            for loc in locs:
                st.write(f"- {loc}")
    
    with tabs[3]:
        if pd.notna(event['V2EnhancedThemes']):
            themes = [t.strip() for t in event['V2EnhancedThemes'].split(';')]
            for theme in themes:
                st.write(f"- {theme}")
    
    with tabs[4]:
        if pd.notna(event['V2.1Quotations']):
            quotes = [q.strip() for q in event['V2.1Quotations'].split('#')]
            for quote in quotes:
                if quote:
                    st.write(quote)

def main():
    st.title("GDELT Knowledge Graph Explorer")
    st.markdown("Analyze negative sentiment events from GDELT Global Knowledge Graph")
    
    # Sidebar controls
    with st.sidebar:
        st.header("Controls")
        limit = st.slider("Max records to load", 1, 25, 10)
        tone_threshold = st.slider("Max tone score", -10.0, -5.0, -5.0)
        show_physics = st.checkbox("Enable physics", value=True)
    
    # Load data
    df = load_gdelt_data(limit=limit, tone_threshold=tone_threshold)
    
    # Create tabs for main content
    tab1, tab2 = st.tabs(["Network View", "Event Details"])
    
    with tab1:
        # Build graph
        builder = StreamlitGraphBuilder()
        with st.spinner("Building knowledge graph..."):
            for _, row in df.iterrows():
                builder.process_row(row)
            G = builder.G
        
        # Metrics columns
        col1, col2, col3 = st.columns(3)
        with col1:
            st.metric("Total Nodes", G.number_of_nodes())
        with col2:
            st.metric("Total Edges", G.number_of_edges())
        with col3:
            st.metric("Negative Events", sum(1 for _, attr in G.nodes(data=True) 
                                           if attr.get("type") == "event"))
        
        # Visualization
        st.header("Interactive Network")
        html_graph = visualize_with_pyvis(G, physics=show_physics)
        st.components.v1.html(html_graph, height=600, scrolling=True)
        
        # Analysis section
        st.header("Graph Analysis")
        
        # Centrality measures
        with st.expander("Centrality Analysis"):
            degree_centrality = nx.degree_centrality(G)
            top_nodes = sorted(degree_centrality.items(), 
                              key=lambda x: x[1], reverse=True)[:5]
            
            st.write("Most Connected Nodes:")
            for node, centrality in top_nodes:
                node_type = G.nodes[node].get("type", "unknown")
                st.write(f"- `{node[:30]}` ({node_type}): {centrality:.3f}")
        
        # Community detection
        with st.expander("Community Detection"):
            try:
                communities = nx.community.louvain_communities(G)
                st.write(f"Detected {len(communities)} communities")
                
                community_sizes = sorted([len(c) for c in communities], reverse=True)
                st.bar_chart(community_sizes)
            except Exception as e:
                st.error(f"Community detection failed: {str(e)}")
        
        # Data export
        with st.expander("Export Options"):
            col1, col2 = st.columns(2)
            with col1:
                graphml_string = "".join(nx.generate_graphml(G))
                st.download_button(
                    label="Download GraphML",
                    data=graphml_string.encode('utf-8'),
                    file_name=f"gdelt_graph_{datetime.now().isoformat()}.graphml",
                    mime="application/xml"
                )
            with col2:
                json_string = json.dumps(nx.node_link_data(G, edges="edges"))
                st.download_button(
                    label="Download JSON",
                    data=json_string.encode('utf-8'),
                    file_name=f"gdelt_graph_{datetime.now().isoformat()}.json",
                    mime="application/json"
                )
    
    with tab2:
        # Event selection for details view
        event_ids = df['GKGRECORDID'].tolist()
        selected_event = st.selectbox(
            "Select an event to view details",
            event_ids,
            format_func=lambda x: f"{x} - {df[df['GKGRECORDID']==x].iloc[0]['SourceCommonName']}"
        )
        
        if selected_event:
            display_event_details(df, selected_event)

if __name__ == "__main__":
    main()

File: app_v2.py

Type: python Size: 3524 bytes Created: 2025-02-10 01:37:02 UTC Modified: 2025-02-10 01:37:02 UTC

Content:

"""
Main Streamlit application for GDELT visualization
"""
import streamlit as st
from st_link_analysis import st_link_analysis
import pandas as pd
import os

from data_access import get_gdelt_data, filter_dataframe, GDELT_CATEGORIES
from graph_builder import StLinkBuilder
from graph_config import (
    NODE_STYLES, EDGE_STYLES, LAYOUT_OPTIONS,
    DEFAULT_GRAPH_HEIGHT, DEFAULT_LAYOUT, GRID_COLUMNS
)

# Page configuration
st.set_page_config(layout="wide")

def render_data_filters():
    """Render sidebar filters for data selection"""
    st.sidebar.header("GDELT Records")
    
    source_filter = st.sidebar.text_input("Filter by Source:")
    date_filter = st.sidebar.text_input("Filter by Date:")
    tone_min = st.sidebar.number_input("Minimum Tone:", value=-10.0)
    tone_max = st.sidebar.number_input("Maximum Tone:", value=10.0)
    
    return source_filter, date_filter, tone_min, tone_max

def render_data_grid(df, filters):
    """Render filterable data grid"""
    source_filter, date_filter, tone_min, tone_max = filters
    display_df = filter_dataframe(df, source_filter, date_filter, tone_min, tone_max)
    
    # Dynamically assign a compatible column type based on the underlying dtype
    column_config = {}
    for col, config in GRID_COLUMNS.items():
        new_config = {k: v for k, v in config.items() if k != "format"}
        if col in display_df.columns and pd.api.types.is_numeric_dtype(display_df[col]):
            # Use NumberColumn for numeric types
            column_config[col] = st.column_config.NumberColumn(col, **new_config)
        else:
            # Use TextColumn for non-numeric types
            column_config[col] = st.column_config.TextColumn(col, **new_config)
    
    return st.sidebar.data_editor(
        display_df,
        hide_index=True,
        column_config=column_config,
        height=400
    )

def render_graph(record):
    """Render graph visualization for selected record"""
    stlink_builder = StLinkBuilder()
    graph_data = stlink_builder.build_graph(pd.DataFrame([record]))
    
    return st_link_analysis(
        elements=graph_data,
        layout=DEFAULT_LAYOUT,
        node_styles=NODE_STYLES,
        edge_styles=EDGE_STYLES,
        height=DEFAULT_GRAPH_HEIGHT
    )

def render_raw_data(record):
    """Render raw GDELT data in accordion sections"""
    st.header("Raw GDELT Data")
    
    for category, fields in GDELT_CATEGORIES.items():
        with st.expander(f"{category}"):
            for field in fields:
                if field in record:
                    st.subheader(field)
                    if pd.notna(record[field]):
                        st.text(record[field])
                    else:
                        st.text("No data")
                    st.divider()

def main():
    # Load data
    df = get_gdelt_data(limit=25)
    
    # Render filters and data grid
    filters = render_data_filters()
    selected_index = render_data_grid(df, filters)
    
    # Process selected record
    if len(selected_index) > 0:
        selected_id = selected_index.iloc[0]['ID']
        selected_record = df[df['GKGRECORDID'] == selected_id].iloc[0]
        
        # Show graph
        st.header(f"Event Graph: {selected_record['SourceCommonName']}")
        returned_value = render_graph(selected_record)
        
        # Show raw data
        render_raw_data(selected_record)
    else:
        st.info("Select a record from the left sidebar to view its graph and details.")

if __name__ == "__main__":
    main()

File: data_access.py

Type: python Size: 2252 bytes Created: 2025-02-10 01:37:02 UTC Modified: 2025-02-10 01:37:02 UTC

Content:

"""
Data access module for GDELT data retrieval and filtering
"""
import duckdb
import pandas as pd

def get_gdelt_data(limit=25):
    """Get data from DuckDB with specified limit"""
    con = duckdb.connect(database=':memory:')

    # Create view of the dataset
    con.execute("""
      CREATE VIEW negative_tone AS (SELECT * FROM read_parquet('hf://datasets/dwb2023/gdelt-gkg-march2020-v2@~parquet/default/negative_tone/*.parquet'));
    """)

    # Single query with limit
    query = f"""
    SELECT *
    FROM negative_tone
    WHERE SourceCollectionIdentifier IS NOT NULL
      AND DATE IS NOT NULL
      AND SourceCommonName IS NOT NULL
      AND DocumentIdentifier IS NOT NULL
      AND V1Counts IS NOT NULL
      AND V1Themes IS NOT NULL
      AND V1Locations IS NOT NULL
      AND V1Persons IS NOT NULL
      AND V1Organizations IS NOT NULL
      AND V2GCAM IS NOT NULL
      AND "V2.1Quotations" IS NOT NULL
    LIMIT {limit};
    """

    results_df = con.execute(query).fetchdf()
    con.close()
    return results_df

def filter_dataframe(df, source_filter=None, date_filter=None, tone_min=None, tone_max=None):
    """Filter dataframe based on provided criteria"""
    display_df = df[['GKGRECORDID', 'DATE', 'SourceCommonName', 'tone']].copy()
    display_df.columns = ['ID', 'Date', 'Source', 'Tone']

    if source_filter:
        display_df = display_df[display_df['Source'].str.contains(source_filter, case=False, na=False)]
    if date_filter:
        display_df = display_df[display_df['Date'].str.contains(date_filter, na=False)]
    if tone_min is not None and tone_max is not None:
        display_df = display_df[
            (display_df['Tone'] >= tone_min) & 
            (display_df['Tone'] <= tone_max)
        ]

    return display_df

# Constants for raw data categories
GDELT_CATEGORIES = {
    "Persons": ["V2EnhancedPersons", "V1Persons"],
    "Organizations": ["V2EnhancedOrganizations", "V1Organizations"],
    "Locations": ["V2EnhancedLocations", "V1Locations"],
    "Themes": ["V2EnhancedThemes", "V1Themes"],
    "Names": ["V2.1AllNames"],
    "Counts": ["V2.1Counts", "V1Counts"],
    "Amounts": ["V2.1Amounts"],
    "Other Metadata": ["GKGRECORDID", "DATE", "SourceCommonName", "DocumentIdentifier", "tone"]
}

File: graph_builder.py

Type: python Size: 6262 bytes Created: 2025-02-10 01:37:02 UTC Modified: 2025-02-10 01:37:02 UTC

Content:

"""
Graph builder module for converting GDELT data to graph formats
"""
import pandas as pd
import networkx as nx
import json

class GraphBuilder:
    """Base class for building graph from GDELT data"""
    def process_entities(self, row):
        """Process entities from a row and return nodes and relationships"""
        nodes = []
        relationships = []
        event_id = row["GKGRECORDID"]
        event_date = row["DATE"]
        event_source = row["SourceCommonName"]
        event_document_id = row["DocumentIdentifier"]
        event_quotations = row["V2.1Quotations"] if pd.notna(row["V2.1Quotations"]) else ""
        event_tone = float(row["tone"]) if pd.notna(row["tone"]) else 0.0

        # Add event node
        nodes.append({
            "id": event_id,
            "type": "event",
            "properties": {
                "date": event_date,
                "source": event_source,
                "document": event_document_id,
                "quotations": event_quotations,
                "tone": event_tone
            }
        })

        # Process each entity type
        entity_mappings = {
            "V2EnhancedPersons": ("Person", "MENTIONED_IN"),
            "V2EnhancedOrganizations": ("Organization", "MENTIONED_IN"),
            "V2EnhancedLocations": ("Location", "LOCATED_IN"),
            "V2EnhancedThemes": ("Theme", "CATEGORIZED_AS"),
            "V2.1AllNames": ("Name", "MENTIONED_IN"),
            "V2.1Counts": ("Count", "MENTIONED_IN"),
            "V2.1Amounts": ("Amount", "MENTIONED_IN"),
        }

        for field, (label, relationship) in entity_mappings.items():
            if pd.notna(row[field]):
                entities = [e.strip() for e in row[field].split(';') if e.strip()]
                for entity in entities:
                    nodes.append({
                        "id": entity,
                        "type": label.lower(),
                        "properties": {"name": entity}
                    })
                    relationships.append({
                        "from": entity,
                        "to": event_id,
                        "type": relationship,
                        "properties": {"created_at": event_date}
                    })

        return nodes, relationships

class NetworkXBuilder(GraphBuilder):
    """Builder for NetworkX graphs"""
    def build_graph(self, df):
        G = nx.Graph()

        for _, row in df.iterrows():
            nodes, relationships = self.process_entities(row)

            # Add nodes
            for node in nodes:
                G.add_node(node["id"],
                          type=node["type"],
                          **node["properties"])

            # Add relationships
            for rel in relationships:
                G.add_edge(rel["from"],
                          rel["to"],
                          relationship=rel["type"],
                          **rel["properties"])

        return G

# TODO: Retest Neo4jBuilder based on updates to GraphBuilder
class Neo4jBuilder(GraphBuilder):
    def __init__(self, uri, user, password):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))
        self.logger = logging.getLogger(__name__)

    def close(self):
        self.driver.close()

    def build_graph(self, df):
        with self.driver.session() as session:
            for _, row in df.iterrows():
                nodes, relationships = self.process_entities(row)

                # Create nodes and relationships in Neo4j
                try:
                    session.execute_write(self._create_graph_elements,
                                        nodes, relationships)
                except Exception as e:
                    self.logger.error(f"Error processing row {row['GKGRECORDID']}: {str(e)}")

    def _create_graph_elements(self, tx, nodes, relationships):
        # Create nodes
        for node in nodes:
            query = f"""
            MERGE (n:{node['type']} {{id: $id}})
            SET n += $properties
            """
            tx.run(query, id=node["id"], properties=node["properties"])

        # Create relationships
        for rel in relationships:
            query = f"""
            MATCH (a {{id: $from_id}})
            MATCH (b {{id: $to_id}})
            MERGE (a)-[r:{rel['type']}]->(b)
            SET r += $properties
            """
            tx.run(query,
                  from_id=rel["from"],
                  to_id=rel["to"],
                  properties=rel["properties"])


class StLinkBuilder(GraphBuilder):
    """Builder for st-link-analysis compatible graphs"""
    def build_graph(self, df):
        """Build graph in st-link-analysis format"""
        all_nodes = []
        all_edges = []
        edge_counter = 0

        # Track nodes we've already added to avoid duplicates
        added_nodes = set()
        
        for _, row in df.iterrows():
            nodes, relationships = self.process_entities(row)
            
            # Process nodes
            for node in nodes:
                if node["id"] not in added_nodes:
                    stlink_node = {
                        "data": {
                            "id": str(node["id"]),
                            "label": node["type"].upper(),
                            **node["properties"]
                        }
                    }
                    all_nodes.append(stlink_node)
                    added_nodes.add(node["id"])
            
            # Process relationships/edges
            for rel in relationships:
                edge_counter += 1
                stlink_edge = {
                    "data": {
                        "id": f"e{edge_counter}",
                        "source": str(rel["from"]),
                        "target": str(rel["to"]),
                        "label": rel["type"],
                        **rel["properties"]
                    }
                }
                all_edges.append(stlink_edge)
        
        return {
            "nodes": all_nodes,
            "edges": all_edges
        }

    def write_json(self, graph_data, filename):
        """Write graph to JSON file"""
        with open(filename, 'w') as f:
            json.dump(graph_data, f, indent=2)

File: graph_config.py

Type: python Size: 1158 bytes Created: 2025-02-10 01:37:02 UTC Modified: 2025-02-10 01:37:02 UTC

Content:

"""
Configuration module for graph visualization styles
"""
from st_link_analysis import NodeStyle, EdgeStyle

# Node styles configuration
NODE_STYLES = [
    NodeStyle("EVENT", "#FF7F3E", "name", "description"),
    NodeStyle("PERSON", "#4CAF50", "name", "person"),
    NodeStyle("ORGANIZATION", "#9C27B0", "name", "business"),
    NodeStyle("LOCATION", "#2196F3", "name", "place"),
    NodeStyle("THEME", "#FFC107", "name", "sell"),
    NodeStyle("COUNT", "#795548", "name", "numbers"),
    NodeStyle("AMOUNT", "#607D8B", "name", "money")
]

# Edge styles configuration
EDGE_STYLES = [
    EdgeStyle("MENTIONED_IN", caption="label", directed=True),
    EdgeStyle("LOCATED_IN", caption="label", directed=True),
    EdgeStyle("CATEGORIZED_AS", caption="label", directed=True)
]

# Layout options
LAYOUT_OPTIONS = ["cose", "circle", "grid", "breadthfirst", "concentric"]

# Default graph display settings
DEFAULT_GRAPH_HEIGHT = 500
DEFAULT_LAYOUT = "cose"

# Column configuration for data grid
GRID_COLUMNS = {
    "ID": {"width": "medium"},
    "Date": {"width": "small"},
    "Source": {"width": "medium"},
    "Tone": {"width": "small", "format": "%.2f"}
}

File: requirements.txt

Type: txt Size: 87 bytes Created: 2025-02-10 01:37:02 UTC Modified: 2025-02-10 01:37:02 UTC

Content:

streamlit
duckdb
networkx
pandas
pyvis
huggingface_hub
python-dateutil
st-link-analysis

File: data_sources/gdelt_prefect_extract_to_hf_ds.py

Type: python Size: 11919 bytes Created: 2025-02-10 01:37:02 UTC Modified: 2025-02-10 01:37:02 UTC

Content:

import os
import asyncio
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from prefect.blocks.system import Secret, JSON
from prefect.task_runners import ConcurrentTaskRunner
from prefect.concurrency.sync import concurrency
from pathlib import Path
import datetime
from datetime import timedelta
import pandas as pd
from tqdm import tqdm
from huggingface_hub import HfApi, hf_hub_url, list_datasets
import requests
import zipfile
from typing import List, Dict, Optional

# --- Constants ---
# Set a global concurrency limit for Hugging Face uploads
REPO_ID = "dwb2023/gdelt-gkg-march2020-v2"

BASE_URL = "http://data.gdeltproject.org/gdeltv2"

# Complete Column List
GKG_COLUMNS = [
    'GKGRECORDID',                 # Unique identifier
    'DATE',                        # Publication date
    'SourceCollectionIdentifier',  # Source type
    'SourceCommonName',            # Source name
    'DocumentIdentifier',          # Document URL/ID
    'V1Counts',                    # Counts of various types
    'V2.1Counts',                  # Enhanced counts with positions
    'V1Themes',                    # Theme tags
    'V2EnhancedThemes',           # Themes with positions
    'V1Locations',                 # Location mentions
    'V2EnhancedLocations',        # Locations with positions
    'V1Persons',                   # Person names
    'V2EnhancedPersons',          # Persons with positions
    'V1Organizations',             # Organization names
    'V2EnhancedOrganizations',     # Organizations with positions
    'V1.5Tone',                    # Emotional dimensions
    'V2.1EnhancedDates',          # Date mentions
    'V2GCAM',                      # Global Content Analysis Measures
    'V2.1SharingImage',            # Publisher selected image
    'V2.1RelatedImages',          # Article images
    'V2.1SocialImageEmbeds',      # Social media images
    'V2.1SocialVideoEmbeds',      # Social media videos
    'V2.1Quotations',             # Quote extractions
    'V2.1AllNames',               # Named entities
    'V2.1Amounts',                # Numeric amounts
    'V2.1TranslationInfo',        # Translation metadata
    'V2ExtrasXML'                 # Additional XML data
]

# Priority Columns
PRIORITY_COLUMNS = [
    'GKGRECORDID',                # Unique identifier
    'DATE',                       # Publication date
    'SourceCollectionIdentifier', # Source type
    'SourceCommonName',           # Source name
    'DocumentIdentifier',         # Document URL/ID
    'V1Counts',                   # Numeric mentions
    'V2.1Counts',                # Enhanced counts
    'V1Themes',                   # Theme tags
    'V2EnhancedThemes',          # Enhanced themes
    'V1Locations',                # Geographic data
    'V2EnhancedLocations',       # Enhanced locations
    'V1Persons',                  # Person mentions
    'V2EnhancedPersons',         # Enhanced persons
    'V1Organizations',            # Organization mentions
    'V2EnhancedOrganizations',   # Enhanced organizations
    'V1.5Tone',                  # Sentiment scores
    'V2.1EnhancedDates',        # Date mentions
    'V2GCAM',                    # Enhanced sentiment
    'V2.1Quotations',           # Direct quotes
    'V2.1AllNames',             # All named entities
    'V2.1Amounts'               # Numeric data
]

# --- Tasks ---

@task(retries=3, retry_delay_seconds=30, log_prints=True)
def setup_directories(base_path: Path) -> dict:
    """Create processing directories."""
    logger = get_run_logger()
    try:
        raw_dir = base_path / "gdelt_raw"
        processed_dir = base_path / "gdelt_processed"
        raw_dir.mkdir(parents=True, exist_ok=True)
        processed_dir.mkdir(parents=True, exist_ok=True)
        logger.info("Directories created successfully")
        return {"raw": raw_dir, "processed": processed_dir}
    except Exception as e:
        logger.error(f"Directory creation failed: {str(e)}")
        raise

@task(retries=2, log_prints=True)
def generate_gdelt_urls(start_date: datetime.datetime, end_date: datetime.datetime) -> Dict[datetime.date, List[str]]:
    """
    Generate a dictionary keyed by date. Each value is a list of URLs (one per 15-minute interval).
    """
    logger = get_run_logger()
    url_groups = {}
    try:
        current_date = start_date.date()
        while current_date <= end_date.date():
            urls = [
                f"{BASE_URL}/{current_date.strftime('%Y%m%d')}{hour:02}{minute:02}00.gkg.csv.zip"
                for hour in range(24)
                for minute in [0, 15, 30, 45]
            ]
            url_groups[current_date] = urls
            current_date += timedelta(days=1)
        logger.info(f"Generated URL groups for dates: {list(url_groups.keys())}")
        return url_groups
    except Exception as e:
        logger.error(f"URL generation failed: {str(e)}")
        raise

@task(retries=3, retry_delay_seconds=30, log_prints=True)
def download_file(url: str, raw_dir: Path) -> Path:
    """Download a single CSV (zip) file from the given URL."""
    logger = get_run_logger()
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        filename = Path(url).name
        zip_path = raw_dir / filename
        with zip_path.open('wb') as f:
            f.write(response.content)
        logger.info(f"Downloaded {filename}")

        # Optionally, extract the CSV from the ZIP archive.
        with zipfile.ZipFile(zip_path, 'r') as z:
            # Assuming the zip contains one CSV file.
            csv_names = z.namelist()
            if csv_names:
                extracted_csv = raw_dir / csv_names[0]
                z.extractall(path=raw_dir)
                logger.info(f"Extracted {csv_names[0]}")
                return extracted_csv
            else:
                raise ValueError("Zip file is empty.")
    except Exception as e:
        logger.error(f"Error downloading {url}: {str(e)}")
        raise

@task(retries=2, log_prints=True)
def convert_and_filter_combined(csv_paths: List[Path], processed_dir: Path, date: datetime.date) -> Path:
    """
    Combine multiple CSV files (for one day) into a single DataFrame,
    filter to only the required columns, optimize data types,
    and write out as a single Parquet file.
    """
    logger = get_run_logger()
    try:
        dfs = []
        for csv_path in csv_paths:
            df = pd.read_csv(
                csv_path,
                sep='\t',
                names=GKG_COLUMNS,
                dtype='string',
                quoting=3,
                na_values=[''],
                encoding='utf-8',
                encoding_errors='replace'
            )
            dfs.append(df)
        combined_df = pd.concat(dfs, ignore_index=True)
        filtered_df = combined_df[PRIORITY_COLUMNS].copy()
        # Convert the date field to datetime; adjust the format if necessary.
        if 'V2.1DATE' in filtered_df.columns:
            filtered_df['V2.1DATE'] = pd.to_datetime(
                filtered_df['V2.1DATE'], format='%Y%m%d%H%M%S', errors='coerce'
            )
        output_filename = f"gdelt_gkg_{date.strftime('%Y%m%d')}.parquet"
        output_path = processed_dir / output_filename
        filtered_df.to_parquet(output_path, engine='pyarrow', compression='snappy', index=False)
        logger.info(f"Converted and filtered data for {date} into {output_filename}")
        return output_path
    except Exception as e:
        logger.error(f"Error processing CSVs for {date}: {str(e)}")
        raise

@task(retries=3, retry_delay_seconds=30, log_prints=True)
def upload_to_hf(file_path: Path, token: str) -> bool:
    """Upload task with global concurrency limit."""
    logger = get_run_logger()
    try:
        with concurrency("hf_uploads", occupy=1):
            # Enable the optimized HF Transfer backend.
            os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1"

            api = HfApi()
            api.upload_file(
                path_or_fileobj=str(file_path),
                path_in_repo=file_path.name,
                repo_id=REPO_ID,
                repo_type="dataset",
                token=token,
            )
            logger.info(f"Uploaded {file_path.name}")
        return True
    except Exception as e:
        logger.error(f"Upload failed for {file_path.name}: {str(e)}")
        raise

@task(retries=3, retry_delay_seconds=120, log_prints=True)
def create_hf_repo(token: str) -> bool:
    """
    Validate that the Hugging Face dataset repository exists; create it if not.
    """
    logger = get_run_logger()
    try:
        api = HfApi()
        datasets = [ds.id for ds in list_datasets(token=token)]
        if REPO_ID in datasets:
            logger.info(f"Dataset repository '{REPO_ID}' already exists.")
            return True
        # Create the repository if it doesn't exist.
        api.create_repo(repo_id=REPO_ID, repo_type="dataset", token=token, private=False)
        logger.info(f"Successfully created dataset repository: {REPO_ID}")
        return True
    except Exception as e:
        logger.error(f"Failed to create or validate dataset repo '{REPO_ID}': {str(e)}")
        raise RuntimeError(f"Repository validation/creation failed for '{REPO_ID}'") from e

@flow(name="Process Single Day", log_prints=True)
def process_single_day(
    date: datetime.date, urls: List[str], directories: dict, hf_token: str
) -> bool:
    """
    Process one day's data by:
      1. Downloading all CSV files concurrently.
      2. Merging, filtering, and optimizing the CSVs.
      3. Writing out a single daily Parquet file.
      4. Uploading the file to the Hugging Face Hub.
    """
    logger = get_run_logger()
    try:
        # Download and process data (unlimited concurrency)
        csv_paths = [download_file(url, directories["raw"]) for url in urls]
        daily_parquet = convert_and_filter_combined(csv_paths, directories["processed"], date)
        
        # Upload with global concurrency limit
        upload_to_hf(daily_parquet, hf_token)  # <-- Throttled to 2 concurrent
    
        logger.info(f"Completed {date}")
        return True
    except Exception as e:
        logger.error(f"Day {date} failed: {str(e)}")
        raise

@flow(
    name="Process Date Range",
    task_runner=ConcurrentTaskRunner(),  # Parallel subflows
    log_prints=True
)
def process_date_range(base_path: Path = Path("data")):
    """
    Main ETL flow:
      1. Load parameters and credentials.
      2. Validate (or create) the Hugging Face repository.
      3. Setup directories.
      4. Generate URL groups by date.
      5. Process each day concurrently.
    """
    logger = get_run_logger()

    # Load parameters from a JSON block.
    json_block = JSON.load("gdelt-etl-parameters")
    params = json_block.value
    start_date = datetime.datetime.fromisoformat(params.get("start_date", "2020-03-16T00:00:00"))
    end_date = datetime.datetime.fromisoformat(params.get("end_date", "2020-03-22T00:00:00"))

    # Load the Hugging Face token from a Secret block.
    secret_block = Secret.load("huggingface-token")
    hf_token = secret_block.get()

    # Validate or create the repository.
    create_hf_repo(hf_token)

    directories = setup_directories(base_path)
    url_groups = generate_gdelt_urls(start_date, end_date)
    
    # Process days concurrently (subflows)
    futures = [process_single_day(date, urls, directories, hf_token) 
              for date, urls in url_groups.items()]
    
    # Wait for completion (optional error handling)
    for future in futures:
        try:
            future.result()
        except Exception as e:
            logger.error(f"Failed day: {str(e)}")

# --- Entry Point ---
if __name__ == "__main__":
    process_date_range.serve(
        name="gdelt-etl-production-v2",
        tags=["gdelt", "etl", "production"],
    )

File: data_sources/hf_gdelt_dataset.md

Type: markdown Size: 6314 bytes Created: 2025-02-10 01:37:02 UTC Modified: 2025-02-10 01:37:02 UTC

Content:

---
license: cc-by-4.0
tags:
- text
- news
- global
- knowledge-graph
- geopolitics
dataset_info:
  features:
  - name: GKGRECORDID
    dtype: string
  - name: DATE
    dtype: string
  - name: SourceCollectionIdentifier
    dtype: string
  - name: SourceCommonName
    dtype: string
  - name: DocumentIdentifier
    dtype: string
  - name: V1Counts
    dtype: string
  - name: V2.1Counts
    dtype: string
  - name: V1Themes
    dtype: string
  - name: V2EnhancedThemes
    dtype: string
  - name: V1Locations
    dtype: string
  - name: V2EnhancedLocations
    dtype: string
  - name: V1Persons
    dtype: string
  - name: V2EnhancedPersons
    dtype: string
  - name: V1Organizations
    dtype: string
  - name: V2EnhancedOrganizations
    dtype: string
  - name: V1.5Tone
    dtype: string
  - name: V2GCAM
    dtype: string
  - name: V2.1EnhancedDates
    dtype: string
  - name: V2.1Quotations
    dtype: string
  - name: V2.1AllNames
    dtype: string
  - name: V2.1Amounts
    dtype: string
  - name: tone
    dtype: float64
  splits:
  - name: train
    num_bytes: 3331097194
    num_examples: 281215
  - name: negative_tone
    num_bytes: 3331097194
    num_examples: 281215
  download_size: 2229048020
  dataset_size: 6662194388
configs:
- config_name: default
  data_files:
  - split: train
    path: data/train-*
  - split: negative_tone
    path: data/negative_tone-*
---

# Dataset Card for dwb2023/gdelt-gkg-march2020-v2

## Dataset Details

### Dataset Description

This dataset contains GDELT Global Knowledge Graph (GKG) data covering March 10-22, 2020, during the early phase of the COVID-19 pandemic. It captures global event interactions, actor relationships, and contextual narratives to support temporal, spatial, and thematic analysis.

- **Curated by:** dwb2023

### Dataset Sources

- **Repository:** [http://data.gdeltproject.org/gdeltv2](http://data.gdeltproject.org/gdeltv2)
- **GKG Documentation:** [GDELT 2.0 Overview](https://blog.gdeltproject.org/gdelt-2-0-our-global-world-in-realtime/), [GDELT GKG Codebook](http://data.gdeltproject.org/documentation/GDELT-Global_Knowledge_Graph_Codebook-V2.1.pdf)

## Uses

### Direct Use

This dataset is suitable for:

- Temporal analysis of global events
- Relationship mapping of key actors in supply chain and logistics
- Sentiment and thematic analysis of COVID-19 pandemic narratives

### Out-of-Scope Use

- Not designed for real-time monitoring due to its historic and static nature
- Not intended for medical diagnosis or predictive health modeling

## Dataset Structure

### Features and Relationships

- this dataset focuses on a subset of features from the source GDELT dataset.

| Name | Type | Aspect | Description |
|------|------|---------|-------------|
| DATE | string | Metadata | Publication date of the article/document |
| SourceCollectionIdentifier | string | Metadata | Unique identifier for the source collection |
| SourceCommonName | string | Metadata | Common/display name of the source |
| DocumentIdentifier | string | Metadata | Unique URL/identifier of the document |
| V1Counts | string | Metrics | Original count mentions of numeric values |
| V2.1Counts | string | Metrics | Enhanced numeric pattern extraction |
| V1Themes | string | Classification | Original thematic categorization |
| V2EnhancedThemes | string | Classification | Expanded theme taxonomy and classification |
| V1Locations | string | Entities | Original geographic mentions |
| V2EnhancedLocations | string | Entities | Enhanced location extraction with coordinates |
| V1Persons | string | Entities | Original person name mentions |
| V2EnhancedPersons | string | Entities | Enhanced person name extraction |
| V1Organizations | string | Entities | Original organization mentions |
| V2EnhancedOrganizations | string | Entities | Enhanced organization name extraction |
| V1.5Tone | string | Sentiment | Original emotional tone scoring |
| V2GCAM | string | Sentiment | Global Content Analysis Measures |
| V2.1EnhancedDates | string | Temporal | Temporal reference extraction |
| V2.1Quotations | string | Content | Direct quote extraction |
| V2.1AllNames | string | Entities | Comprehensive named entity extraction |
| V2.1Amounts | string | Metrics | Quantity and measurement extraction |

### Aspects Overview:
- **Metadata**: Core document information
- **Metrics**: Numerical measurements and counts
- **Classification**: Categorical and thematic analysis
- **Entities**: Named entity recognition (locations, persons, organizations)
- **Sentiment**: Emotional and tone analysis
- **Temporal**: Time-related information
- **Content**: Direct content extraction

## Dataset Creation

### Curation Rationale
This dataset was curated to capture the rapidly evolving global narrative during the early phase of the COVID-19 pandemic, focusing specifically on March 10–22, 2020. By zeroing in on this critical period, it offers a granular perspective on how geopolitical events, actor relationships, and thematic discussions shifted amid the escalating pandemic. The enhanced GKG features further enable advanced entity, sentiment, and thematic analysis, making it a valuable resource for studying the socio-political and economic impacts of COVID-19 during a pivotal point in global history.

### Curation Approach
A targeted subset of GDELT’s columns was selected to streamline analysis on key entities (locations, persons, organizations), thematic tags, and sentiment scores—core components of many knowledge-graph and text analytics workflows. This approach balances comprehensive coverage with manageable data size and performance. The ETL pipeline used to produce these transformations is documented here:
[https://gist.github.com/donbr/e2af2bbe441f90b8664539a25957a6c0](https://gist.github.com/donbr/e2af2bbe441f90b8664539a25957a6c0).

## Citation

When using this dataset, please cite both the dataset and original GDELT project:

```bibtex
@misc{gdelt-gkg-march2020,
    title = {GDELT Global Knowledge Graph March 2020 Dataset},
    author = {dwb2023},
    year = {2025},
    publisher = {Hugging Face},
    url = {https://huggingface.co/datasets/dwb2023/gdelt-gkg-march2020-v2}
}

Dataset Card Contact

For questions and comments about this dataset card, please contact dwb2023 through the Hugging Face platform.


Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment