Created
July 8, 2025 22:09
-
-
Save mgaitan/f3797ab5570336af0c83abe66fa6eb42 to your computer and use it in GitHub Desktop.
clean up inactive datasets in Honeycomb
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Honeycomb Dataset Cleanup Script | |
This script helps clean up inactive datasets in Honeycomb. | |
It lists datasets with no activity in the last N days and allows deletion. | |
Usage: | |
python honeycomb_cleanup.py # List inactive datasets (60 days) | |
python honeycomb_cleanup.py --days 30 # List inactive datasets (30 days) | |
python honeycomb_cleanup.py --delete # Interactive deletion mode | |
python honeycomb_cleanup.py --delete --delete-protected # Also delete protected datasets | |
python honeycomb_cleanup.py --name app-api --name logs # Only consider specific datasets | |
python honeycomb_cleanup.py --delete -n app-api -n logs # Delete only specific datasets | |
Environment Variables: | |
HONEYCOMB_API_KEY: Your Honeycomb API key (required) | |
""" | |
import argparse | |
import os | |
import sys | |
from datetime import datetime, timedelta | |
from typing import List, Dict | |
import requests | |
from rich.console import Console | |
from rich.table import Table | |
from rich.prompt import Prompt | |
console = Console() | |
class HoneycombClient:
    """Thin client for the Honeycomb REST API.

    Holds a ``requests.Session`` whose headers carry the ``X-Honeycomb-Team``
    auth key, so every request made through this client is authenticated.
    """

    def __init__(self, api_key: str):
        """Create a client authenticated with *api_key*."""
        self.api_key = api_key
        self.session = requests.Session()
        self.session.headers.update({
            "X-Honeycomb-Team": api_key,
            "Content-Type": "application/json"
        })

    def get_environment_info(self) -> Dict:
        """Fetch environment information.

        Returns the ``environment`` dict from ``/1/auth``. On any request
        failure (or missing key) a placeholder ``{"name": "Unknown",
        "slug": "unknown"}`` is returned so callers never see ``None``.
        """
        url = "https://api.honeycomb.io/1/auth"
        try:
            response = self.session.get(url)
            response.raise_for_status()
            auth_info = response.json()
            return auth_info.get("environment", {"name": "Unknown", "slug": "unknown"})
        except requests.exceptions.RequestException as e:
            print(f"Error fetching environment info: {e}")
            return {"name": "Unknown", "slug": "unknown"}

    def get_datasets(self) -> List[Dict]:
        """Fetch all datasets from Honeycomb.

        Exits the process on failure: without the dataset list the rest of
        the script has nothing to do.
        """
        url = "https://api.honeycomb.io/1/datasets"
        try:
            response = self.session.get(url)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"Error fetching datasets: {e}")
            sys.exit(1)

    def disable_deletion_protection(self, dataset_slug: str) -> bool:
        """Disable deletion protection for a dataset.

        Returns True on success, False (after printing a diagnostic) on
        failure.
        """
        url = f"https://api.honeycomb.io/1/datasets/{dataset_slug}"
        payload = {
            "settings": {
                "delete_protected": False
            }
        }
        try:
            response = self.session.put(url, json=payload)
            response.raise_for_status()
            return True
        except requests.exceptions.RequestException as e:
            # BUG FIX: RequestException *always* has a .response attribute;
            # it is None for transport-level failures, so the original
            # hasattr() check was always True and could crash on None.
            if e.response is not None:
                status_code = e.response.status_code
                print(f"FAILED - Error {status_code} disabling protection for {dataset_slug}")
            else:
                print(f"FAILED - Error disabling protection for {dataset_slug}: {e}")
            return False

    @staticmethod
    def _is_delete_protected_error(response) -> bool:
        """Return True if *response* indicates the dataset is delete-protected.

        Checks the raw body text first, then falls back to the JSON ``error``
        field, mirroring what the API returns for protected datasets.
        """
        if response.text and 'delete protected' in response.text.lower():
            return True
        try:
            error_details = response.json()
            return 'error' in error_details and 'delete protected' in error_details['error'].lower()
        except ValueError:
            # Body was not JSON; treat as "not a protection error".
            return False

    def delete_dataset(self, dataset_slug: str, disable_protection: bool = False) -> bool:
        """Delete a dataset, optionally disabling deletion protection first.

        When *disable_protection* is True and the API answers 409 with a
        "delete protected" body, protection is disabled and the delete is
        retried once. Returns True on success, False otherwise (printing
        diagnostics along the way).
        """
        url = f"https://api.honeycomb.io/1/datasets/{dataset_slug}"
        try:
            response = self.session.delete(url)
            response.raise_for_status()
            return True
        except requests.exceptions.RequestException as e:
            # BUG FIX: check `e.response is not None` instead of hasattr()
            # (the attribute always exists, but is None on connection errors).
            if e.response is None:
                print(f"FAILED - Error deleting {dataset_slug}: {e}")
                return False
            status_code = e.response.status_code
            # A 409 with a "delete protected" body means deletion protection;
            # optionally disable it and retry exactly once.
            if status_code == 409 and disable_protection and self._is_delete_protected_error(e.response):
                print("deletion protection detected, disabling... ", end="", flush=True)
                if not self.disable_deletion_protection(dataset_slug):
                    return False
                print("retrying delete... ", end="", flush=True)
                try:
                    response = self.session.delete(url)
                    response.raise_for_status()
                    return True
                except requests.exceptions.RequestException as retry_e:
                    if retry_e.response is not None:
                        print(f"FAILED - Error {retry_e.response.status_code} on retry")
                    else:
                        print(f"FAILED - Error on retry: {retry_e}")
                    return False
            print(f"FAILED - Error {status_code} deleting {dataset_slug}")
            try:
                error_details = e.response.json()
                if 'error' in error_details:
                    print(f" → {error_details['error']}")
            except ValueError:
                # Non-JSON error body; the status line above is all we have.
                pass
            return False
def is_dataset_inactive(dataset: Dict, days: int) -> bool:
    """Return True when *dataset* has seen no writes within the last *days* days.

    A dataset with no ``last_written_at`` value has never received data and
    counts as inactive. An unparseable timestamp also counts as inactive,
    after printing a warning.
    """
    timestamp = dataset.get("last_written_at")
    if not timestamp:
        # Never written to at all.
        return True
    try:
        # Timestamps arrive in ISO format; normalize the trailing 'Z'.
        written_at = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
    except (ValueError, TypeError):
        print(f"Warning: Could not parse last_written_at for dataset {dataset.get('name', 'unknown')}")
        return True
    threshold = datetime.now(written_at.tzinfo) - timedelta(days=days)
    return written_at < threshold
def format_date(date_str: str) -> str:
    """Render an ISO timestamp as ``YYYY-MM-DD``, or ``"Never"`` when empty.

    If the value cannot be parsed as ISO-8601, fall back to its first ten
    characters (or the raw string when shorter than ten).
    """
    if not date_str:
        return "Never"
    try:
        parsed = datetime.fromisoformat(date_str.replace('Z', '+00:00'))
    except (ValueError, TypeError):
        return date_str[:10] if len(date_str) >= 10 else date_str
    return parsed.strftime('%Y-%m-%d')
def main():
    """Entry point: list inactive Honeycomb datasets and optionally delete them.

    Flow: parse CLI args, resolve the API key, fetch every dataset, partition
    by activity (and optional --name filter), display the inactive ones, and
    — only with --delete and an explicit confirmation — delete them.
    """
    parser = argparse.ArgumentParser(description="Clean up inactive Honeycomb datasets")
    parser.add_argument("--days", type=int, default=60, help="Days to look back for activity (default: 60)")
    parser.add_argument("--delete", action="store_true", help="Enable deletion mode")
    parser.add_argument("--delete-protected", action="store_true", help="Also delete datasets with deletion protection enabled")
    parser.add_argument("--name", "-n", action="append", help="Only consider datasets with these names for deletion (can be used multiple times)")
    parser.add_argument("--api-key", type=str, help="Honeycomb API key (overrides env var)")
    args = parser.parse_args()

    # Resolve the API key: the CLI flag takes precedence over the env var.
    api_key = args.api_key or os.getenv("HONEYCOMB_API_KEY")
    if not api_key:
        print("Error: HONEYCOMB_API_KEY environment variable not set and --api-key not provided")
        print("Set it with: export HONEYCOMB_API_KEY=your_api_key_here")
        sys.exit(1)

    # Initialize client
    client = HoneycombClient(api_key)

    # Show which environment we are about to operate on.
    env_info = client.get_environment_info()
    env_name = env_info.get("name", "Unknown")
    console.print(f"[bold blue]Honeycomb Environment:[/bold blue] [green]{env_name}[/green]")
    console.print(f"[bold blue]Fetching datasets and checking for inactivity over {args.days} days...[/bold blue]")

    # Get all datasets
    datasets = client.get_datasets()

    # Partition into inactive / active / filtered-out-by---name buckets.
    inactive_datasets = []
    active_datasets = []
    filtered_out_datasets = []
    for dataset in datasets:
        dataset_name = dataset.get("name", "")
        # If specific datasets are specified, only consider those
        if args.name and dataset_name not in args.name:
            filtered_out_datasets.append(dataset)
            continue
        if is_dataset_inactive(dataset, args.days):
            inactive_datasets.append(dataset)
        else:
            active_datasets.append(dataset)

    print(f"\nFound {len(active_datasets)} active datasets")
    print(f"Found {len(inactive_datasets)} inactive datasets")
    if args.name:
        print(f"Filtered out {len(filtered_out_datasets)} datasets (not in specified list)")
    if not inactive_datasets:
        print("No inactive datasets found. Nothing to clean up!")
        return

    # Display inactive datasets in a table
    table = Table(title=f"Datasets with no activity in the last {args.days} days")
    table.add_column("Name", style="cyan", no_wrap=True)
    table.add_column("Created", style="blue")
    table.add_column("Last Activity", style="yellow")
    table.add_column("Description", style="green")
    for dataset in inactive_datasets:
        name = dataset.get("name", "Unknown")
        created = format_date(dataset.get("created_at", ""))
        last_activity = format_date(dataset.get("last_written_at", ""))
        # BUG FIX: the API may return an explicit null description, in which
        # case .get(..., "") yields None and len() below would crash.
        description = dataset.get("description") or ""
        table.add_row(
            name,
            created,
            last_activity,
            description[:50] + "..." if len(description) > 50 else description
        )
    console.print(table)

    if not args.delete:
        print(f"\nTo delete these datasets, run: python {sys.argv[0]} --days {args.days} --delete")
        return

    # Deletion mode
    console.print("[bold red]⚠️ WARNING: DELETION MODE ⚠️[/bold red]")
    console.print("[bold red]This action cannot be undone![/bold red]")
    # Final confirmation. we use Prompt.ask instead of Confirm so it's a bit more conscious action
    if Prompt.ask(f"\nDo you want to delete {len(inactive_datasets)} inactive datasets?", default="no", choices=["yes I do", "no"]) == "no":
        print("Aborted.")
        return

    # Delete datasets, counting only confirmed successes.
    print(f"\nDeleting {len(inactive_datasets)} datasets...")
    deleted_count = 0
    for dataset in inactive_datasets:
        name = dataset.get("name", "Unknown")
        slug = dataset.get("slug", "")
        if not slug:
            # The delete endpoint is keyed by slug; without one we can't proceed.
            print(f"Skipping {name}: no slug found")
            continue
        print(f"Deleting {name}... ", end="", flush=True)
        if client.delete_dataset(slug, args.delete_protected):
            print("OK")
            deleted_count += 1
    print(f"\nDeleted {deleted_count} out of {len(inactive_datasets)} datasets.")
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment