@mgaitan
Created July 8, 2025 22:09
clean up inactive datasets in Honeycomb
#!/usr/bin/env python3
"""
Honeycomb Dataset Cleanup Script
This script helps clean up inactive datasets in Honeycomb.
It lists datasets with no activity in the last N days and allows deletion.
Usage:
python honeycomb_cleanup.py # List inactive datasets (60 days)
python honeycomb_cleanup.py --days 30 # List inactive datasets (30 days)
python honeycomb_cleanup.py --delete # Interactive deletion mode
python honeycomb_cleanup.py --delete --delete-protected # Also delete protected datasets
python honeycomb_cleanup.py --name app-api --name logs # Only consider specific datasets
python honeycomb_cleanup.py --delete -n app-api -n logs # Delete only specific datasets
Environment Variables:
HONEYCOMB_API_KEY: Your Honeycomb API key (required)
"""
import argparse
import os
import sys
from datetime import datetime, timedelta
from typing import List, Dict
import requests
from rich.console import Console
from rich.table import Table
from rich.prompt import Prompt
console = Console()


class HoneycombClient:
    """Client for interacting with Honeycomb API"""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.session = requests.Session()
        self.session.headers.update({
            "X-Honeycomb-Team": api_key,
            "Content-Type": "application/json"
        })

    def get_environment_info(self) -> Dict:
        """Fetch environment information"""
        url = "https://api.honeycomb.io/1/auth"
        try:
            response = self.session.get(url)
            response.raise_for_status()
            auth_info = response.json()
            return auth_info.get("environment", {"name": "Unknown", "slug": "unknown"})
        except requests.exceptions.RequestException as e:
            print(f"Error fetching environment info: {e}")
            return {"name": "Unknown", "slug": "unknown"}

    def get_datasets(self) -> List[Dict]:
        """Fetch all datasets from Honeycomb"""
        url = "https://api.honeycomb.io/1/datasets"
        try:
            response = self.session.get(url)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"Error fetching datasets: {e}")
            sys.exit(1)

    def disable_deletion_protection(self, dataset_slug: str) -> bool:
        """Disable deletion protection for a dataset"""
        url = f"https://api.honeycomb.io/1/datasets/{dataset_slug}"
        payload = {
            "settings": {
                "delete_protected": False
            }
        }
        try:
            response = self.session.put(url, json=payload)
            response.raise_for_status()
            return True
        except requests.exceptions.RequestException as e:
            # e.response is None when no HTTP response was received (e.g. a connection error)
            if e.response is not None:
                print(f"FAILED - Error {e.response.status_code} disabling protection for {dataset_slug}")
            else:
                print(f"FAILED - Error disabling protection for {dataset_slug}: {e}")
            return False

    def delete_dataset(self, dataset_slug: str, disable_protection: bool = False) -> bool:
        """Delete a dataset, optionally disabling deletion protection first"""
        url = f"https://api.honeycomb.io/1/datasets/{dataset_slug}"
        try:
            response = self.session.delete(url)
            response.raise_for_status()
            return True
        except requests.exceptions.RequestException as e:
            if e.response is not None:
                status_code = e.response.status_code
                # If the failure is a deletion-protection error and the flag is enabled, try to disable it
                is_protected = False
                if status_code == 409 and disable_protection:
                    # Check both text and JSON for deletion protection indicators
                    if e.response.text and 'delete protected' in e.response.text.lower():
                        is_protected = True
                    else:
                        try:
                            error_details = e.response.json()
                            if 'error' in error_details and 'delete protected' in error_details['error'].lower():
                                is_protected = True
                        except ValueError:
                            pass
                if is_protected:
                    print("deletion protection detected, disabling... ", end="", flush=True)
                    if self.disable_deletion_protection(dataset_slug):
                        print("retrying delete... ", end="", flush=True)
                        try:
                            response = self.session.delete(url)
                            response.raise_for_status()
                            return True
                        except requests.exceptions.RequestException as retry_e:
                            if retry_e.response is not None:
                                print(f"FAILED - Error {retry_e.response.status_code} on retry")
                            else:
                                print(f"FAILED - Error on retry: {retry_e}")
                            return False
                    else:
                        return False
                print(f"FAILED - Error {status_code} deleting {dataset_slug}")
                try:
                    error_details = e.response.json()
                    if 'error' in error_details:
                        print(f" → {error_details['error']}")
                except ValueError:
                    pass
            else:
                print(f"FAILED - Error deleting {dataset_slug}: {e}")
            return False


def is_dataset_inactive(dataset: Dict, days: int) -> bool:
    """Check if a dataset is inactive based on last_written_at timestamp"""
    last_written = dataset.get("last_written_at")
    if not last_written:
        # No last_written_at means no data was ever written
        return True
    try:
        # Parse the timestamp (assuming ISO format)
        last_written_dt = datetime.fromisoformat(last_written.replace('Z', '+00:00'))
        cutoff_date = datetime.now(last_written_dt.tzinfo) - timedelta(days=days)
        return last_written_dt < cutoff_date
    except (ValueError, TypeError):
        # If we can't parse the date, consider it inactive
        print(f"Warning: Could not parse last_written_at for dataset {dataset.get('name', 'unknown')}")
        return True
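
# Illustrative example (hypothetical values): with days=60, a dataset whose
# "last_written_at" lies more than 60 days in the past (e.g. "2024-01-01T00:00:00Z"
# at the time of writing) is reported as inactive; a missing timestamp also counts.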


def format_date(date_str: str) -> str:
    """Format date string for display"""
    if not date_str:
        return "Never"
    try:
        dt = datetime.fromisoformat(date_str.replace('Z', '+00:00'))
        return dt.strftime('%Y-%m-%d')
    except (ValueError, TypeError):
        return date_str[:10] if len(date_str) >= 10 else date_str


def main():
    parser = argparse.ArgumentParser(description="Clean up inactive Honeycomb datasets")
    parser.add_argument("--days", type=int, default=60, help="Days to look back for activity (default: 60)")
    parser.add_argument("--delete", action="store_true", help="Enable deletion mode")
    parser.add_argument("--delete-protected", action="store_true", help="Also delete datasets with deletion protection enabled")
    parser.add_argument("--name", "-n", action="append", help="Only consider datasets with these names for deletion (can be used multiple times)")
    parser.add_argument("--api-key", type=str, help="Honeycomb API key (overrides env var)")
    args = parser.parse_args()

    # Get API key
    api_key = args.api_key or os.getenv("HONEYCOMB_API_KEY")
    if not api_key:
        print("Error: HONEYCOMB_API_KEY environment variable not set and --api-key not provided")
        print("Set it with: export HONEYCOMB_API_KEY=your_api_key_here")
        sys.exit(1)

    # Initialize client
    client = HoneycombClient(api_key)

    # Get environment info
    env_info = client.get_environment_info()
    env_name = env_info.get("name", "Unknown")
    console.print(f"[bold blue]Honeycomb Environment:[/bold blue] [green]{env_name}[/green]")
    console.print(f"[bold blue]Fetching datasets and checking for inactivity over {args.days} days...[/bold blue]")

    # Get all datasets
    datasets = client.get_datasets()

    # Separate active and inactive datasets
    inactive_datasets = []
    active_datasets = []
    filtered_out_datasets = []
    for dataset in datasets:
        dataset_name = dataset.get("name", "")
        # If specific datasets are specified, only consider those
        if args.name and dataset_name not in args.name:
            filtered_out_datasets.append(dataset)
            continue
        if is_dataset_inactive(dataset, args.days):
            inactive_datasets.append(dataset)
        else:
            active_datasets.append(dataset)

    print(f"\nFound {len(active_datasets)} active datasets")
    print(f"Found {len(inactive_datasets)} inactive datasets")
    if args.name:
        print(f"Filtered out {len(filtered_out_datasets)} datasets (not in specified list)")
    if not inactive_datasets:
        print("No inactive datasets found. Nothing to clean up!")
        return

    # Display inactive datasets in a table
    table = Table(title=f"Datasets with no activity in the last {args.days} days")
    table.add_column("Name", style="cyan", no_wrap=True)
    table.add_column("Created", style="blue")
    table.add_column("Last Activity", style="yellow")
    table.add_column("Description", style="green")
    for dataset in inactive_datasets:
        name = dataset.get("name", "Unknown")
        created = format_date(dataset.get("created_at", ""))
        last_activity = format_date(dataset.get("last_written_at", ""))
        description = dataset.get("description", "")
        table.add_row(
            name,
            created,
            last_activity,
            description[:50] + "..." if len(description) > 50 else description
        )
    console.print(table)

    if not args.delete:
        print(f"\nTo delete these datasets, run: python {sys.argv[0]} --days {args.days} --delete")
        return

    # Deletion mode
    console.print("[bold red]⚠️ WARNING: DELETION MODE ⚠️[/bold red]")
    console.print("[bold red]This action cannot be undone![/bold red]")
    # Final confirmation. Prompt.ask is used instead of Confirm so confirming is a more deliberate action
    if Prompt.ask(f"\nDo you want to delete {len(inactive_datasets)} inactive datasets?", default="no", choices=["yes I do", "no"]) == "no":
        print("Aborted.")
        return

    # Delete datasets
    print(f"\nDeleting {len(inactive_datasets)} datasets...")
    deleted_count = 0
    for dataset in inactive_datasets:
        name = dataset.get("name", "Unknown")
        slug = dataset.get("slug", "")
        if not slug:
            print(f"Skipping {name}: no slug found")
            continue
        print(f"Deleting {name}... ", end="", flush=True)
        if client.delete_dataset(slug, args.delete_protected):
            print("OK")
            deleted_count += 1

    print(f"\nDeleted {deleted_count} out of {len(inactive_datasets)} datasets.")


if __name__ == "__main__":
    main()