Skip to content

Instantly share code, notes, and snippets.

@williamp44
Last active April 9, 2026 13:06
Show Gist options
  • Select an option

  • Save williamp44/58f359eafaa44481dbc0b71e86d2fe8e to your computer and use it in GitHub Desktop.

Select an option

Save williamp44/58f359eafaa44481dbc0b71e86d2fe8e to your computer and use it in GitHub Desktop.
Paperclip token & cost audit script — analyze agent token usage, idle waste, model costs

README.md

Paperclip tool — paperclip_cost_audit.py

Instructions:

One-time setup

pip install psycopg2-binary

Run it

python paperclip_cost_audit.py

If Paperclip runs on a different port

python paperclip_cost_audit.py --api-url http://localhost:3200

If Postgres is on a different port

python paperclip_cost_audit.py --db-port 5432

If psycopg2 won't install (API-only mode, no cost data)

python paperclip_cost_audit.py --no-db

What the script reports:

  1. Cost by agent — total USD, runs, idle%, input/cache/output tokens, share of total
  2. Idle heartbeat waste — % of runs with no assigned task (your ESRM-Bot at 84% and C++ at 80% are the big wastes)
  3. Model distribution — which models are actually being used
  4. Savings opportunity — theoretical savings if switched to Haiku (upper bound)
  5. Routine health — which scheduled cron jobs are failing

Notable from our own output: 58% of all 1,261 runs were idle (no task assigned) — that's the single biggest optimization lever before touching models at all.

Run this and look at two things first:

  1. Model Distribution — if any agent is on Opus, that's your problem right there
  2. Top 15 Runs — look for zero or near-zero "Cache" column entries. If cache is consistently low, you're paying full input price every run. That's usually fixed by running agents more frequently or keeping the system prompt stable.

;;; more notes

wrote a quick script to analyze token usage against your local Paperclip instance. Shows per-agent cost breakdown, which runs are burning tokens with no assigned task, model distribution, and a savings estimate if you switch any agents to Haiku.

pip install psycopg2-binary
curl -s https://gist.github.com/williamp44/58f359eafaa44481dbc0b71e86d2fe8e/raw | python3 -

Or download and run:
curl -O https://gist.github.com/williamp44/58f359eafaa44481dbc0b71e86d2fe8e/raw/paperclip_cost_audit.py
python paperclip_cost_audit.py

Defaults to localhost:3100 (Paperclip API) and localhost:54329 (Postgres). Let me know what you see — especially the cache hit % on your top runs. If that's near zero you're paying 10× more than needed.

#!/usr/bin/env python3
"""
paperclip_cost_audit.py — Standalone Paperclip token & cost analyzer
Connects to a local Paperclip instance and reports:
- Per-agent token usage and USD cost (from cost_events table)
- Idle heartbeat waste (runs with no assigned task)
- Model distribution across agents
- Routines that are firing but failing
Requirements:
pip install psycopg2-binary requests
Usage:
python paperclip_cost_audit.py
python paperclip_cost_audit.py --api-url http://localhost:3100
python paperclip_cost_audit.py --db-port 54329
Defaults match standard Paperclip local install.
"""
import argparse
import json
import sys
from urllib.error import URLError
from urllib.request import urlopen

# Optional: psycopg2 enables direct Postgres access to cost_events (more
# accurate than API-only mode); the script degrades gracefully without it.
try:
    import psycopg2
    import psycopg2.extras

    HAS_PSYCOPG2 = True
except ImportError:
    HAS_PSYCOPG2 = False
# ---------------------------------------------------------------------------
# Model pricing: USD per 1M tokens [input, cached-input, output]
# Updated 2026-04-08 — https://docs.anthropic.com/en/docs/about-claude/models
# ---------------------------------------------------------------------------
MODEL_PRICING = {
    "claude-opus-4-6": (15.00, 1.50, 75.00),
    "claude-sonnet-4-6": (3.00, 0.30, 15.00),
    "claude-sonnet-4-5": (3.00, 0.30, 15.00),
    "claude-haiku-4-5": (0.80, 0.08, 4.00),
    "claude-haiku-4-5-20251001": (0.80, 0.08, 4.00),
    "gpt-4o": (2.50, 1.25, 10.00),
    "gpt-4o-mini": (0.15, 0.075, 0.60),
    "gemini-2.0-flash": (0.10, 0.025, 0.40),
    "gemini-1.5-pro": (1.25, 0.3125, 5.00),
}


def estimate_cost(model, input_tokens, output_tokens, cached_tokens=0):
    """Estimate the USD cost of a run from its token counts.

    Args:
        model: model identifier; must be a key of MODEL_PRICING.
        input_tokens: uncached input tokens.
        output_tokens: output tokens.
        cached_tokens: cached-input tokens (billed at the discounted rate).

    Returns:
        dict with input_usd / cached_usd / output_usd / total_usd (each
        rounded to cents), or None when the model has no known pricing.
    """
    pricing = MODEL_PRICING.get(model)
    if not pricing:
        return None
    in_per_m, cached_per_m, out_per_m = pricing
    in_cost = (input_tokens / 1_000_000) * in_per_m
    cached_cost = (cached_tokens / 1_000_000) * cached_per_m
    out_cost = (output_tokens / 1_000_000) * out_per_m
    return {
        "input_usd": round(in_cost, 2),
        "cached_usd": round(cached_cost, 2),
        "output_usd": round(out_cost, 2),
        # total is rounded from the unrounded parts, so it can differ by a
        # cent from the sum of the displayed components.
        "total_usd": round(in_cost + cached_cost + out_cost, 2),
    }
# ---------------------------------------------------------------------------
# HTTP API helpers
# ---------------------------------------------------------------------------
def api_get(base_url, path):
    """GET {base_url}/api/{path} and return the parsed JSON, or None.

    Any failure (connection refused, HTTP error, timeout, malformed JSON)
    is printed and swallowed so callers can fall back gracefully.
    """
    url = f"{base_url}/api/{path}"
    try:
        with urlopen(url, timeout=5) as resp:
            return json.loads(resp.read())
    # OSError subsumes URLError and socket timeouts (the original caught only
    # URLError, so a timeout or bad JSON body crashed the whole audit);
    # ValueError covers json.JSONDecodeError.
    except (OSError, ValueError) as e:
        print(f" ERROR: Could not reach {url}: {e}")
        return None
def discover_company(base_url):
    """Return the first company ID the API reports, or None."""
    companies = api_get(base_url, "companies")
    if isinstance(companies, list) and companies:
        return companies[0]["id"]
    # Fall back to the authenticated agent endpoint.
    me = api_get(base_url, "agents/me")
    if me and "companyId" in me:
        return me["companyId"]
    return None
# ---------------------------------------------------------------------------
# Postgres cost_events query (accurate, per-run model data)
# ---------------------------------------------------------------------------
# One row per (agent, model): run count, token sums, and the idle/productive
# split (a run with no issue_id is an idle heartbeat).
# Single parameter: company_id.
DB_QUERY = """
SELECT
a.name AS agent_name,
ce.model,
COUNT(*) AS runs,
SUM(ce.input_tokens) AS input_tokens,
SUM(ce.output_tokens) AS output_tokens,
SUM(ce.cached_input_tokens) AS cached_tokens,
-- idle runs: no issue attached (timer heartbeat with no task)
SUM(CASE WHEN ce.issue_id IS NULL THEN 1 ELSE 0 END) AS idle_runs,
SUM(CASE WHEN ce.issue_id IS NOT NULL THEN 1 ELSE 0 END) AS productive_runs
FROM cost_events ce
JOIN agents a ON a.id = ce.agent_id
WHERE ce.company_id = %s
GROUP BY a.name, ce.model
ORDER BY a.name, ce.model
"""
# Top expensive individual runs (outlier detection)
# The 15 runs with the largest total token footprint; the issue join is LEFT
# so idle runs (issue_id NULL) still appear. Single parameter: company_id.
DB_QUERY_TOP_RUNS = """
SELECT
a.name AS agent_name,
ce.model,
ce.issue_id,
i.identifier AS issue_identifier,
i.title AS issue_title,
ce.input_tokens,
ce.output_tokens,
ce.cached_input_tokens AS cached_tokens,
ce.created_at
FROM cost_events ce
JOIN agents a ON a.id = ce.agent_id
LEFT JOIN issues i ON i.id = ce.issue_id
WHERE ce.company_id = %s
ORDER BY (ce.output_tokens + ce.input_tokens + ce.cached_input_tokens) DESC
LIMIT 15
"""
def query_cost_events(db_host, db_port, company_id):
    """Run both cost queries against the local Paperclip Postgres.

    Returns (rows, top_runs, error): on success error is None; on any
    failure rows/top_runs are None and error is a human-readable string.
    """
    if not HAS_PSYCOPG2:
        return None, None, "psycopg2 not installed — run: pip install psycopg2-binary"
    conn = None
    try:
        # NOTE(review): credentials match the standard Paperclip local
        # install — confirm if your instance differs.
        conn = psycopg2.connect(
            host=db_host, port=db_port,
            dbname="paperclip", user="paperclip", password="paperclip",
            connect_timeout=5
        )
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(DB_QUERY, (company_id,))
            rows = [dict(r) for r in cur.fetchall()]
            cur.execute(DB_QUERY_TOP_RUNS, (company_id,))
            top_runs = [dict(r) for r in cur.fetchall()]
        return rows, top_runs, None
    except Exception as e:
        # Deliberately broad: any connect/auth/SQL problem degrades the
        # caller to API-only mode with a printed message.
        return None, None, str(e)
    finally:
        # Original leaked the connection if a query raised before close().
        if conn is not None:
            conn.close()
# ---------------------------------------------------------------------------
# Routines health check
# ---------------------------------------------------------------------------
def check_routines(base_url, company_id):
    """Fetch active routines and summarize each one's last-run outcome.

    Returns a list of dicts (title / assignee / status / last_result /
    failure); empty list when the API is unreachable or has no routines.
    """
    routines = api_get(base_url, f"companies/{company_id}/routines")
    if not routines:
        return []
    results = []
    for r in routines:
        if r.get("status") == "archived":
            continue
        last_run = r.get("lastRun") or {}
        # Keeps the lastResult of the final trigger (most routines have one).
        last_result = ""
        for t in r.get("triggers", []):
            last_result = t.get("lastResult", "")
        # assigneeAgentId can be present but null — `.get(key, default)` does
        # not protect against that, and slicing None raised TypeError.
        assignee = r.get("assigneeAgentId") or "unassigned"
        results.append({
            "title": r["title"],
            "assignee": assignee[:8] + "...",
            "status": r.get("status"),
            "last_result": last_result or last_run.get("status", "unknown"),
            "failure": last_run.get("failureReason", ""),
        })
    return results
# ---------------------------------------------------------------------------
# Formatting
# ---------------------------------------------------------------------------
def fmt_tokens(n):
    """Render a token count compactly: 567 / 34K / 1.2M."""
    if n < 1_000:
        return str(n)
    if n < 1_000_000:
        return f"{n / 1_000:.0f}K"
    return f"{n / 1_000_000:.1f}M"


def fmt_usd(n):
    """Render a dollar amount with two decimal places."""
    return f"${n:.2f}"


def bar(ratio, width=20):
    """Render a unicode progress bar of `width` cells for ratio in [0, 1]."""
    filled = "█" * round(ratio * width)
    return filled.ljust(width, "░")
# ---------------------------------------------------------------------------
# Report sections
# ---------------------------------------------------------------------------
def print_header(title):
    """Print a blank line, then the section title between two 70-char rules."""
    rule = "─" * 70
    print()
    print(rule)
    print(f" {title}")
    print(rule)
def report_cost_by_agent(rows):
    """Print the per-agent cost table: runs, idle%, tokens, USD, cost share."""
    print_header("TOKEN COST BY AGENT (source: cost_events, Postgres)")
    # Roll the per-(agent, model) rows up to one record per agent.
    agents = {}
    for row in rows:
        rec = agents.setdefault(row["agent_name"], {
            "runs": 0, "idle_runs": 0, "productive_runs": 0,
            "input_tokens": 0, "output_tokens": 0, "cached_tokens": 0,
            "total_usd": 0.0, "models": set()
        })
        rec["runs"] += row["runs"]
        rec["idle_runs"] += row.get("idle_runs", 0)
        rec["productive_runs"] += row.get("productive_runs", 0)
        rec["input_tokens"] += row["input_tokens"] or 0
        rec["output_tokens"] += row["output_tokens"] or 0
        rec["cached_tokens"] += row["cached_tokens"] or 0
        rec["models"].add(row["model"])
        cost = estimate_cost(row["model"],
                             row["input_tokens"] or 0,
                             row["output_tokens"] or 0,
                             row["cached_tokens"] or 0)
        if cost:
            rec["total_usd"] += cost["total_usd"]
    grand_total = sum(rec["total_usd"] for rec in agents.values())
    # Most expensive agents first.
    ranked = sorted(agents.items(), key=lambda kv: kv[1]["total_usd"], reverse=True)
    print(f" {'Agent':<22} {'Runs':>6} {'Idle%':>6} {'In':>8} {'Cache':>8} {'Out':>8} {'Cost':>8} Share")
    print(f" {'─'*22} {'─'*6} {'─'*6} {'─'*8} {'─'*8} {'─'*8} {'─'*8} {'─'*22}")
    for name, rec in ranked:
        runs = rec["runs"]
        idle_pct = (rec["idle_runs"] / runs * 100) if runs else 0
        share = rec["total_usd"] / grand_total if grand_total else 0
        print(f" {name:<22} {runs:>6,} {idle_pct:>5.0f}% "
              f"{fmt_tokens(rec['input_tokens']):>8} "
              f"{fmt_tokens(rec['cached_tokens']):>8} "
              f"{fmt_tokens(rec['output_tokens']):>8} "
              f"{fmt_usd(rec['total_usd']):>8} "
              f"{bar(share)} {share*100:.0f}%")
        print(f" {'':22} models: {', '.join(rec['models'])}")
    print()
    print(f" {'TOTAL':<22} {sum(rec['runs'] for rec in agents.values()):>6,}"
          f"{'':>7} {'':>8} {'':>8} {'':>8} {fmt_usd(grand_total):>8}")
def report_idle_waste(rows):
    """Print overall and per-agent idle-run (no-task heartbeat) statistics."""
    print_header("IDLE HEARTBEAT WASTE (runs with no assigned task)")
    print(" Agents with a heartbeat schedule wake periodically even when there is nothing")
    print(" to do. These 'idle' runs still consume cached tokens (~$0.50-2.00 each).")
    print(" If you have no scheduled routines, idle% should be 0 — high idle% means")
    print(" your heartbeat interval is too short relative to your issue volume.")
    print()
    total_runs = sum(r["runs"] for r in rows)
    total_idle = sum(r.get("idle_runs", 0) or 0 for r in rows)
    if not total_runs:
        print(" No run data found.")
        return
    idle_pct = total_idle / total_runs * 100
    print(f" Total runs: {total_runs:,}")
    print(f" Idle (no task): {total_idle:,} ({idle_pct:.0f}%)")
    print(f" Productive: {total_runs - total_idle:,} ({100-idle_pct:.0f}%)")
    print()
    print(" Per agent:")
    # Group idle counts per agent, worst offenders first.
    per_agent = {}
    for r in rows:
        stats = per_agent.setdefault(r["agent_name"], {"runs": 0, "idle": 0})
        stats["runs"] += r["runs"]
        stats["idle"] += r.get("idle_runs", 0) or 0
    for name, stats in sorted(per_agent.items(), key=lambda kv: kv[1]["idle"], reverse=True):
        pct = stats["idle"] / stats["runs"] * 100 if stats["runs"] else 0
        flag = " ← high waste" if pct > 50 else ""
        print(f" {name:<22} {stats['idle']:>5,} idle / {stats['runs']:>5,} total ({pct:.0f}%){flag}")
def report_model_distribution(rows):
    """Print how many runs each model served, and which agents used it."""
    print_header("MODEL DISTRIBUTION")
    # Aggregate run counts and the set of agents per model.
    per_model = {}
    for r in rows:
        key = r["model"] or "unknown"
        entry = per_model.setdefault(key, {"runs": 0, "agents": set()})
        entry["runs"] += r["runs"]
        entry["agents"].add(r["agent_name"])
    total_runs = sum(entry["runs"] for entry in per_model.values())
    for model, entry in sorted(per_model.items(), key=lambda kv: kv[1]["runs"], reverse=True):
        pct = entry["runs"] / total_runs if total_runs else 0
        pricing = MODEL_PRICING.get(model)
        if pricing:
            price_str = f" (${pricing[0]:.2f}/${pricing[2]:.2f} per 1M in/out)"
        else:
            price_str = " (pricing unknown)"
        print(f" {model:<35} {entry['runs']:>5,} runs {bar(pct, 15)} {pct*100:.0f}%{price_str}")
        print(f" agents: {', '.join(sorted(entry['agents']))}")
def report_savings_opportunity(rows):
    """Print per-agent current cost vs. a hypothetical all-Haiku cost."""
    print_header("SAVINGS OPPORTUNITY (if switched to cheaper model)")
    print(" Upper bound: what you'd spend if every agent used Haiku instead of Sonnet/Opus.")
    print(" Engineers doing complex code generation need Sonnet. Triage/QA/routing agents")
    print(" are good Haiku candidates. Treat this as a ceiling, not a target.")
    print()
    # Group by agent, calc current cost and Haiku equivalent
    by_agent = {}
    for r in rows:
        name = r["agent_name"]
        if name not in by_agent:
            # Track every model the agent used: one agent can appear in
            # several (agent, model) rows.
            by_agent[name] = {"current_usd": 0.0, "in": 0, "out": 0,
                              "cached": 0, "models": set()}
        a = by_agent[name]
        # Guard against NULL model values from the DB (original crashed on
        # None.lower() below).
        a["models"].add(r["model"] or "")
        a["in"] += r["input_tokens"] or 0
        a["out"] += r["output_tokens"] or 0
        a["cached"] += r["cached_tokens"] or 0
        est = estimate_cost(r["model"],
                            r["input_tokens"] or 0,
                            r["output_tokens"] or 0,
                            r["cached_tokens"] or 0)
        if est:
            a["current_usd"] += est["total_usd"]
    print(f" {'Agent':<22} {'Current':>10} {'→ Haiku':>10} {'Saving':>10} Note")
    print(f" {'─'*22} {'─'*10} {'─'*10} {'─'*10} {'─'*20}")
    total_saving = 0.0
    for name, a in sorted(by_agent.items(), key=lambda x: x[1]["current_usd"], reverse=True):
        haiku_est = estimate_cost("claude-haiku-4-5", a["in"], a["out"], a["cached"])
        haiku_cost = haiku_est["total_usd"] if haiku_est else a["current_usd"]
        saving = a["current_usd"] - haiku_cost
        total_saving += saving
        # "already Haiku" only when every model the agent used is a Haiku
        # variant (the original checked one arbitrary row per agent).
        if a["models"] and all("haiku" in m.lower() for m in a["models"]):
            note = "already Haiku"
        else:
            note = "high saving" if saving > 10 else ""
        print(f" {name:<22} {fmt_usd(a['current_usd']):>10} {fmt_usd(haiku_cost):>10} {fmt_usd(saving):>10} {note}")
    print()
    print(f" Max theoretical saving if all agents → Haiku: {fmt_usd(total_saving)}")
    print(f" (Some agents need Sonnet for complex code tasks — this is an upper bound)")
def report_top_runs(top_runs):
    """Print the 15 most token-hungry individual runs with anomaly flags."""
    print_header("TOP 15 MOST EXPENSIVE RUNS (outlier detection)")
    print(" These are the individual runs that consumed the most tokens.")
    print(" Look for: very high output (agent looping), low cache (prompt not stable),")
    print(" or expensive runs on idle/no-task entries (heartbeat over-firing).")
    print()
    if not top_runs:
        print(" No run data found.")
        return
    print(f" {'Agent':<22} {'Model':<12} {'In':>8} {'Cache':>8} {'Out':>8} {'Est$':>7} Issue")
    print(f" {'─'*22} {'─'*12} {'─'*8} {'─'*8} {'─'*8} {'─'*7} {'─'*30}")
    for run in top_runs:
        tok_in = run.get("input_tokens") or 0
        tok_out = run.get("output_tokens") or 0
        tok_cache = run.get("cached_tokens") or 0
        model = (run.get("model") or "unknown")[:12]
        est = estimate_cost(run.get("model", ""), tok_in, tok_out, tok_cache)
        cost = fmt_usd(est["total_usd"]) if est else " ?"
        issue = run.get("issue_identifier") or "(no task — idle)"
        title = run.get("issue_title") or ""
        short_title = title[:35] + "…" if len(title) > 35 else title
        issue_str = issue if issue == "(no task — idle)" else f"{issue}: {short_title}"
        # Flag anomalies worth investigating.
        if tok_out > 500_000:
            flag = " ← very long output"
        elif tok_in > 100_000 and tok_cache < tok_in * 0.5:
            flag = " ← low cache hit"
        else:
            flag = ""
        print(f" {run.get('agent_name','?'):<22} {model:<12} "
              f"{fmt_tokens(tok_in):>8} {fmt_tokens(tok_cache):>8} {fmt_tokens(tok_out):>8} "
              f"{cost:>7} {issue_str}{flag}")
    print()
    print(" Flags:")
    print(" 'very long output' — agent wrote a lot; check for loops or over-explanation")
    print(" 'low cache hit' — context not being cached; first run or prompt varies too much")
def report_routines(routines):
    """Print last-run status for each active routine from check_routines()."""
    print_header("SCHEDULED ROUTINES (cron jobs)")
    print(" Routines are recurring tasks (health checks, QA gates, nightly audits).")
    print(" Each routine fires on a schedule and wakes an agent. A failing routine")
    print(" usually means the assigned agent is paused — check agent status above.")
    print()
    if not routines:
        print(" No active routines found.")
        print(" (This is normal if you haven't set any up yet.)")
        return
    for routine in routines:
        outcome = routine["last_result"].lower()
        ok = "success" in outcome or "completed" in outcome
        status_icon = "✓" if ok else "✗"
        print(f" {status_icon} {routine['title'][:55]:<55} [{routine['last_result']}]")
        if routine["failure"]:
            print(f" └─ {routine['failure']}")
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
    """Entry point: parse args, discover the company, run every report."""
    parser = argparse.ArgumentParser(description="Paperclip token & cost audit")
    parser.add_argument("--api-url", default="http://localhost:3100", help="Paperclip API base URL")
    parser.add_argument("--db-host", default="127.0.0.1", help="Postgres host")
    parser.add_argument("--db-port", default=54329, type=int, help="Postgres port (default: 54329)")
    parser.add_argument("--company", default=None, help="Company ID (auto-discovered if omitted)")
    parser.add_argument("--no-db", action="store_true", help="Skip Postgres, API-only mode")
    args = parser.parse_args()
    print()
    print("╔══════════════════════════════════════════════════════════════════╗")
    print("║ PAPERCLIP COST & TOKEN AUDIT ║")
    print("╚══════════════════════════════════════════════════════════════════╝")
    print(f" API: {args.api_url}")
    # Discover company ID unless one was given explicitly.
    company_id = args.company
    if not company_id:
        print(" Discovering company ID from API...")
        company_id = discover_company(args.api_url)
    if not company_id:
        print(" ERROR: Could not discover company ID. Pass --company <uuid> manually.")
        sys.exit(1)
    print(f" Company: {company_id}")
    # Fetch cost data from Postgres (preferred: per-run model detail).
    rows, top_runs = None, None
    if not args.no_db:
        if not HAS_PSYCOPG2:
            print()
            print(" ⚠ psycopg2 not installed. Install for full cost data:")
            print(" pip install psycopg2-binary")
            print(" Continuing with API-only mode (limited cost detail)...")
        else:
            print(f" Querying Postgres at {args.db_host}:{args.db_port}...")
            rows, top_runs, err = query_cost_events(args.db_host, args.db_port, company_id)
            if err:
                print(f" ⚠ Postgres error: {err}")
                print(" Continuing with API-only mode...")
                rows = None
            else:
                print(f" Found {len(rows)} (agent, model) rows in cost_events.")
    if rows:
        report_cost_by_agent(rows)
        report_top_runs(top_runs or [])
        report_idle_waste(rows)
        report_model_distribution(rows)
        report_savings_opportunity(rows)
    else:
        # API-only fallback: agent list + status only
        print_header("AGENTS (API-only mode — install psycopg2 for cost data)")
        agents_data = api_get(args.api_url, f"companies/{company_id}/agents")
        if agents_data:
            for a in agents_data:
                model = (a.get("adapterConfig") or {}).get("model", "unknown")
                # .get() guards against partial agent records from the API
                # (original indexed a['name'] / a['status'] directly).
                print(f" {a.get('name', '?'):<25} {a.get('status', '?'):<10} {model}")
        else:
            print(" Could not reach Paperclip API.")
    # Routines health
    routines = check_routines(args.api_url, company_id)
    report_routines(routines)
    print()
    print("─" * 70)
    print(" Done.")
    print()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment