Created
May 2, 2025 00:34
-
-
Save angusturner/964c0037360f6268079c6224ad1135d8 to your computer and use it in GitHub Desktop.
llm roundtable
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import json | |
import os | |
from datetime import datetime | |
from typing import List, Dict, Any | |
import anthropic | |
from rich.console import Console | |
from rich.progress import Progress, TextColumn, BarColumn | |
from rich.panel import Panel | |
from rich.prompt import Prompt, Confirm | |
from rich.table import Table | |
import random | |
# Upper bound on generated tokens; passed as ``max_tokens`` to every
# ``messages.create`` call made in this file.
MAX_TOKENS = 8096
class RateLimitHandler:
    """Run API calls behind a semaphore and retry them when rate limited.

    Args:
        max_concurrent: Number of calls allowed to be in flight at once.
        initial_timeout: Base delay in seconds for the exponential backoff.
        max_timeout: Upper bound in seconds for a computed backoff delay.
    """

    def __init__(self, max_concurrent=1, initial_timeout=1, max_timeout=60):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.initial_timeout = initial_timeout
        self.max_timeout = max_timeout

    @staticmethod
    def _retry_after_seconds(error):
        """Return the ``retry-after`` delay carried by ``error``, or None.

        None is returned when no header is present, when it is not an
        integer number of seconds, or when it is not positive.
        """
        # Accept a direct ``headers`` attribute, then fall back to the HTTP
        # response attached to the error (where status errors keep headers).
        headers = getattr(error, "headers", None)
        if not headers:
            headers = getattr(getattr(error, "response", None), "headers", None)
        if not headers:
            return None
        try:
            seconds = int(headers.get("retry-after"))
        except (AttributeError, TypeError, ValueError):
            return None
        return seconds if seconds > 0 else None

    async def with_rate_limit(self, func, *args, **kwargs):
        """Await ``func(*args, **kwargs)``, retrying on HTTP 429 responses.

        The semaphore is held for the whole attempt sequence, including the
        sleeps between retries, so a rate-limited call also holds back the
        calls queued behind it.

        Returns:
            Whatever ``func`` returns.

        Raises:
            anthropic.APIError: Re-raised unchanged when the error is not a
                429 or when the retry budget (8 retries) is exhausted.
        """
        max_retries = 8
        retries = 0
        async with self.semaphore:
            while True:
                try:
                    return await func(*args, **kwargs)
                except anthropic.APIError as e:
                    # Only status errors define ``status_code``; connection
                    # errors do not, so read it defensively.
                    status = getattr(e, "status_code", None)
                    if status != 429 or retries >= max_retries:
                        raise
                    retries += 1
                    # Prefer the server's hint; otherwise back off
                    # exponentially with jitter, capped at max_timeout.
                    wait_time = self._retry_after_seconds(e)
                    if wait_time is None:
                        wait_time = min(
                            self.initial_timeout
                            * (2**retries)
                            * (0.5 + random.random()),
                            self.max_timeout,
                        )
                    print(f"Rate limited. Retrying in {wait_time:.1f} seconds...")
                    await asyncio.sleep(wait_time)
class AIRoundtable: | |
def __init__(
    self,
    num_agents: int,
    model: str = "claude-3-7-sonnet-latest",
    output_dir: str = "output",
):
    """Create an empty roundtable for ``num_agents`` agents.

    Args:
        num_agents: Number of personas, proposals and critiques to produce.
        model: Model name passed to every API call.
        output_dir: Directory for the markdown report; created if missing.
    """
    # Plain configuration.
    self.num_agents, self.model, self.output_dir = num_agents, model, output_dir

    # API client and the console used for status output.
    self.client = anthropic.AsyncAnthropic()
    self.console = Console()

    # Make sure the report directory is there before any work starts.
    os.makedirs(output_dir, exist_ok=True)

    # Personas chosen by select_panel().
    self.panel = []

    # One slot per agent; each Event is set once that agent's proposal exists.
    self.proposals = [None for _ in range(num_agents)]
    self.proposal_ready = [asyncio.Event() for _ in range(num_agents)]
    self.critiques = [None for _ in range(num_agents)]

    # Final synthesis text and the shared context (user text and files).
    self.summary = None
    self.context = ""

    # Per-phase token usage plus running totals and the cache-hit count.
    self.usage_stats = dict(
        panel_selection=None,
        proposals=[None for _ in range(num_agents)],
        critiques=[None for _ in range(num_agents)],
        summary=None,
        total_input_tokens=0,
        total_output_tokens=0,
        cache_hits=0,
    )

    # A single in-flight request at a time.
    self.rate_limiter = RateLimitHandler(max_concurrent=1)
def add_file_context(self, file_path: str) -> None:
    """Read ``file_path`` as UTF-8 and append it to the shared context.

    The content is wrapped in ``--- File: <name> ---`` / ``--- End of
    <name> ---`` markers, where ``<name>`` is the path's base name. Any
    exception is reported on the console instead of being raised.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as handle:
            body = handle.read()
        name = os.path.basename(file_path)
        wrapped = f"\n\n--- File: {name} ---\n{body}\n--- End of {name} ---\n\n"
        self.context = self.context + wrapped
        self.console.print(f"[green]Added content from [bold]{name}[/bold][/green]")
    except Exception as e:
        message = f"[bold red]Error loading file {file_path}: {str(e)}[/bold red]"
        self.console.print(message)
def add_text_context(self, text: str) -> None:
    """Append ``text`` to the shared context; an empty value is ignored."""
    if not text:
        return
    # Surround the snippet with blank lines so entries stay separated.
    self.context = f"{self.context}\n\n{text}\n\n"
    self.console.print(f"[green]Added text context ({len(text)} characters)[/green]")
def _prepare_system_messages(
    self, specific_instruction: str
) -> List[Dict[str, Any]]:
    """Build the ``system`` block list for one API call.

    The shared context, when present, comes first and is marked with an
    ephemeral ``cache_control`` so later calls can reuse it. The per-call
    instruction follows as a separate block.

    Args:
        specific_instruction: Instruction text for this call. An empty or
            whitespace-only value adds no block.

    Returns:
        A list of text blocks; empty when there is neither context nor
        instruction.
    """
    blocks: List[Dict[str, Any]] = []
    if self.context and self.context.strip():
        blocks.append(
            {
                "type": "text",
                "text": self.context,
                "cache_control": {"type": "ephemeral"},
            }
        )
    # Never emit a blank text block: an empty one makes the request invalid.
    if specific_instruction and specific_instruction.strip():
        blocks.append({"type": "text", "text": specific_instruction})
    return blocks
async def run_session(self, task_prompt: str) -> Dict[str, Any]:
    """Run a complete roundtable session.

    Phases, in order: panel selection, one proposal per agent, one critique
    per agent (agent ``i`` reviews proposal ``(i + 1) % num_agents``), the
    final synthesis, usage totals, and the markdown export.

    Args:
        task_prompt: The specific task or question to address.

    Returns:
        Dictionary with the task, panel, proposals, critiques, summary,
        usage statistics and the path of the exported markdown file.
    """
    self.console.print("[bold green]Starting AI Roundtable. [/bold green]")

    # Share this object's console with the live progress display so the
    # messages printed below are rendered through the same console as the
    # progress bars instead of a second, independent one.
    with Progress(
        TextColumn("[bold blue]{task.description}[/bold blue]"),
        BarColumn(),
        TextColumn("[bold]{task.completed}/{task.total}[/bold]"),
        console=self.console,
    ) as progress:
        # Panel selection (this first call establishes the cache).
        panel_task = progress.add_task(
            "[yellow]Selecting panel personas (establishing cache)...", total=1
        )
        await self.select_panel(task_prompt)
        progress.update(panel_task, completed=1)

        self._display_panel_internal()

        # Proposal phase: one overall bar plus one bar per agent.
        proposal_overall = progress.add_task(
            "[yellow]Proposals (overall)", total=self.num_agents
        )
        proposal_tasks = {
            i: progress.add_task(f"Agent #{i + 1} proposal", total=1)
            for i in range(self.num_agents)
        }
        self.console.print(
            "\n[bold]Generating proposals (using cached context)...[/bold]"
        )
        await asyncio.gather(
            *[
                self._generate_proposal_with_progress(
                    i, task_prompt, progress, proposal_tasks[i], proposal_overall
                )
                for i in range(self.num_agents)
            ]
        )

        # Critique phase: each agent reviews the next agent's proposal.
        critique_overall = progress.add_task(
            "[yellow]Critiques (overall)", total=self.num_agents
        )
        critique_tasks = {}
        for i in range(self.num_agents):
            target_id = (i + 1) % self.num_agents
            critique_tasks[i] = progress.add_task(
                f"Agent #{i + 1} → Proposal #{target_id + 1} critique", total=1
            )
        self.console.print(
            "\n[bold]Generating critiques (using cached context)...[/bold]"
        )
        await asyncio.gather(
            *[
                self._generate_critique_with_progress(
                    i,
                    (i + 1) % self.num_agents,
                    progress,
                    critique_tasks[i],
                    critique_overall,
                )
                for i in range(self.num_agents)
            ]
        )

        # Summary phase.
        self.console.print("\n[bold]Generating final synthesis...[/bold]")
        summary_task = progress.add_task("[green]Creating synthesis", total=1)
        await self.generate_summary()
        progress.update(summary_task, completed=1)

    # Totals must be computed before the export, which writes them out.
    self._calculate_total_usage()
    filename = self._export_to_markdown()
    self.console.print(
        f"\n[bold green]Roundtable complete! Results saved to: [/bold green][bold cyan]{filename}[/bold cyan]"
    )

    return {
        "task": task_prompt,
        "panel": self.panel,
        "proposals": self.proposals,
        "critiques": self.critiques,
        "summary": self.summary,
        "usage_stats": self.usage_stats,
        "output_file": filename,
    }
async def select_panel(self, task_prompt: str) -> None:
    """Ask the model for ``num_agents`` personas and store them in ``self.panel``.

    This is the first API call of a session, so it is also the call that
    establishes the cache for the shared context.

    The reply is parsed leniently: first as a whole JSON document, then as
    the outermost ``{...}`` span, then by pulling out the ``"panel"`` array
    with a regular expression. Entries that are not objects are discarded,
    missing persona fields are filled with generic values, and the panel is
    truncated or padded to exactly ``num_agents`` entries.

    Raises:
        ValueError: If no panel list can be extracted from the reply.
    """
    import re

    persona_fields = ("name", "title", "background", "expertise", "perspective")

    def _generic_persona(index):
        """Placeholder persona used for padding and for missing fields."""
        return {
            "name": f"Expert {index + 1}",
            "title": "Domain Specialist",
            "background": "Experienced professional in the field.",
            "expertise": "General domain knowledge",
            "perspective": "Practical application",
        }

    def _as_panel(parsed):
        """Return the persona list inside ``parsed``, or None if there is none."""
        if isinstance(parsed, dict):
            parsed = parsed.get("panel")
        return parsed if isinstance(parsed, list) else None

    def _try_json(text):
        """Parse ``text`` and return its persona list, or None on failure."""
        try:
            return _as_panel(json.loads(text))
        except json.JSONDecodeError:
            return None

    def _extract_panel(text):
        """Find the persona list in a model reply or raise ValueError."""
        panel = _try_json(text)
        if panel is not None:
            return panel
        # The reply may wrap the JSON in prose or a code fence.
        start = text.find("{")
        end = text.rfind("}") + 1
        if start == -1 or end == 0:
            raise ValueError("Could not find JSON in response")
        panel = _try_json(text[start:end])
        if panel is not None:
            return panel
        # Last resort: pull out just the array that follows "panel":.
        panel_data = re.search(r'"panel"\s*:\s*(\[.*?\])', text, re.DOTALL)
        if not panel_data:
            raise ValueError("Could not find panel data in response")
        panel = _try_json(panel_data.group(1))
        if panel is None:
            raise ValueError("Could not parse panel JSON from response")
        return panel

    panel_selection_prompt = f"""
    Your task is to select a diverse panel of {self.num_agents} expert personas for a roundtable discussion.
    The specific task/topic that each persona will address is as follows:
    "{task_prompt}"
    For each persona, provide:
    1. A name
    2. A title/role
    3. A brief background (2-3 sentences)
    4. Their area of expertise
    5. A unique perspective they bring to the topic
    Return your response in the following JSON format:
    {{
        "panel": [
            {{
                "name": "Name of persona 1",
                "title": "Title/role",
                "background": "Brief background",
                "expertise": "Area of expertise",
                "perspective": "Unique perspective"
            }},
            // Additional personas...
        ]
    }}
    Ensure the panel is diverse in terms of expertise and perspectives.
    The personas should be fictional but realistic experts who would have valuable insights on the topic.
    """

    # This call has no per-call instruction, so only the shared context (if
    # any) is sent as system text; blank text blocks are dropped and the
    # ``system`` argument is omitted entirely when nothing is left.
    system_messages = [
        block
        for block in self._prepare_system_messages("")
        if str(block.get("text", "")).strip()
    ]
    request = {
        "model": self.model,
        "max_tokens": MAX_TOKENS,
        "messages": [{"role": "user", "content": panel_selection_prompt}],
    }
    if system_messages:
        request["system"] = system_messages

    response = await self.rate_limiter.with_rate_limit(
        self.client.messages.create, **request
    )

    if hasattr(response, "usage"):
        self.usage_stats["panel_selection"] = {
            "input_tokens": response.usage.input_tokens,
            "output_tokens": response.usage.output_tokens,
        }

    raw_panel = _extract_panel(response.content[0].text)

    # Keep only object entries and guarantee every field later code reads.
    self.panel = []
    for entry in raw_panel:
        if not isinstance(entry, dict):
            continue
        persona = dict(entry)
        fallback = _generic_persona(len(self.panel))
        for field in persona_fields:
            if not persona.get(field):
                persona[field] = fallback[field]
        self.panel.append(persona)

    # Ensure we have the right number of personas.
    if len(self.panel) != self.num_agents:
        self.console.print(
            f"[bold yellow]Warning: Requested {self.num_agents} personas but received {len(self.panel)}[/bold yellow]"
        )
        if len(self.panel) > self.num_agents:
            self.panel = self.panel[: self.num_agents]
        else:
            # Create generic personas if we're short.
            for i in range(len(self.panel), self.num_agents):
                self.panel.append(_generic_persona(i))
async def _generate_proposal_with_progress(
    self, agent_id: int, task_prompt: str, progress, task_id, overall_task
):
    """Generate one proposal, then tick its own bar and the overall bar.

    ``task_id`` is marked complete and ``overall_task`` is advanced by one,
    both only after ``generate_proposal`` has returned.
    """
    await self.generate_proposal(agent_id, task_prompt)
    bar_updates = ((task_id, {"completed": 1}), (overall_task, {"advance": 1}))
    for bar, change in bar_updates:
        progress.update(bar, **change)
async def _generate_critique_with_progress(
    self, reviewer_id: int, proposal_id: int, progress, task_id, overall_task
):
    """Generate one critique, then tick its own bar and the overall bar.

    ``task_id`` is marked complete and ``overall_task`` is advanced by one,
    both only after ``generate_critique`` has returned.
    """
    await self.generate_critique(reviewer_id, proposal_id)
    bar_updates = ((task_id, {"completed": 1}), (overall_task, {"advance": 1}))
    for bar, change in bar_updates:
        progress.update(bar, **change)
async def generate_proposal(self, agent_id: int, task_prompt: str):
    """Generate the proposal of agent ``agent_id`` and mark it ready.

    The persona is sent as the per-call system instruction (after the
    cached shared context) and ``task_prompt`` as the user message. The
    result is stored in ``self.proposals[agent_id]``, usage is recorded,
    and ``self.proposal_ready[agent_id]`` is set so a waiting critique can
    start.
    """
    persona = self.panel[agent_id]

    specific_instruction = f"""
    You are {persona['name']}, {persona['title']}.
    Background: {persona['background']}
    Expertise: {persona['expertise']}
    Perspective: {persona['perspective']}
    Your task is to provide a thoughtful, original proposal addressing the given prompt.
    Make sure your response reflects your expertise, background, and unique perspective.
    Be clear, concise, and creative in your approach.
    """

    system_messages = self._prepare_system_messages(specific_instruction)

    response = await self.rate_limiter.with_rate_limit(
        self.client.messages.create,
        model=self.model,
        max_tokens=MAX_TOKENS,
        system=system_messages,
        messages=[{"role": "user", "content": task_prompt}],
    )

    self.proposals[agent_id] = {
        "agent_id": agent_id,
        "persona": persona,
        "content": response.content[0].text,
    }

    usage = getattr(response, "usage", None)
    if usage is not None:
        self.usage_stats["proposals"][agent_id] = {
            "input_tokens": usage.input_tokens,
            "output_tokens": usage.output_tokens,
        }
        # Prefer the cache-read count reported with the usage data; fall
        # back to the rough characters/4 estimate only when it is absent.
        cache_read = getattr(usage, "cache_read_input_tokens", None)
        if cache_read is None:
            cache_hit = usage.input_tokens < len(self.context) / 4
        else:
            cache_hit = cache_read > 0
        if cache_hit:
            self.usage_stats["cache_hits"] += 1

    # Signal that this proposal is ready for review.
    self.proposal_ready[agent_id].set()
async def generate_critique(self, reviewer_id: int, proposal_id: int):
    """Have agent ``reviewer_id`` critique proposal ``proposal_id``.

    Waits until ``self.proposal_ready[proposal_id]`` is set, then sends the
    reviewer persona as the per-call system instruction and the proposal
    text as the user message. The result is stored in
    ``self.critiques[reviewer_id]`` and usage is recorded.
    """
    # Block until the assigned proposal exists.
    await self.proposal_ready[proposal_id].wait()

    reviewer = self.panel[reviewer_id]
    proposal = self.proposals[proposal_id]

    specific_instruction = f"""
    You are {reviewer['name']}, {reviewer['title']}.
    Background: {reviewer['background']}
    Expertise: {reviewer['expertise']}
    Perspective: {reviewer['perspective']}
    You have been tasked with providing constructive feedback on the attached document, based on your domain expertise.
    Consider any additional context that has been provided. Highlight both strengths and areas for improvement.
    Be specific in your analysis and suggestions.
    """

    system_messages = self._prepare_system_messages(specific_instruction)

    response = await self.rate_limiter.with_rate_limit(
        self.client.messages.create,
        model=self.model,
        max_tokens=MAX_TOKENS,
        system=system_messages,
        messages=[
            {
                "role": "user",
                "content": f"Please critique the following document:\n\n{proposal['content']}",
            }
        ],
    )

    self.critiques[reviewer_id] = {
        "reviewer_id": reviewer_id,
        "proposal_id": proposal_id,
        "content": response.content[0].text,
    }

    usage = getattr(response, "usage", None)
    if usage is not None:
        self.usage_stats["critiques"][reviewer_id] = {
            "input_tokens": usage.input_tokens,
            "output_tokens": usage.output_tokens,
        }
        # Prefer the cache-read count reported with the usage data; fall
        # back to the rough characters/4 estimate only when it is absent.
        cache_read = getattr(usage, "cache_read_input_tokens", None)
        if cache_read is None:
            cache_hit = usage.input_tokens < len(self.context) / 4
        else:
            cache_hit = cache_read > 0
        if cache_hit:
            self.usage_stats["cache_hits"] += 1
async def generate_summary(self):
    """Generate the final synthesis of all proposals and critiques.

    Each available proposal is sent followed by every critique whose
    ``proposal_id`` points at it; persona details are left out. Slots that
    are still ``None`` (no proposal or critique stored) are skipped. The
    reply text is stored in ``self.summary`` and usage is recorded.
    """
    specific_instruction = """
    You are a neutral facilitator tasked with synthesizing multiple perspectives.
    Your task is to summarize multiple documents and their critiques from a roundtable discussion.
    Create a comprehensive synthesis that:
    1. Identifies key ideas from each proposal
    2. Highlights common themes across proposals
    3. Incorporates critical insights from the reviews
    4. Addresses areas of agreement and disagreement
    5. Presents well-reasoned conclusions
    Your synthesis should be thorough, balanced, and practical.
    Focus on the content of the proposals and critiques rather than who wrote them.
    """

    system_messages = self._prepare_system_messages(specific_instruction)

    # Collect the pieces and join once instead of growing a string in a loop.
    sections = ["## Proposals and Critiques:\n\n"]
    for i in range(self.num_agents):
        proposal = self.proposals[i]
        if not proposal:
            continue
        sections.append(f"### Proposal {i + 1}:\n{proposal['content']}\n\n")
        sections.extend(
            f"### Critique of Proposal {i + 1}:\n{critique['content']}\n\n"
            for critique in self.critiques
            if critique and critique["proposal_id"] == i
        )
    content = "".join(sections)

    response = await self.rate_limiter.with_rate_limit(
        self.client.messages.create,
        model=self.model,
        max_tokens=MAX_TOKENS,
        system=system_messages,
        messages=[{"role": "user", "content": content}],
    )

    self.summary = response.content[0].text

    usage = getattr(response, "usage", None)
    if usage is not None:
        self.usage_stats["summary"] = {
            "input_tokens": usage.input_tokens,
            "output_tokens": usage.output_tokens,
        }
        # Prefer the cache-read count reported with the usage data; fall
        # back to the rough characters/4 estimate only when it is absent.
        cache_read = getattr(usage, "cache_read_input_tokens", None)
        if cache_read is None:
            cache_hit = usage.input_tokens < len(self.context) / 4
        else:
            cache_hit = cache_read > 0
        if cache_hit:
            self.usage_stats["cache_hits"] += 1
def _display_panel_internal(self):
    """Print one bordered panel per selected persona (internal use only)."""
    show = self.console.print
    show("\n[bold cyan]=== INTERNAL PANEL CONFIGURATION ===\n[/bold cyan]")
    show("[dim](Note: This is for internal tracking only)[/dim]\n")

    for index, persona in enumerate(self.panel):
        body = "".join(
            [
                f"[bold]{persona['name']}[/bold]\n",
                f"[italic]{persona['title']}[/italic]\n\n",
                f"{persona['background']}\n\n",
                f"[bold]Expertise:[/bold] {persona['expertise']}\n",
                f"[bold]Perspective:[/bold] {persona['perspective']}",
            ]
        )
        # The border colour is derived from the agent index so that each
        # agent gets a different one.
        card = Panel(
            body,
            title=f"Agent #{index + 1}",
            border_style=f"color({30 + index})",
        )
        show(card)

    show("")  # Add some spacing
def _calculate_total_usage(self):
    """Sum token usage over all phases into the ``total_*`` entries.

    Phases with no recorded usage (``None`` or empty) contribute nothing,
    and a record missing one of the two counters counts as zero for it.
    """
    stats = self.usage_stats
    # Flatten every phase into one sequence of usage records.
    every_record = [
        stats["panel_selection"],
        *stats["proposals"],
        *stats["critiques"],
        stats["summary"],
    ]
    recorded = [record for record in every_record if record]

    stats["total_input_tokens"] = sum(r.get("input_tokens", 0) for r in recorded)
    stats["total_output_tokens"] = sum(r.get("output_tokens", 0) for r in recorded)
def _export_to_markdown(self) -> str:
    """Write the session report as markdown and return the file's path.

    The file is named ``roundtable_<timestamp>.md`` inside
    ``self.output_dir`` and contains, in order: title and generation time,
    the executive summary, usage statistics, and an appendix listing every
    proposal followed by the critiques that target it.
    """
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"{self.output_dir}/roundtable_{stamp}.md"

    def _pieces():
        """Yield the report text piece by piece, in output order."""
        # Title and header
        yield "# AI Roundtable\n\n"
        yield f"*Generated on {datetime.now().strftime('%Y-%m-%d at %H:%M:%S')}*\n\n"

        # Final synthesis
        yield "## Executive Summary\n\n"
        yield self.summary
        yield "\n\n"

        # Usage statistics
        yield "## Usage Statistics\n\n"
        yield f"- Model: {self.model}\n"
        yield f"- Number of agents: {self.num_agents}\n"
        yield f"- Total input tokens: {self.usage_stats['total_input_tokens']}\n"
        yield f"- Total output tokens: {self.usage_stats['total_output_tokens']}\n"
        yield f"- Estimated cache hits: {self.usage_stats['cache_hits']}\n\n"

        # Appendix with proposals and critiques
        yield "## Appendix: Original Proposals and Critiques\n\n"
        for index in range(self.num_agents):
            yield f"### Proposal {index + 1}\n\n"
            yield self.proposals[index]["content"]
            yield "\n\n"
            # Critiques are matched to a proposal by their proposal_id.
            for review in self.critiques:
                if review["proposal_id"] != index:
                    continue
                yield f"#### Critique of Proposal {index + 1}\n\n"
                yield review["content"]
                yield "\n\n"

    with open(filename, "w", encoding="utf-8") as report:
        report.writelines(_pieces())

    return filename
def display_results(self) -> None:
    """Print a session recap: summary preview, token table and cost estimate.

    Output goes to a fresh ``Console``. The cost figures use the example
    per-1K-token rates hard-coded below, and the "savings" line relies on
    the rough characters/4 token estimate of the shared context.
    """
    out = Console()
    stats = self.usage_stats

    out.print("\n[bold cyan]=== AI ROUNDTABLE SUMMARY ===\n[/bold cyan]")
    out.print(f"[bold]Agents:[/bold] {self.num_agents}")
    out.print(f"[bold]Model:[/bold] {self.model}")

    # Preview: at most the first 500 characters of the synthesis.
    out.print("\n[bold yellow]Executive Summary (Preview):[/bold yellow]")
    cutoff = min(500, len(self.summary))
    trailer = "..." if len(self.summary) > cutoff else ""
    out.print(self.summary[:cutoff] + trailer)

    # Token usage table.
    table = Table(title="Token Usage Summary")
    table.add_column("Operation", style="cyan")
    for heading in ("Input Tokens", "Output Tokens"):
        table.add_column(heading, justify="right", style="green")

    def _phase_total(phase, field):
        """Sum ``field`` over the recorded entries of a per-agent phase."""
        return sum(entry.get(field, 0) for entry in stats[phase] if entry)

    def _single_call_row(label, key):
        """Add a row for a one-call phase, if its usage was recorded."""
        entry = stats[key]
        if entry:
            table.add_row(
                label, str(entry["input_tokens"]), str(entry["output_tokens"])
            )

    _single_call_row("Panel Selection", "panel_selection")
    for label, phase in (("Proposals", "proposals"), ("Critiques", "critiques")):
        table.add_row(
            f"{label} ({self.num_agents})",
            str(_phase_total(phase, "input_tokens")),
            str(_phase_total(phase, "output_tokens")),
        )
    _single_call_row("Final Synthesis", "summary")
    table.add_row(
        "[bold]TOTAL[/bold]",
        f"[bold]{stats['total_input_tokens']}[/bold]",
        f"[bold]{stats['total_output_tokens']}[/bold]",
    )

    out.print("\n")
    out.print(table)

    out.print(f"\n[bold]Estimated cache hits:[/bold] {stats['cache_hits']}")

    # Example rates per 1K tokens; update them to match actual pricing.
    input_rate_per_1k = 0.015
    output_rate_per_1k = 0.075
    total_input_cost = (stats["total_input_tokens"] / 1000) * input_rate_per_1k
    total_output_cost = (stats["total_output_tokens"] / 1000) * output_rate_per_1k
    total_cost = total_input_cost + total_output_cost

    # Theoretical cost without caching (very rough): add the estimated
    # context size back once for every counted cache hit.
    uncached_input = stats["total_input_tokens"]
    if self.context and stats["cache_hits"] > 0:
        context_tokens = len(self.context) / 4
        uncached_input += context_tokens * stats["cache_hits"]
    uncached_cost = (uncached_input / 1000) * input_rate_per_1k + total_output_cost
    savings = uncached_cost - total_cost

    out.print("\n[bold yellow]Estimated Cost:[/bold yellow]")
    out.print(f"Input tokens: ${total_input_cost:.4f}")
    out.print(f"Output tokens: ${total_output_cost:.4f}")
    out.print(f"[bold]Total cost: ${total_cost:.4f}[/bold]")
    if savings > 0:
        out.print(
            f"[bold green]Estimated savings from caching: ${savings:.4f}[/bold green]"
        )
async def cli():
    """Interactive command-line front end for the AI Roundtable.

    Prompts for the number of agents (re-asking until a whole number of at
    least 1 is given), the task, optional text and file context, and a
    final confirmation. It then runs the session, shows the results and
    offers to open the exported file with the platform's default opener.
    """
    console = Console()
    console.print("[bold cyan]==============================[/bold cyan]")
    console.print("[bold cyan] AI ROUNDTABLE [/bold cyan]")
    console.print("[bold cyan]==============================[/bold cyan]")
    console.print(
        "\nGenerate diverse perspectives and synthesize ideas using multiple AI agents.\n"
    )

    # Get number of agents; a typo must not crash the program, and a panel
    # needs at least one member.
    while True:
        answer = Prompt.ask("Number of agents for the roundtable", default="3")
        try:
            num_agents = int(answer)
        except ValueError:
            num_agents = 0
        if num_agents >= 1:
            break
        console.print(
            "[bold red]Please enter a whole number of at least 1.[/bold red]"
        )

    # Get specific question/prompt
    task_prompt = Prompt.ask(
        "Describe the topic or task you would like the AI agents to address."
    )

    # Create the roundtable
    roundtable = AIRoundtable(num_agents)

    # Ask about additional context
    if Confirm.ask("Do you want to add additional context?"):
        context_text = Prompt.ask("Enter additional context (leave empty to skip)")
        if context_text:
            roundtable.add_text_context(context_text)

    # Ask about file context
    if Confirm.ask("Do you want to add files as context?"):
        while True:
            file_path = Prompt.ask("Enter file path (leave empty to finish)")
            if not file_path:
                break
            if os.path.exists(file_path):
                roundtable.add_file_context(file_path)
            else:
                console.print(f"[bold red]File not found: {file_path}[/bold red]")

    # Confirm start
    estimated_cost = 0.10 * num_agents  # Very rough estimate
    console.print(
        f"\n[bold yellow]Estimated cost:[/bold yellow] ${estimated_cost:.2f} (rough approximation)"
    )
    if not Confirm.ask("Start the roundtable?"):
        console.print("[bold red]Process cancelled.[/bold red]")
        return

    # Run the roundtable
    results = await roundtable.run_session(task_prompt)

    # Display results
    roundtable.display_results()

    # Open file?
    if Confirm.ask(f"Open the output file ({results['output_file']})?"):
        try:
            import subprocess
            import platform

            if platform.system() == "Darwin":  # macOS
                subprocess.call(("open", results["output_file"]))
            elif platform.system() == "Windows":
                os.startfile(results["output_file"])
            else:  # Linux
                subprocess.call(("xdg-open", results["output_file"]))
        except Exception as e:
            # Opening is a convenience; report the problem and the path.
            console.print(f"[bold red]Error opening file: {str(e)}[/bold red]")
            console.print(f"File path: {results['output_file']}")
# Script entry point: run the interactive CLI on a new event loop.
if __name__ == "__main__":
    asyncio.run(cli())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment