Skip to content

Instantly share code, notes, and snippets.

@angusturner
Created May 2, 2025 00:34
Show Gist options
  • Save angusturner/964c0037360f6268079c6224ad1135d8 to your computer and use it in GitHub Desktop.
Save angusturner/964c0037360f6268079c6224ad1135d8 to your computer and use it in GitHub Desktop.
llm roundtable
import asyncio
import json
import os
from datetime import datetime
from typing import List, Dict, Any
import anthropic
from rich.console import Console
from rich.progress import Progress, TextColumn, BarColumn
from rich.panel import Panel
from rich.prompt import Prompt, Confirm
from rich.table import Table
import random
MAX_TOKENS = 8096
class RateLimitHandler:
def __init__(self, max_concurrent=1, initial_timeout=1, max_timeout=60):
self.semaphore = asyncio.Semaphore(max_concurrent)
self.initial_timeout = initial_timeout
self.max_timeout = max_timeout
async def with_rate_limit(self, func, *args, **kwargs):
"""Execute a function with rate limiting and exponential backoff"""
timeout = self.initial_timeout
max_retries = 8 # Maximum number of retries
retries = 0
async with self.semaphore: # Limit concurrent requests
while True:
try:
return await func(*args, **kwargs)
except anthropic.APIError as e:
# Handle rate limits (HTTP 429)
if e.status_code == 429 and retries < max_retries:
retries += 1
# Get retry-after time from headers if available
retry_after = None
if (
hasattr(e, "headers")
and e.headers
and "retry-after" in e.headers
):
try:
retry_after = int(e.headers["retry-after"])
except (ValueError, TypeError):
pass
# Use provided retry time or exponential backoff with jitter
wait_time = retry_after or min(
timeout * (2**retries) * (0.5 + random.random()),
self.max_timeout,
)
print(f"Rate limited. Retrying in {wait_time:.1f} seconds...")
await asyncio.sleep(wait_time)
else:
# Re-raise if it's not a rate limit or we've exceeded retries
raise
class AIRoundtable:
def __init__(
self,
num_agents: int,
model: str = "claude-3-7-sonnet-latest",
output_dir: str = "output",
):
self.num_agents = num_agents
self.model = model
self.output_dir = output_dir
self.client = anthropic.AsyncAnthropic()
self.console = Console()
# Ensure output directory exists
os.makedirs(output_dir, exist_ok=True)
# Panel of personas
self.panel = []
# Store proposals and their ready status
self.proposals = [None] * num_agents
self.proposal_ready = [asyncio.Event() for _ in range(num_agents)]
# Store critiques
self.critiques = [None] * num_agents
# Final summary
self.summary = None
# Context data (user input and files)
self.context = ""
# Track usage statistics
self.usage_stats = {
"panel_selection": None,
"proposals": [None] * num_agents,
"critiques": [None] * num_agents,
"summary": None,
"total_input_tokens": 0,
"total_output_tokens": 0,
"cache_hits": 0, # Track how many cache hits we get
}
self.rate_limiter = RateLimitHandler(max_concurrent=1)
def add_file_context(self, file_path: str) -> None:
"""Add file content to the context"""
try:
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
filename = os.path.basename(file_path)
self.context += f"\n\n--- File: {filename} ---\n{content}\n--- End of {filename} ---\n\n"
self.console.print(
f"[green]Added content from [bold]{filename}[/bold][/green]"
)
except Exception as e:
self.console.print(
f"[bold red]Error loading file {file_path}: {str(e)}[/bold red]"
)
def add_text_context(self, text: str) -> None:
"""Add text to the context"""
if text:
self.context += f"\n\n{text}\n\n"
self.console.print(
f"[green]Added text context ({len(text)} characters)[/green]"
)
def _prepare_system_messages(
self, specific_instruction: str
) -> List[Dict[str, Any]]:
"""Prepare system messages with caching for the shared context"""
system_messages = []
if self.context:
system_messages.append(
{
"type": "text",
"text": self.context,
"cache_control": {"type": "ephemeral"},
}
)
system_messages += [{"type": "text", "text": f"{specific_instruction}"}]
return system_messages
async def run_session(self, task_prompt: str) -> Dict[str, Any]:
"""
Run a complete roundtable session
Args:
task_prompt: The specific task or question to address
Returns:
Dictionary with all results and statistics
"""
self.console.print(f"[bold green]Starting AI Roundtable. [/bold green]")
# Create a progress display
with Progress(
TextColumn("[bold blue]{task.description}[/bold blue]"),
BarColumn(),
TextColumn("[bold]{task.completed}/{task.total}[/bold]"),
) as progress:
# Panel selection task (this establishes the cache)
panel_task = progress.add_task(
"[yellow]Selecting panel personas (establishing cache)...", total=1
)
await self.select_panel(task_prompt)
progress.update(panel_task, completed=1)
# Display internal panel info
self._display_panel_internal()
# Add overall progress tasks
proposal_overall = progress.add_task(
"[yellow]Proposals (overall)", total=self.num_agents
)
# Add individual agent tasks for proposals
proposal_tasks = {}
for i in range(self.num_agents):
task_id = progress.add_task(f"Agent #{i + 1} proposal", total=1)
proposal_tasks[i] = task_id
# Run proposal generation with progress tracking
self.console.print(
"\n[bold]Generating proposals (using cached context)...[/bold]"
)
proposal_task_list = [
self._generate_proposal_with_progress(
i, task_prompt, progress, proposal_tasks[i], proposal_overall
)
for i in range(self.num_agents)
]
# Wait for all proposals to complete
await asyncio.gather(*proposal_task_list)
# Critique phase
critique_overall = progress.add_task(
"[yellow]Critiques (overall)", total=self.num_agents
)
# Add individual agent tasks for critiques
critique_tasks = {}
for i in range(self.num_agents):
target_id = (i + 1) % self.num_agents
task_id = progress.add_task(
f"Agent #{i + 1} → Proposal #{target_id + 1} critique", total=1
)
critique_tasks[i] = task_id
# Run critique generation with progress tracking
self.console.print(
"\n[bold]Generating critiques (using cached context)...[/bold]"
)
critique_task_list = [
self._generate_critique_with_progress(
i,
(i + 1) % self.num_agents,
progress,
critique_tasks[i],
critique_overall,
)
for i in range(self.num_agents)
]
# Wait for all critiques to complete
await asyncio.gather(*critique_task_list)
# Summary phase
self.console.print("\n[bold]Generating final synthesis...[/bold]")
summary_task = progress.add_task("[green]Creating synthesis", total=1)
await self.generate_summary()
progress.update(summary_task, completed=1)
# Calculate total token usage
self._calculate_total_usage()
# Export results to markdown
filename = self._export_to_markdown()
self.console.print(
f"\n[bold green]Roundtable complete! Results saved to: [/bold green][bold cyan]{filename}[/bold cyan]"
)
# Return structured results
return {
"task": task_prompt,
"panel": self.panel,
"proposals": self.proposals,
"critiques": self.critiques,
"summary": self.summary,
"usage_stats": self.usage_stats,
"output_file": filename,
}
async def select_panel(self, task_prompt: str) -> None:
"""
Select a panel of personas for the roundtable.
This first call establishes the cache for the shared context.
"""
panel_selection_prompt = f"""
Your task is to select a diverse panel of {self.num_agents} expert personas for a roundtable discussion.
The specific task/topic that each persona will address is as follows:
"{task_prompt}"
For each persona, provide:
1. A name
2. A title/role
3. A brief background (2-3 sentences)
4. Their area of expertise
5. A unique perspective they bring to the topic
Return your response in the following JSON format:
{{
"panel": [
{{
"name": "Name of persona 1",
"title": "Title/role",
"background": "Brief background",
"expertise": "Area of expertise",
"perspective": "Unique perspective"
}},
// Additional personas...
]
}}
Ensure the panel is diverse in terms of expertise and perspectives.
The personas should be fictional but realistic experts who would have valuable insights on the topic.
"""
# Prepare system messages with context caching
system_messages = self._prepare_system_messages("")
# Make API call
response = await self.rate_limiter.with_rate_limit(
self.client.messages.create,
model=self.model,
max_tokens=MAX_TOKENS,
system=system_messages,
messages=[{"role": "user", "content": panel_selection_prompt}],
)
# Store usage statistics
if hasattr(response, "usage"):
self.usage_stats["panel_selection"] = {
"input_tokens": response.usage.input_tokens,
"output_tokens": response.usage.output_tokens,
}
# Extract JSON from the response
content = response.content[0].text
# Handle possible text before or after JSON
try:
# First try to parse the whole content as JSON
self.panel = json.loads(content)["panel"]
except json.JSONDecodeError:
# If that fails, try to extract JSON from the text
start = content.find("{")
end = content.rfind("}") + 1
if start != -1 and end != 0:
json_str = content[start:end]
try:
self.panel = json.loads(json_str)["panel"]
except json.JSONDecodeError:
# If still failing, use regex to extract just the JSON array for the panel
import re
panel_data = re.search(
r'"panel"\s*:\s*(\[.*?\])', content, re.DOTALL
)
if panel_data:
try:
self.panel = json.loads(panel_data.group(1))
except json.JSONDecodeError:
raise ValueError("Could not parse panel JSON from response")
else:
raise ValueError("Could not find panel data in response")
else:
raise ValueError("Could not find JSON in response")
# Ensure we have the right number of personas
if len(self.panel) != self.num_agents:
self.console.print(
f"[bold yellow]Warning: Requested {self.num_agents} personas but received {len(self.panel)}[/bold yellow]"
)
# Truncate or pad the panel as needed
if len(self.panel) > self.num_agents:
self.panel = self.panel[: self.num_agents]
else:
# Create generic personas if we're short
for i in range(len(self.panel), self.num_agents):
self.panel.append(
{
"name": f"Expert {i + 1}",
"title": "Domain Specialist",
"background": "Experienced professional in the field.",
"expertise": "General domain knowledge",
"perspective": "Practical application",
}
)
async def _generate_proposal_with_progress(
self, agent_id: int, task_prompt: str, progress, task_id, overall_task
):
"""Generate a proposal with progress tracking"""
await self.generate_proposal(agent_id, task_prompt)
progress.update(task_id, completed=1)
progress.update(overall_task, advance=1)
async def _generate_critique_with_progress(
self, reviewer_id: int, proposal_id: int, progress, task_id, overall_task
):
"""Generate a critique with progress tracking"""
await self.generate_critique(reviewer_id, proposal_id)
progress.update(task_id, completed=1)
progress.update(overall_task, advance=1)
async def generate_proposal(self, agent_id: int, task_prompt: str):
"""Generate a proposal from one agent"""
persona = self.panel[agent_id]
# Prepare the specific prompt for this persona
specific_instruction = f"""
You are {persona['name']}, {persona['title']}.
Background: {persona['background']}
Expertise: {persona['expertise']}
Perspective: {persona['perspective']}
Your task is to provide a thoughtful, original proposal addressing the given prompt.
Make sure your response reflects your expertise, background, and unique perspective.
Be clear, concise, and creative in your approach.
"""
# Prepare system messages with context caching
system_messages = self._prepare_system_messages(specific_instruction)
# Make API call
response = await self.rate_limiter.with_rate_limit(
self.client.messages.create,
model=self.model,
max_tokens=MAX_TOKENS,
system=system_messages,
messages=[{"role": "user", "content": task_prompt}],
)
# Store the proposal
self.proposals[agent_id] = {
"agent_id": agent_id,
"persona": persona,
"content": response.content[0].text,
}
# Store usage statistics
if hasattr(response, "usage"):
self.usage_stats["proposals"][agent_id] = {
"input_tokens": response.usage.input_tokens,
"output_tokens": response.usage.output_tokens,
}
# Check if there might be a cache hit (if input tokens are significantly reduced)
# This is a heuristic since the API doesn't explicitly report cache hits
if response.usage.input_tokens < len(self.context) / 4: # Rough estimate
self.usage_stats["cache_hits"] += 1
# Signal that this proposal is ready for review
self.proposal_ready[agent_id].set()
async def generate_critique(self, reviewer_id: int, proposal_id: int):
"""Generate a critique from one agent about another's proposal"""
# Wait for the assigned proposal to be ready
await self.proposal_ready[proposal_id].wait()
reviewer = self.panel[reviewer_id]
proposal = self.proposals[proposal_id]
# Prepare the specific prompt for this persona
specific_instruction = f"""
You are {reviewer['name']}, {reviewer['title']}.
Background: {reviewer['background']}
Expertise: {reviewer['expertise']}
Perspective: {reviewer['perspective']}
You have been tasked with providing constructive feedback on the attached document, based on your domain expertise.
Consider any additional context that has been provided. Highlight both strengths and areas for improvement.
Be specific in your analysis and suggestions.
"""
# Prepare system messages with context caching
system_messages = self._prepare_system_messages(specific_instruction)
response = await self.rate_limiter.with_rate_limit(
self.client.messages.create,
model=self.model,
max_tokens=MAX_TOKENS,
system=system_messages,
messages=[
{
"role": "user",
"content": f"Please critique the following document:\n\n{proposal['content']}",
}
],
)
# Store the critique
self.critiques[reviewer_id] = {
"reviewer_id": reviewer_id,
"proposal_id": proposal_id,
"content": response.content[0].text,
}
# Store usage statistics
if hasattr(response, "usage"):
self.usage_stats["critiques"][reviewer_id] = {
"input_tokens": response.usage.input_tokens,
"output_tokens": response.usage.output_tokens,
}
# Check if there might be a cache hit
if response.usage.input_tokens < len(self.context) / 4: # Rough estimate
self.usage_stats["cache_hits"] += 1
async def generate_summary(self):
"""Generate a final synthesis of all proposals and critiques"""
specific_instruction = """
You are a neutral facilitator tasked with synthesizing multiple perspectives.
Your task is to summarize multiple documents and their critiques from a roundtable discussion.
Create a comprehensive synthesis that:
1. Identifies key ideas from each proposal
2. Highlights common themes across proposals
3. Incorporates critical insights from the reviews
4. Addresses areas of agreement and disagreement
5. Presents well-reasoned conclusions
Your synthesis should be thorough, balanced, and practical.
Focus on the content of the proposals and critiques rather than who wrote them.
"""
# Prepare system messages with context caching
system_messages = self._prepare_system_messages(specific_instruction)
# Add proposals and critiques without persona details
content = "## Proposals and Critiques:\n\n"
for i in range(self.num_agents):
content += f"### Proposal {i + 1}:\n{self.proposals[i]['content']}\n\n"
# Find the critique for this proposal
for critique in self.critiques:
if critique["proposal_id"] == i:
content += (
f"### Critique of Proposal {i + 1}:\n{critique['content']}\n\n"
)
# Make API call
response = await self.rate_limiter.with_rate_limit(
self.client.messages.create,
model=self.model,
max_tokens=MAX_TOKENS,
system=system_messages,
messages=[{"role": "user", "content": content}],
)
self.summary = response.content[0].text
# Store usage statistics
if hasattr(response, "usage"):
self.usage_stats["summary"] = {
"input_tokens": response.usage.input_tokens,
"output_tokens": response.usage.output_tokens,
}
# Check if there might be a cache hit
if response.usage.input_tokens < len(self.context) / 4: # Rough estimate
self.usage_stats["cache_hits"] += 1
def _display_panel_internal(self):
"""Display the selected panel members (internal use only)"""
self.console.print(
"\n[bold cyan]=== INTERNAL PANEL CONFIGURATION ===\n[/bold cyan]"
)
self.console.print("[dim](Note: This is for internal tracking only)[/dim]\n")
for i, persona in enumerate(self.panel):
panel_content = f"[bold]{persona['name']}[/bold]\n"
panel_content += f"[italic]{persona['title']}[/italic]\n\n"
panel_content += f"{persona['background']}\n\n"
panel_content += f"[bold]Expertise:[/bold] {persona['expertise']}\n"
panel_content += f"[bold]Perspective:[/bold] {persona['perspective']}"
self.console.print(
Panel(
panel_content,
title=f"Agent #{i + 1}",
border_style=f"color({30 + i})",
)
)
self.console.print("") # Add some spacing
def _calculate_total_usage(self):
"""Calculate the total token usage across all operations"""
total_input = 0
total_output = 0
# Add panel selection usage
if self.usage_stats["panel_selection"]:
total_input += self.usage_stats["panel_selection"].get("input_tokens", 0)
total_output += self.usage_stats["panel_selection"].get("output_tokens", 0)
# Add up proposal usage
for usage in self.usage_stats["proposals"]:
if usage:
total_input += usage.get("input_tokens", 0)
total_output += usage.get("output_tokens", 0)
# Add up critique usage
for usage in self.usage_stats["critiques"]:
if usage:
total_input += usage.get("input_tokens", 0)
total_output += usage.get("output_tokens", 0)
# Add summary usage
if self.usage_stats["summary"]:
total_input += self.usage_stats["summary"].get("input_tokens", 0)
total_output += self.usage_stats["summary"].get("output_tokens", 0)
self.usage_stats["total_input_tokens"] = total_input
self.usage_stats["total_output_tokens"] = total_output
def _export_to_markdown(self) -> str:
"""Export the results to a markdown file and return the filename"""
# Create a sanitized filename from the topic
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{self.output_dir}/roundtable_{timestamp}.md"
with open(filename, "w", encoding="utf-8") as f:
# Title and header
f.write(f"# AI Roundtable\n\n")
f.write(
f"*Generated on {datetime.now().strftime('%Y-%m-%d at %H:%M:%S')}*\n\n"
)
# Final synthesis
f.write("## Executive Summary\n\n")
f.write(self.summary)
f.write("\n\n")
# Usage statistics
f.write("## Usage Statistics\n\n")
f.write(f"- Model: {self.model}\n")
f.write(f"- Number of agents: {self.num_agents}\n")
f.write(f"- Total input tokens: {self.usage_stats['total_input_tokens']}\n")
f.write(
f"- Total output tokens: {self.usage_stats['total_output_tokens']}\n"
)
f.write(f"- Estimated cache hits: {self.usage_stats['cache_hits']}\n\n")
# Appendix with proposals and critiques
f.write("## Appendix: Original Proposals and Critiques\n\n")
for i in range(self.num_agents):
f.write(f"### Proposal {i + 1}\n\n")
f.write(self.proposals[i]["content"])
f.write("\n\n")
# Find and write the critique for this proposal
for critique in self.critiques:
if critique["proposal_id"] == i:
f.write(f"#### Critique of Proposal {i + 1}\n\n")
f.write(critique["content"])
f.write("\n\n")
return filename
def display_results(self) -> None:
"""Display a summary of the results to the console"""
console = Console()
# Display usage statistics
console.print("\n[bold cyan]=== AI ROUNDTABLE SUMMARY ===\n[/bold cyan]")
# Display overall results
console.print(f"[bold]Agents:[/bold] {self.num_agents}")
console.print(f"[bold]Model:[/bold] {self.model}")
# Display a preview of the synthesis
console.print("\n[bold yellow]Executive Summary (Preview):[/bold yellow]")
preview_length = min(500, len(self.summary))
console.print(
self.summary[:preview_length]
+ ("..." if len(self.summary) > preview_length else "")
)
# Create a table for token usage
usage_table = Table(title=f"Token Usage Summary")
usage_table.add_column("Operation", style="cyan")
usage_table.add_column("Input Tokens", justify="right", style="green")
usage_table.add_column("Output Tokens", justify="right", style="green")
# Add panel selection row
if self.usage_stats["panel_selection"]:
usage_table.add_row(
"Panel Selection",
str(self.usage_stats["panel_selection"]["input_tokens"]),
str(self.usage_stats["panel_selection"]["output_tokens"]),
)
# Add proposal/critique summaries
total_proposal_in = sum(
usage.get("input_tokens", 0)
for usage in self.usage_stats["proposals"]
if usage
)
total_proposal_out = sum(
usage.get("output_tokens", 0)
for usage in self.usage_stats["proposals"]
if usage
)
total_critique_in = sum(
usage.get("input_tokens", 0)
for usage in self.usage_stats["critiques"]
if usage
)
total_critique_out = sum(
usage.get("output_tokens", 0)
for usage in self.usage_stats["critiques"]
if usage
)
usage_table.add_row(
f"Proposals ({self.num_agents})",
str(total_proposal_in),
str(total_proposal_out),
)
usage_table.add_row(
f"Critiques ({self.num_agents})",
str(total_critique_in),
str(total_critique_out),
)
# Add summary usage row
if self.usage_stats["summary"]:
usage_table.add_row(
"Final Synthesis",
str(self.usage_stats["summary"]["input_tokens"]),
str(self.usage_stats["summary"]["output_tokens"]),
)
# Add total usage row
usage_table.add_row(
"[bold]TOTAL[/bold]",
f"[bold]{self.usage_stats['total_input_tokens']}[/bold]",
f"[bold]{self.usage_stats['total_output_tokens']}[/bold]",
)
console.print("\n")
console.print(usage_table)
# Display cache effectiveness
console.print(
f"\n[bold]Estimated cache hits:[/bold] {self.usage_stats['cache_hits']}"
)
# Calculate estimated cost (approximate based on current Claude pricing)
# These rates should be updated to match actual pricing
input_rate_per_1k = 0.015 # Example rate per 1K tokens for input
output_rate_per_1k = 0.075 # Example rate per 1K tokens for output
total_input_cost = (
self.usage_stats["total_input_tokens"] / 1000
) * input_rate_per_1k
total_output_cost = (
self.usage_stats["total_output_tokens"] / 1000
) * output_rate_per_1k
total_cost = total_input_cost + total_output_cost
# Calculate theoretical cost without caching (very rough estimate)
potential_input_without_cache = self.usage_stats["total_input_tokens"]
if self.context and self.usage_stats["cache_hits"] > 0:
# Add the cached context size back for each hit
context_tokens = len(self.context) / 4 # Very rough estimate of token count
potential_input_without_cache += (
context_tokens * self.usage_stats["cache_hits"]
)
potential_cost_without_cache = (
potential_input_without_cache / 1000
) * input_rate_per_1k + total_output_cost
savings = potential_cost_without_cache - total_cost
console.print(f"\n[bold yellow]Estimated Cost:[/bold yellow]")
console.print(f"Input tokens: ${total_input_cost:.4f}")
console.print(f"Output tokens: ${total_output_cost:.4f}")
console.print(f"[bold]Total cost: ${total_cost:.4f}[/bold]")
if savings > 0:
console.print(
f"[bold green]Estimated savings from caching: ${savings:.4f}[/bold green]"
)
async def cli():
"""Command-line interface for the AI Roundtable"""
console = Console()
console.print("[bold cyan]==============================[/bold cyan]")
console.print("[bold cyan] AI ROUNDTABLE [/bold cyan]")
console.print("[bold cyan]==============================[/bold cyan]")
console.print(
"\nGenerate diverse perspectives and synthesize ideas using multiple AI agents.\n"
)
# Get number of agents
num_agents = int(Prompt.ask("Number of agents for the roundtable", default="3"))
# Get specific question/prompt
task_prompt = Prompt.ask(
"Describe the topic or task you would like the AI agents to address."
)
# Create the roundtable
roundtable = AIRoundtable(num_agents)
# Ask about additional context
if Confirm.ask("Do you want to add additional context?"):
context_text = Prompt.ask("Enter additional context (leave empty to skip)")
if context_text:
roundtable.add_text_context(context_text)
# Ask about file context
if Confirm.ask("Do you want to add files as context?"):
while True:
file_path = Prompt.ask("Enter file path (leave empty to finish)")
if not file_path:
break
if os.path.exists(file_path):
roundtable.add_file_context(file_path)
else:
console.print(f"[bold red]File not found: {file_path}[/bold red]")
# Confirm start
estimated_cost = 0.10 * num_agents # Very rough estimate
console.print(
f"\n[bold yellow]Estimated cost:[/bold yellow] ${estimated_cost:.2f} (rough approximation)"
)
if not Confirm.ask("Start the roundtable?"):
console.print("[bold red]Process cancelled.[/bold red]")
return
# Run the roundtable
results = await roundtable.run_session(task_prompt)
# Display results
roundtable.display_results()
# Open file?
if Confirm.ask(f"Open the output file ({results['output_file']})?"):
try:
import subprocess
import platform
if platform.system() == "Darwin": # macOS
subprocess.call(("open", results["output_file"]))
elif platform.system() == "Windows":
os.startfile(results["output_file"])
else: # Linux
subprocess.call(("xdg-open", results["output_file"]))
except Exception as e:
console.print(f"[bold red]Error opening file: {str(e)}[/bold red]")
console.print(f"File path: {results['output_file']}")
if __name__ == "__main__":
asyncio.run(cli())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment