import marimo

__generated_with = "0.14.15"
app = marimo.App(width="medium")


@app.cell
def __():
    import marimo as mo
    import json
    import pandas as pd
    from collections import defaultdict
    from typing import Dict, Any, List, Tuple
    import yaml
    import altair as alt
    import warnings
    import urllib.request

    # Suppress pyarrow warning from altair
    warnings.filterwarnings("ignore", message="Missing optional dependency 'pyarrow'")
    return Any, Dict, List, Tuple, alt, defaultdict, json, mo, pd, urllib, warnings, yaml

@app.cell
def __(mo):
    mo.md(
        r"""
        # LLM Token Analysis & Cost Calculator

        Interactive analysis of LLM models with token limits and pricing scenarios.
        """
    )
    return

@app.cell
def __(json, mo, urllib):
    # Load the full model data from the litellm repository
    _url = "https://raw.githubusercontent.com/BerriAI/litellm/refs/heads/main/model_prices_and_context_window.json"
    try:
        with urllib.request.urlopen(_url) as response:
            _full_data = json.loads(response.read().decode())
        # Remove the sample_spec entry
        models_data = {k: v for k, v in _full_data.items() if k != "sample_spec"}
        _status = mo.md(f"✅ **Successfully loaded {len(models_data)} models from litellm repository**")
    except Exception as e:
        # Fallback to sample data
        _sample_data = {
            "gpt-4": {
                "max_tokens": 4096,
                "max_input_tokens": 8192,
                "max_output_tokens": 4096,
                "input_cost_per_token": 3e-05,
                "output_cost_per_token": 6e-05,
                "litellm_provider": "openai",
                "mode": "chat"
            },
            "gpt-4o": {
                "max_tokens": 16384,
                "max_input_tokens": 128000,
                "max_output_tokens": 16384,
                "input_cost_per_token": 2.5e-06,
                "output_cost_per_token": 1e-05,
                "litellm_provider": "openai",
                "mode": "chat"
            },
            "claude-3-5-sonnet-latest": {
                "max_tokens": 8192,
                "max_input_tokens": 200000,
                "max_output_tokens": 8192,
                "input_cost_per_token": 3e-06,
                "output_cost_per_token": 1.5e-05,
                "litellm_provider": "anthropic",
                "mode": "chat"
            },
            "gemini-1.5-pro": {
                "max_tokens": 8192,
                "max_input_tokens": 2097152,
                "max_output_tokens": 8192,
                "input_cost_per_token": 1.25e-06,
                "output_cost_per_token": 5e-06,
                "litellm_provider": "vertex_ai-language-models",
                "mode": "chat"
            }
        }
        models_data = _sample_data
        _status = mo.md(
            f"❌ **Failed to load data from URL:** {str(e)}\n\n"
            f"⚠️ **Using fallback sample data with {len(models_data)} models**"
        )
    # A bare mo.md(...) call mid-cell is never rendered in marimo; display the
    # status message as the cell's last expression instead
    _status
    return (models_data,)
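
# A note on the data shape (inferred from the fallback entries above): each key in
# models_data is a model name mapping to a spec dict with per-token USD prices
# (input_cost_per_token, output_cost_per_token) and context limits
# (max_input_tokens, max_output_tokens). If you want the fetch above to fail fast
# rather than hang on a slow connection, urllib.request.urlopen also accepts a
# timeout argument, e.g. urlopen(_url, timeout=30).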

@app.cell
def __(Any, Dict):
    def get_max_tokens(model_data: Dict[str, Any]) -> int:
        """Extract the maximum token count from model data."""
        token_fields = ['max_input_tokens', 'max_tokens', 'max_output_tokens']
        for field in token_fields:
            if field in model_data and isinstance(model_data[field], int):
                return model_data[field]
        return 0

    def get_developer(model_name: str, model_data: Dict[str, Any]) -> str:
        """Extract the developer/provider from model data."""
        provider = model_data.get('litellm_provider', '')
        provider_map = {
            'openai': 'OpenAI',
            'anthropic': 'Anthropic',
            'vertex_ai-language-models': 'Google',
            'vertex_ai-vision-models': 'Google',
            'vertex_ai-text-models': 'Google',
            'vertex_ai-chat-models': 'Google',
            'vertex_ai-code-text-models': 'Google',
            'vertex_ai-code-chat-models': 'Google',
            'mistral': 'Mistral',
            'deepseek': 'DeepSeek',
            'xai': 'xAI',
            'groq': 'Groq',
            'cerebras': 'Cerebras',
            'azure': 'Microsoft',
            'azure_ai': 'Microsoft',
            'meta_llama': 'Meta',
            'friendliai': 'FriendliAI',
            'codestral': 'Mistral',
            'text-completion-codestral': 'Mistral',
            'text-completion-openai': 'OpenAI',
            'azure_text': 'Microsoft',
            'watsonx': 'IBM'
        }
        if provider in provider_map:
            return provider_map[provider]
        # Try to infer from the model name prefix
        if model_name.startswith(('gpt-', 'o1', 'o3', 'o4')):
            return 'OpenAI'
        elif model_name.startswith('claude-'):
            return 'Anthropic'
        elif model_name.startswith('gemini-'):
            return 'Google'
        elif 'mistral' in model_name.lower():
            return 'Mistral'
        elif 'deepseek' in model_name.lower():
            return 'DeepSeek'
        elif 'grok' in model_name.lower():
            return 'xAI'
        elif 'llama' in model_name.lower():
            return 'Meta'
        return 'Unknown'

    def categorize_tokens(max_tokens: int) -> str:
        """Categorize token counts into size groups."""
        if max_tokens == 0:
            return "No Token Info"
        elif max_tokens <= 1000:
            return "Tiny (≤1K)"
        elif max_tokens <= 4000:
            return "Small (1K-4K)"
        elif max_tokens <= 8000:
            return "Medium (4K-8K)"
        elif max_tokens <= 16000:
            return "Large (8K-16K)"
        elif max_tokens <= 32000:
            return "XL (16K-32K)"
        elif max_tokens <= 64000:
            return "XXL (32K-64K)"
        elif max_tokens <= 128000:
            return "XXXL (64K-128K)"
        elif max_tokens <= 500000:
            return "Massive (128K-500K)"
        elif max_tokens <= 2000000:
            return "Ultra (500K-2M)"
        else:
            return "Extreme (>2M)"

    def calculate_cost_scenario(model_data: Dict[str, Any]) -> Dict[str, float]:
        """Calculate cost for the 3/4 input + 1/4 output scenario with an output floor."""
        max_input = model_data.get('max_input_tokens', 0)
        max_output = model_data.get('max_output_tokens', 0)
        input_cost = model_data.get('input_cost_per_token', 0)
        output_cost = model_data.get('output_cost_per_token', 0)
        if not all([max_input, max_output, input_cost, output_cost]):
            return {"input_tokens": 0, "output_tokens": 0, "input_cost": 0, "output_cost": 0, "total_cost": 0}
        # 3/4 of max input tokens
        input_tokens = int(max_input * 0.75)
        # 1/4 of max output tokens, with a floor for large models
        output_tokens_raw = int(max_output * 0.25)
        # Apply floor: 2500 tokens for models with >50K input capacity
        if max_input > 50000 and output_tokens_raw < 2500:
            output_tokens = 2500
        else:
            output_tokens = output_tokens_raw
        input_cost_total = input_tokens * input_cost
        output_cost_total = output_tokens * output_cost
        total_cost = input_cost_total + output_cost_total
        return {
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "input_cost": input_cost_total,
            "output_cost": output_cost_total,
            "total_cost": total_cost
        }
    return (
        calculate_cost_scenario,
        categorize_tokens,
        get_developer,
        get_max_tokens,
    )
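
# A minimal sanity check of the scenario math, using the gpt-4o numbers from the
# fallback sample data above (not live pricing): 75% of 128,000 input tokens at
# $2.5e-06/token plus 25% of 16,384 output tokens at $1e-05/token.
@app.cell
def __(calculate_cost_scenario):
    _example = {
        "max_input_tokens": 128000,
        "max_output_tokens": 16384,
        "input_cost_per_token": 2.5e-06,
        "output_cost_per_token": 1e-05,
    }
    _result = calculate_cost_scenario(_example)
    # 96,000 * $2.5e-06 = $0.24 input; 4,096 * $1e-05 = $0.04096 output; ≈ $0.28096 total
    assert abs(_result["total_cost"] - 0.28096) < 1e-9
    _result
    return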

@app.cell
def __(
    calculate_cost_scenario,
    categorize_tokens,
    get_developer,
    get_max_tokens,
    models_data,
    pd,
):
    # Process the data
    _processed_models = []
    for _model_name, _model_data in models_data.items():
        if not isinstance(_model_data, dict):
            continue
        _max_tokens = get_max_tokens(_model_data)
        _category = categorize_tokens(_max_tokens)
        _developer = get_developer(_model_name, _model_data)
        _cost_scenario = calculate_cost_scenario(_model_data)
        _processed_models.append({
            'model_name': _model_name,
            'developer': _developer,
            'max_input_tokens': _model_data.get('max_input_tokens', 0),
            'max_output_tokens': _model_data.get('max_output_tokens', 0),
            'category': _category,
            'input_cost_per_token': _model_data.get('input_cost_per_token', 0),
            'output_cost_per_token': _model_data.get('output_cost_per_token', 0),
            **_cost_scenario
        })
    df = pd.DataFrame(_processed_models)
    df = df.sort_values('max_input_tokens', ascending=False)
    return (df,)

@app.cell
def __(df, mo):
    mo.md(
        f"""
        ## Dataset Overview

        **Total Models:** {len(df)}

        **Developers:** {', '.join(df['developer'].unique())}
        """
    )
    return

@app.cell
def __(df):
    # Provider breakdown by category
    provider_breakdown = df.groupby(['category', 'developer']).size().unstack(fill_value=0)
    return (provider_breakdown,)


@app.cell
def __(mo):
    mo.md("## Token Size Distribution by Provider")
    return


@app.cell
def __(mo, provider_breakdown):
    # Display as a table
    mo.ui.table(
        provider_breakdown.reset_index(),
        selection=None
    )
    return

@app.cell
def __(df):
    # Cost scenario analysis: keep only models with pricing data
    cost_df = df[df['total_cost'] > 0].copy()
    cost_df['cost_per_1k_scenario'] = cost_df['total_cost'] * 1000
    return (cost_df,)


@app.cell
def __(cost_df, mo):
    mo.md(f"""
    ## Cost Scenario Analysis

    **Scenario:** User submits 75% of max input tokens; the model responds with 25% of max output tokens

    **Output Floor:** Large models (>50K input) get a minimum of 2,500 output tokens

    **Models with pricing data:** {len(cost_df)}
    """)
    return
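
# Floor illustration with hypothetical numbers: a model with max_input_tokens=200_000
# and max_output_tokens=8_192 gets int(8_192 * 0.25) = 2_048 output tokens, which is
# below the floor, so the scenario bumps it to 2_500.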

@app.cell
def __(cost_df):
    # Format the cost analysis table for display (rendered by the next cell)
    _display_cols = [
        'model_name', 'developer', 'input_tokens', 'output_tokens',
        'total_cost', 'cost_per_1k_scenario'
    ]
    cost_display = cost_df[_display_cols].copy()
    cost_display['total_cost'] = cost_display['total_cost'].apply(lambda x: f"${x:.6f}")
    cost_display['cost_per_1k_scenario'] = cost_display['cost_per_1k_scenario'].apply(lambda x: f"${x:.4f}")
    return (cost_display,)


@app.cell
def __(cost_display, mo):
    mo.ui.table(
        cost_display,
        selection=None
    )
    return

@app.cell
def __(mo):
    # Interactive provider filter covering the providers in the full dataset
    provider_filter = mo.ui.multiselect(
        options=[
            "OpenAI", "Anthropic", "Google", "Mistral", "DeepSeek", "xAI",
            "Groq", "Cerebras", "Microsoft", "Meta", "FriendliAI", "IBM", "Unknown"
        ],
        value=["OpenAI", "Anthropic", "Google", "Mistral"],
        label="Select providers to include:"
    )
    provider_filter
    return (provider_filter,)


@app.cell
def __(cost_df, provider_filter):
    # Filtered results; an empty selection means "show everything"
    if provider_filter.value:
        filtered_df = cost_df[cost_df['developer'].isin(provider_filter.value)]
    else:
        filtered_df = cost_df
    return (filtered_df,)


@app.cell
def __(filtered_df, mo):
    mo.md(f"### Filtered Results ({len(filtered_df)} models)")
    return

@app.cell
def __(filtered_df):
    # Create a plain-text table for easy sharing
    # (underscore-prefixed names stay local to this cell in marimo, so the loop
    # variables below don't leak into the global namespace)
    _sorted_df = filtered_df.sort_values('total_cost', ascending=True)
    _lines = [
        "```",
        "LLM Cost Analysis - 75% Input + Output Scenario",
        "=" * 65,
        f"{'Model':<35} {'Provider':<12} {'Cost':<10} {'In/Out Tokens':<15}",
        "-" * 65
    ]
    for _, _row in _sorted_df.iterrows():
        _name = _row['model_name'][:33] + ".." if len(_row['model_name']) > 35 else _row['model_name']
        _provider = _row['developer'][:10]
        _cost = f"${_row['total_cost']:.4f}"
        _tokens = f"{_row['input_tokens']//1000}K/{_row['output_tokens']//1000}K"
        _lines.append(f"{_name:<35} {_provider:<12} {_cost:<10} {_tokens:<15}")
    _lines.append("```")
    slack_table = "\n".join(_lines)
    return (slack_table,)
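
# Illustrative output for the four fallback models (live data will differ):
#
# ```
# LLM Cost Analysis - 75% Input + Output Scenario
# =================================================================
# Model                               Provider     Cost       In/Out Tokens
# -----------------------------------------------------------------
# gpt-4                               OpenAI       $0.2458    6K/1K
# gpt-4o                              OpenAI       $0.2810    96K/4K
# claude-3-5-sonnet-latest            Anthropic    $0.4875    150K/2K
# gemini-1.5-pro                      Google       $1.9786    1572K/2K
# ```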

@app.cell
def __(mo):
    mo.md("### 📋 Slack-Ready Text Table")
    return

@app.cell
def __(mo, slack_table):
    mo.ui.code_editor(
        value=slack_table,
        language="text",
        disabled=True
    )
    return

@app.cell
def __(alt, filtered_df, mo):
    # Chart of costs by provider; the log scale is safe here because cost_df
    # already dropped rows with total_cost == 0
    _chart = alt.Chart(filtered_df).mark_circle(size=100).encode(
        x=alt.X('input_tokens:Q', title='Input Tokens (75% of max)'),
        y=alt.Y('total_cost:Q', title='Total Cost ($)', scale=alt.Scale(type='log')),
        color=alt.Color('developer:N', title='Provider'),
        tooltip=['model_name:N', 'developer:N', 'total_cost:Q', 'input_tokens:Q', 'output_tokens:Q']
    ).properties(
        width=600,
        height=400,
        title='Cost vs Input Tokens by Provider'
    )
    mo.ui.altair_chart(_chart)
    return

@app.cell
def __(defaultdict, filtered_df, yaml):
    # Generate YAML output grouped by provider, then token-size category
    _yaml_data = defaultdict(lambda: defaultdict(list))
    for _, _row in filtered_df.iterrows():
        _provider = _row['developer']
        _cat = _row['category']
        _model_info = {
            'name': _row['model_name'],
            'max_input_tokens': int(_row['max_input_tokens']),
            'max_output_tokens': int(_row['max_output_tokens']),
            'cost_scenario': {
                'input_tokens_used': int(_row['input_tokens']),
                'output_tokens_used': int(_row['output_tokens']),
                'total_cost_usd': float(_row['total_cost']),
                'cost_per_1k_scenarios_usd': float(_row['total_cost'] * 1000)
            }
        }
        _yaml_data[_provider][_cat].append(_model_info)
    # Convert to regular dicts for YAML serialization
    yaml_output = dict(_yaml_data)
    for _prov in yaml_output:
        yaml_output[_prov] = dict(yaml_output[_prov])
    yaml_string = yaml.dump(yaml_output, default_flow_style=False, sort_keys=True)
    return (yaml_output, yaml_string)
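
# Illustrative shape of yaml_string for the claude-3-5-sonnet-latest fallback entry
# (keys are alphabetized because of sort_keys=True; live values will differ):
#
# Anthropic:
#   Massive (128K-500K):
#   - cost_scenario:
#       cost_per_1k_scenarios_usd: 487.5
#       input_tokens_used: 150000
#       output_tokens_used: 2500
#       total_cost_usd: 0.4875
#     max_input_tokens: 200000
#     max_output_tokens: 8192
#     name: claude-3-5-sonnet-latest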

@app.cell
def __(mo):
    mo.md("## YAML Export")
    return


@app.cell
def __(mo, yaml_string):
    mo.ui.code_editor(
        value=yaml_string,
        language="yaml",
        disabled=True
    )
    return


if __name__ == "__main__":
    app.run()
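
# To explore this notebook interactively: `marimo edit <this file>`. To serve it
# as a read-only app: `marimo run <this file>`. Running it with plain `python`
# executes app.run() directly.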