|
#!/usr/bin/env python3 |
|
""" |
|
paperclip_cost_audit.py — Standalone Paperclip token & cost analyzer |
|
|
|
Connects to a local Paperclip instance and reports: |
|
- Per-agent token usage and USD cost (from cost_events table) |
|
- Idle heartbeat waste (runs with no assigned task) |
|
- Model distribution across agents |
|
- Routines that are firing but failing |
|
|
|
Requirements: |
|
pip install psycopg2-binary requests |
|
|
|
Usage: |
|
python paperclip_cost_audit.py |
|
python paperclip_cost_audit.py --api-url http://localhost:3100 |
|
python paperclip_cost_audit.py --db-port 54329 |
|
|
|
Defaults match standard Paperclip local install. |
|
""" |
|
|
|
import argparse |
|
import json |
|
import sys |
|
from urllib.request import urlopen |
|
from urllib.error import URLError |
|
|
|
# Optional: psycopg2 for direct Postgres access (more accurate)
try:
    import psycopg2
    import psycopg2.extras
    HAS_PSYCOPG2 = True
except ImportError:
    # Missing driver is not fatal: main() degrades to API-only mode and
    # query_cost_events() short-circuits with an install hint.
    HAS_PSYCOPG2 = False
|
|
|
# --------------------------------------------------------------------------- |
|
# Model pricing: USD per 1M tokens [input, cached-input, output] |
|
# Updated 2026-04-08 — https://docs.anthropic.com/en/docs/about-claude/models |
|
# --------------------------------------------------------------------------- |
|
# Keys are the exact model identifiers recorded in cost_events.model.
MODEL_PRICING = {
    "claude-opus-4-6": (15.00, 1.50, 75.00),
    "claude-sonnet-4-6": (3.00, 0.30, 15.00),
    "claude-sonnet-4-5": (3.00, 0.30, 15.00),
    "claude-haiku-4-5": (0.80, 0.08, 4.00),
    "claude-haiku-4-5-20251001": (0.80, 0.08, 4.00),
    "gpt-4o": (2.50, 1.25, 10.00),
    "gpt-4o-mini": (0.15, 0.075, 0.60),
    "gemini-2.0-flash": (0.10, 0.025, 0.40),
    "gemini-1.5-pro": (1.25, 0.3125, 5.00),
}


def estimate_cost(model, input_tokens, output_tokens, cached_tokens=0):
    """Price a run's token counts against MODEL_PRICING.

    Returns a dict with "input_usd", "cached_usd", "output_usd" and
    "total_usd", each rounded to whole cents, or None when the model has
    no known pricing (callers treat that as $0 / "pricing unknown").
    """
    rates = MODEL_PRICING.get(model)
    if not rates:
        return None
    # Convert raw token counts to millions, then pair with the matching
    # per-1M rate (same order as the pricing tuples: input, cached, output).
    millions = (input_tokens / 1_000_000,
                cached_tokens / 1_000_000,
                output_tokens / 1_000_000)
    in_cost, cached_cost, out_cost = (m * rate for m, rate in zip(millions, rates))
    return {
        "input_usd": round(in_cost, 2),
        "cached_usd": round(cached_cost, 2),
        "output_usd": round(out_cost, 2),
        # Total is rounded from the unrounded parts, not the rounded ones.
        "total_usd": round(in_cost + cached_cost + out_cost, 2),
    }
|
|
|
# --------------------------------------------------------------------------- |
|
# HTTP API helpers |
|
# --------------------------------------------------------------------------- |
|
def api_get(base_url, path):
    """GET {base_url}/api/{path} and return the parsed JSON body.

    Returns None (after printing an error) on any failure, so callers can
    treat network errors, timeouts and malformed responses uniformly.
    """
    url = f"{base_url}/api/{path}"
    try:
        with urlopen(url, timeout=5) as resp:
            return json.loads(resp.read())
    # URLError alone is not enough: a read timeout can surface as
    # TimeoutError, and a non-JSON body raises JSONDecodeError — both
    # previously escaped and crashed the audit.
    except (URLError, TimeoutError, json.JSONDecodeError) as e:
        print(f"  ERROR: Could not reach {url}: {e}")
        return None
|
|
|
def discover_company(base_url):
    """Get first company ID from the API.

    Tries the public companies listing first, then the authenticated
    agents/me endpoint. Returns the ID string, or None if neither works.
    """
    data = api_get(base_url, "companies")
    if data and isinstance(data, list):
        # .get() instead of ["id"]: an unexpected payload shape should
        # fall through to the next endpoint, not raise KeyError.
        first = data[0]
        company_id = first.get("id") if isinstance(first, dict) else None
        if company_id:
            return company_id
    # Try authenticated endpoint
    data = api_get(base_url, "agents/me")
    if isinstance(data, dict) and data.get("companyId"):
        return data["companyId"]
    return None
|
|
|
# --------------------------------------------------------------------------- |
|
# Postgres cost_events query (accurate, per-run model data) |
|
# --------------------------------------------------------------------------- |
|
# Per-(agent, model) usage aggregates. The CASE sums split each group's
# runs by whether an issue was attached: issue_id IS NULL means the run
# was a timer/heartbeat wake-up with no task (pure idle cost).
DB_QUERY = """
SELECT
    a.name AS agent_name,
    ce.model,
    COUNT(*) AS runs,
    SUM(ce.input_tokens) AS input_tokens,
    SUM(ce.output_tokens) AS output_tokens,
    SUM(ce.cached_input_tokens) AS cached_tokens,
    -- idle runs: no issue attached (timer heartbeat with no task)
    SUM(CASE WHEN ce.issue_id IS NULL THEN 1 ELSE 0 END) AS idle_runs,
    SUM(CASE WHEN ce.issue_id IS NOT NULL THEN 1 ELSE 0 END) AS productive_runs
FROM cost_events ce
JOIN agents a ON a.id = ce.agent_id
WHERE ce.company_id = %s
GROUP BY a.name, ce.model
ORDER BY a.name, ce.model
"""
|
|
|
# Top expensive individual runs (outlier detection), ranked by total token
# volume. LEFT JOIN keeps idle runs whose issue_id is NULL — those show up
# as "(no task — idle)" in the report.
DB_QUERY_TOP_RUNS = """
SELECT
    a.name AS agent_name,
    ce.model,
    ce.issue_id,
    i.identifier AS issue_identifier,
    i.title AS issue_title,
    ce.input_tokens,
    ce.output_tokens,
    ce.cached_input_tokens AS cached_tokens,
    ce.created_at
FROM cost_events ce
JOIN agents a ON a.id = ce.agent_id
LEFT JOIN issues i ON i.id = ce.issue_id
WHERE ce.company_id = %s
ORDER BY (ce.output_tokens + ce.input_tokens + ce.cached_input_tokens) DESC
LIMIT 15
"""
|
|
|
def query_cost_events(db_host, db_port, company_id):
    """Fetch per-(agent, model) aggregates and the top-15 runs from Postgres.

    Returns a (rows, top_runs, error) triple: on success error is None; on
    any failure rows/top_runs are None and error is a human-readable string
    (the caller prints it and falls back to API-only mode).
    """
    if not HAS_PSYCOPG2:
        return None, None, "psycopg2 not installed — run: pip install psycopg2-binary"
    conn = None
    try:
        conn = psycopg2.connect(
            host=db_host, port=db_port,
            dbname="paperclip", user="paperclip", password="paperclip",
            connect_timeout=5
        )
        # RealDictCursor yields dict-like rows keyed by column name.
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(DB_QUERY, (company_id,))
            rows = [dict(r) for r in cur.fetchall()]
            cur.execute(DB_QUERY_TOP_RUNS, (company_id,))
            top_runs = [dict(r) for r in cur.fetchall()]
        return rows, top_runs, None
    except Exception as e:
        # Best-effort by design: any DB problem degrades to API-only mode.
        return None, None, str(e)
    finally:
        # Previously the connection was only closed on the success path,
        # leaking it when a query failed.
        if conn is not None:
            conn.close()
|
|
|
# --------------------------------------------------------------------------- |
|
# Routines health check |
|
# --------------------------------------------------------------------------- |
|
def check_routines(base_url, company_id):
    """Summarize non-archived routines for a company via the API.

    Returns a list of dicts with title/assignee/status/last_result/failure,
    or [] when the endpoint is unreachable or returns nothing.
    """
    routines = api_get(base_url, f"companies/{company_id}/routines")
    if not routines:
        return []
    results = []
    for r in routines:
        if r.get("status") == "archived":
            continue
        last_run = r.get("lastRun") or {}
        # Keep the lastResult of the LAST trigger in the list (matches the
        # original behavior; earlier triggers are overwritten).
        last_result = ""
        for t in r.get("triggers", []):
            last_result = t.get("lastResult", "")
        # The API sends an explicit null for unassigned routines, so a plain
        # .get(key, default) would return None and crash on the [:8] slice.
        assignee = r.get("assigneeAgentId") or "unassigned"
        results.append({
            "title": r["title"],
            "assignee": assignee[:8] + "...",
            "status": r.get("status"),
            "last_result": last_result or last_run.get("status", "unknown"),
            "failure": last_run.get("failureReason", ""),
        })
    return results
|
|
|
# --------------------------------------------------------------------------- |
|
# Formatting |
|
# --------------------------------------------------------------------------- |
|
def fmt_tokens(n):
    """Render a token count compactly: e.g. 813 -> '813', 42_000 -> '42K',
    2_500_000 -> '2.5M'."""
    if n < 1_000:
        return str(n)
    if n < 1_000_000:
        return f"{n / 1_000:.0f}K"
    return f"{n / 1_000_000:.1f}M"
|
|
|
def fmt_usd(n):
    """Format a dollar amount to whole cents, e.g. 3.5 -> '$3.50'."""
    return "${:.2f}".format(n)
|
|
|
def bar(ratio, width=20):
    """Render `ratio` (0.0–1.0) as a fixed-width block-character bar.

    `ratio` is clamped to [0, 1]: previously a ratio > 1 produced a bar
    longer than `width` (and a negative ratio an empty one), breaking the
    column alignment of the reports.
    """
    filled = round(max(0.0, min(1.0, ratio)) * width)
    return "█" * filled + "░" * (width - filled)
|
|
|
# --------------------------------------------------------------------------- |
|
# Report sections |
|
# --------------------------------------------------------------------------- |
|
def print_header(title):
    """Print a blank line followed by a 70-column ruled section header."""
    rule = "─" * 70
    print()
    print(rule)
    print(f"  {title}")
    print(rule)
|
|
|
def report_cost_by_agent(rows):
    """Print the per-agent token/cost table.

    rows: per-(agent, model) aggregate dicts from DB_QUERY. An agent that
    ran under several models contributes one row per model; they are summed
    here and the model list is shown under each agent line.
    """
    print_header("TOKEN COST BY AGENT (source: cost_events, Postgres)")

    # Aggregate by agent (sum across models)
    by_agent = {}
    for r in rows:
        name = r["agent_name"]
        if name not in by_agent:
            by_agent[name] = {
                "runs": 0, "idle_runs": 0, "productive_runs": 0,
                "input_tokens": 0, "output_tokens": 0, "cached_tokens": 0,
                "total_usd": 0.0, "models": set()
            }
        a = by_agent[name]
        a["runs"] += r["runs"]
        # SQL SUM() can surface as None — coalesce like the token fields.
        a["idle_runs"] += r.get("idle_runs", 0) or 0
        a["productive_runs"] += r.get("productive_runs", 0) or 0
        a["input_tokens"] += r["input_tokens"] or 0
        a["output_tokens"] += r["output_tokens"] or 0
        a["cached_tokens"] += r["cached_tokens"] or 0
        a["models"].add(r["model"])
        # Priced per (agent, model) row so each slice uses the right rates;
        # unknown models contribute $0 (estimate_cost returns None).
        est = estimate_cost(r["model"],
                            r["input_tokens"] or 0,
                            r["output_tokens"] or 0,
                            r["cached_tokens"] or 0)
        if est:
            a["total_usd"] += est["total_usd"]

    total = sum(a["total_usd"] for a in by_agent.values())

    # Sort by cost descending
    sorted_agents = sorted(by_agent.items(), key=lambda x: x[1]["total_usd"], reverse=True)

    print(f"  {'Agent':<22} {'Runs':>6} {'Idle%':>6} {'In':>8} {'Cache':>8} {'Out':>8} {'Cost':>8} Share")
    print(f"  {'─'*22} {'─'*6} {'─'*6} {'─'*8} {'─'*8} {'─'*8} {'─'*8} {'─'*22}")

    for name, a in sorted_agents:
        runs = a["runs"]
        idle_pct = (a["idle_runs"] / runs * 100) if runs else 0
        share = a["total_usd"] / total if total else 0
        # Sort (and NULL-proof) model names: joining the raw set printed in
        # nondeterministic order and crashed when cost_events.model was NULL.
        models = ", ".join(sorted(m or "unknown" for m in a["models"]))
        print(f"  {name:<22} {runs:>6,} {idle_pct:>5.0f}% "
              f"{fmt_tokens(a['input_tokens']):>8} "
              f"{fmt_tokens(a['cached_tokens']):>8} "
              f"{fmt_tokens(a['output_tokens']):>8} "
              f"{fmt_usd(a['total_usd']):>8} "
              f"{bar(share)} {share*100:.0f}%")
        print(f"  {'':22} models: {models}")

    print()
    print(f"  {'TOTAL':<22} {sum(a['runs'] for a in by_agent.values()):>6,}"
          f"{'':>7} {'':>8} {'':>8} {'':>8} {fmt_usd(total):>8}")
|
|
|
def report_idle_waste(rows):
    """Report how many runs fired with no attached issue (pure heartbeat cost).

    rows: per-(agent, model) aggregate dicts from DB_QUERY.
    """
    print_header("IDLE HEARTBEAT WASTE (runs with no assigned task)")
    print("  Agents with a heartbeat schedule wake periodically even when there is nothing")
    print("  to do. These 'idle' runs still consume cached tokens (~$0.50-2.00 each).")
    print("  If you have no scheduled routines, idle% should be 0 — high idle% means")
    print("  your heartbeat interval is too short relative to your issue volume.")
    print()

    total_runs = sum(r["runs"] for r in rows)
    total_idle = sum(r.get("idle_runs", 0) or 0 for r in rows)

    if total_runs == 0:
        print("  No run data found.")
        return

    idle_pct = total_idle / total_runs * 100

    print(f"  Total runs:       {total_runs:,}")
    print(f"  Idle (no task):   {total_idle:,} ({idle_pct:.0f}%)")
    print(f"  Productive:       {total_runs - total_idle:,} ({100-idle_pct:.0f}%)")
    print()
    # Plain string — was an f-string with no placeholders (ruff F541).
    print("  Per agent:")

    # Group idle counts by agent
    by_agent_idle = {}
    for r in rows:
        name = r["agent_name"]
        if name not in by_agent_idle:
            by_agent_idle[name] = {"runs": 0, "idle": 0}
        by_agent_idle[name]["runs"] += r["runs"]
        by_agent_idle[name]["idle"] += r.get("idle_runs", 0) or 0

    # Worst offenders first; flag anyone idling more than half the time.
    for name, a in sorted(by_agent_idle.items(), key=lambda x: x[1]["idle"], reverse=True):
        pct = a["idle"] / a["runs"] * 100 if a["runs"] else 0
        flag = " ← high waste" if pct > 50 else ""
        print(f"    {name:<22} {a['idle']:>5,} idle / {a['runs']:>5,} total ({pct:.0f}%){flag}")
|
|
|
def report_model_distribution(rows):
    """Show which models are in use, by run count, with list pricing.

    rows: per-(agent, model) aggregate dicts from DB_QUERY.
    """
    print_header("MODEL DISTRIBUTION")

    # model name -> {"runs": total run count, "agents": set of agent names}
    model_totals = {}
    for r in rows:
        m = r["model"] or "unknown"  # cost_events.model may be NULL
        if m not in model_totals:
            model_totals[m] = {"runs": 0, "agents": set()}
        model_totals[m]["runs"] += r["runs"]
        model_totals[m]["agents"].add(r["agent_name"])

    total_runs = sum(v["runs"] for v in model_totals.values())

    # Most-used model first; pricing shown as input/output $ per 1M tokens.
    for model, v in sorted(model_totals.items(), key=lambda x: x[1]["runs"], reverse=True):
        pct = v["runs"] / total_runs if total_runs else 0
        pricing = MODEL_PRICING.get(model)
        price_str = f" (${pricing[0]:.2f}/${pricing[2]:.2f} per 1M in/out)" if pricing else " (pricing unknown)"
        print(f"  {model:<35} {v['runs']:>5,} runs {bar(pct, 15)} {pct*100:.0f}%{price_str}")
        print(f"      agents: {', '.join(sorted(v['agents']))}")
|
|
|
def report_savings_opportunity(rows):
    """Estimate the ceiling saving from moving every agent to Haiku.

    rows: per-(agent, model) aggregate dicts from DB_QUERY. This is an
    upper bound, not a recommendation — agents doing complex work may
    genuinely need the bigger model.
    """
    print_header("SAVINGS OPPORTUNITY (if switched to cheaper model)")
    print("  Upper bound: what you'd spend if every agent used Haiku instead of Sonnet/Opus.")
    print("  Engineers doing complex code generation need Sonnet. Triage/QA/routing agents")
    print("  are good Haiku candidates. Treat this as a ceiling, not a target.")
    print()

    # Group by agent, calc current cost and Haiku equivalent
    by_agent = {}
    for r in rows:
        name = r["agent_name"]
        if name not in by_agent:
            # NOTE: "model" keeps only the FIRST model seen for the agent;
            # it is used below just for the "already Haiku" note.
            by_agent[name] = {"current_usd": 0.0, "in": 0, "out": 0, "cached": 0, "model": r["model"]}
        a = by_agent[name]
        a["in"] += r["input_tokens"] or 0
        a["out"] += r["output_tokens"] or 0
        a["cached"] += r["cached_tokens"] or 0
        est = estimate_cost(r["model"],
                            r["input_tokens"] or 0,
                            r["output_tokens"] or 0,
                            r["cached_tokens"] or 0)
        if est:
            a["current_usd"] += est["total_usd"]

    print(f"  {'Agent':<22} {'Current':>10} {'→ Haiku':>10} {'Saving':>10} Note")
    print(f"  {'─'*22} {'─'*10} {'─'*10} {'─'*10} {'─'*20}")

    total_saving = 0.0
    for name, a in sorted(by_agent.items(), key=lambda x: x[1]["current_usd"], reverse=True):
        haiku_est = estimate_cost("claude-haiku-4-5", a["in"], a["out"], a["cached"])
        haiku_cost = haiku_est["total_usd"] if haiku_est else a["current_usd"]
        saving = a["current_usd"] - haiku_cost
        total_saving += saving
        # Guard against NULL model values (cost_events.model is nullable) —
        # .lower() on None crashed here while other reports coalesce it.
        note = "already Haiku" if "haiku" in (a["model"] or "").lower() else (
            "high saving" if saving > 10 else "")
        print(f"  {name:<22} {fmt_usd(a['current_usd']):>10} {fmt_usd(haiku_cost):>10} {fmt_usd(saving):>10} {note}")

    print()
    print(f"  Max theoretical saving if all agents → Haiku: {fmt_usd(total_saving)}")
    # Plain string — was an f-string with no placeholders (ruff F541).
    print("  (Some agents need Sonnet for complex code tasks — this is an upper bound)")
|
|
|
def report_top_runs(top_runs):
    """Print the 15 most token-hungry individual runs with anomaly flags.

    top_runs: rows from DB_QUERY_TOP_RUNS (already sorted by total token
    volume, descending); may be empty.
    """
    print_header("TOP 15 MOST EXPENSIVE RUNS (outlier detection)")
    print("  These are the individual runs that consumed the most tokens.")
    print("  Look for: very high output (agent looping), low cache (prompt not stable),")
    print("  or expensive runs on idle/no-task entries (heartbeat over-firing).")
    print()
    if not top_runs:
        print("  No run data found.")
        return

    print(f"  {'Agent':<22} {'Model':<12} {'In':>8} {'Cache':>8} {'Out':>8} {'Est$':>7} Issue")
    print(f"  {'─'*22} {'─'*12} {'─'*8} {'─'*8} {'─'*8} {'─'*7} {'─'*30}")

    for r in top_runs:
        # SQL NULLs surface as None — coalesce token counts to 0.
        in_tok = r.get("input_tokens") or 0
        out_tok = r.get("output_tokens") or 0
        ca_tok = r.get("cached_tokens") or 0
        model = (r.get("model") or "unknown")[:12]
        est = estimate_cost(r.get("model", ""), in_tok, out_tok, ca_tok)
        cost = fmt_usd(est["total_usd"]) if est else "      ?"
        # A NULL issue_id (LEFT JOIN miss) marks an idle heartbeat run.
        issue = r.get("issue_identifier") or "(no task — idle)"
        title = r.get("issue_title") or ""
        short_title = title[:35] + "…" if len(title) > 35 else title
        issue_str = f"{issue}: {short_title}" if issue != "(no task — idle)" else issue

        # Flag anomalies
        flag = ""
        if out_tok > 500_000:
            flag = " ← very long output"
        elif in_tok > 100_000 and ca_tok < in_tok * 0.5:
            # large prompt with less than half the input served from cache
            flag = " ← low cache hit"

        print(f"  {r.get('agent_name','?'):<22} {model:<12} "
              f"{fmt_tokens(in_tok):>8} {fmt_tokens(ca_tok):>8} {fmt_tokens(out_tok):>8} "
              f"{cost:>7} {issue_str}{flag}")

    print()
    print("  Flags:")
    print("    'very long output' — agent wrote a lot; check for loops or over-explanation")
    print("    'low cache hit'    — context not being cached; first run or prompt varies too much")
|
|
|
|
|
def report_routines(routines):
    """Print pass/fail status for the routine summaries from check_routines."""
    print_header("SCHEDULED ROUTINES (cron jobs)")
    intro = (
        "  Routines are recurring tasks (health checks, QA gates, nightly audits).",
        "  Each routine fires on a schedule and wakes an agent. A failing routine",
        "  usually means the assigned agent is paused — check agent status above.",
    )
    for line in intro:
        print(line)
    print()
    if not routines:
        print("  No active routines found.")
        print("  (This is normal if you haven't set any up yet.)")
        return

    for entry in routines:
        outcome = entry["last_result"].lower()
        succeeded = "success" in outcome or "completed" in outcome
        status_icon = "✓" if succeeded else "✗"
        print(f"  {status_icon} {entry['title'][:55]:<55} [{entry['last_result']}]")
        if entry["failure"]:
            print(f"      └─ {entry['failure']}")
|
|
|
# --------------------------------------------------------------------------- |
|
# Main |
|
# --------------------------------------------------------------------------- |
|
def main():
    """CLI entry point: parse args, discover the company, print all reports."""
    parser = argparse.ArgumentParser(description="Paperclip token & cost audit")
    parser.add_argument("--api-url", default="http://localhost:3100", help="Paperclip API base URL")
    parser.add_argument("--db-host", default="127.0.0.1", help="Postgres host")
    parser.add_argument("--db-port", default=54329, type=int, help="Postgres port (default: 54329)")
    parser.add_argument("--company", default=None, help="Company ID (auto-discovered if omitted)")
    parser.add_argument("--no-db", action="store_true", help="Skip Postgres, API-only mode")
    args = parser.parse_args()

    print()
    print("╔══════════════════════════════════════════════════════════════════╗")
    print("║          PAPERCLIP COST & TOKEN AUDIT                            ║")
    print("╚══════════════════════════════════════════════════════════════════╝")
    print(f"  API:      {args.api_url}")

    # Discover company ID (an explicit --company wins; otherwise ask the API)
    company_id = args.company
    if not company_id:
        print("  Discovering company ID from API...")
        company_id = discover_company(args.api_url)
        if not company_id:
            # No company means every later query would fail — bail out early.
            print("  ERROR: Could not discover company ID. Pass --company <uuid> manually.")
            sys.exit(1)
    print(f"  Company:  {company_id}")

    # Fetch cost data from Postgres (preferred: has per-run token detail);
    # any failure degrades gracefully to API-only mode below.
    rows, top_runs = None, None
    if not args.no_db:
        if not HAS_PSYCOPG2:
            print()
            print("  ⚠ psycopg2 not installed. Install for full cost data:")
            print("      pip install psycopg2-binary")
            print("  Continuing with API-only mode (limited cost detail)...")
        else:
            print(f"  Querying Postgres at {args.db_host}:{args.db_port}...")
            rows, top_runs, err = query_cost_events(args.db_host, args.db_port, company_id)
            if err:
                print(f"  ⚠ Postgres error: {err}")
                print("  Continuing with API-only mode...")
                rows = None
            else:
                print(f"  Found {len(rows)} (agent, model) rows in cost_events.")

    if rows:
        # Full report suite — requires DB data.
        report_cost_by_agent(rows)
        report_top_runs(top_runs or [])
        report_idle_waste(rows)
        report_model_distribution(rows)
        report_savings_opportunity(rows)
    else:
        # API-only fallback: agent list + status only
        print_header("AGENTS (API-only mode — install psycopg2 for cost data)")
        agents_data = api_get(args.api_url, f"companies/{company_id}/agents")
        if agents_data:
            for a in agents_data:
                model = (a.get("adapterConfig") or {}).get("model", "unknown")
                print(f"  {a['name']:<25} {a['status']:<10} {model}")
        else:
            print("  Could not reach Paperclip API.")

    # Routines health — API-based, so it works in both modes.
    routines = check_routines(args.api_url, company_id)
    report_routines(routines)

    print()
    print("─" * 70)
    print("  Done.")
    print()
|
|
|
# Run only when executed as a script, not on import.
if __name__ == "__main__":
    main()