"""
Minimal orchestrator implementation demonstrating two-phase execution pattern.
Uses the same LLM for both planning and execution phases.
"""
import json
import re
import datetime
import os
import time
from typing import Dict, Any, List, Optional
from openai import AsyncOpenAI
# Load environment variables (dotenv not needed for this test)
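# Illustrative Phase 1 plan for the demo query
# "Get the current time and multiply the minute number by the second number".
# Actual model output will vary in wording but must conform to plan_schema below:
# {
#   "understanding": "Get the current time, then multiply the minute by the second",
#   "steps": [
#     {"step_id": "step1", "tool": "get_current_time", "action": "Fetch the current time", "depends_on": []},
#     {"step_id": "step2", "tool": "calculator", "action": "Multiply minute by second", "depends_on": ["step1"]}
#   ]
# }
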
class MinimalOrchestrator:
    """Minimal orchestrator using two-phase pattern with same LLM."""

    def __init__(self):
        api_key = os.getenv("LLM_API_KEY") or os.getenv("INFERENCE_API_KEY")
        base_url = os.getenv("INFERENCE_API_URL", "https://api.lambdalabs.com/v1")
        if not api_key:
            raise ValueError("LLM_API_KEY or INFERENCE_API_KEY environment variable is required")
        self.client = AsyncOpenAI(
            api_key=api_key,
            base_url=base_url,
        )
        # self.model = "llama3.1-70b-instruct-fp8"  # Known to work with structured output
        self.model = "llama-4-maverick-17b-128e-instruct-fp8"  # Try a different model that's better at structured output

        # Available tools
        self.tools = {
            "get_current_time": {
                "type": "function",
                "function": {
                    "name": "get_current_time",
                    "description": "Get the current date and time",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "format": {
                                "type": "string",
                                "enum": ["iso", "human", "timestamp"],
                                "default": "human"
                            }
                        },
                        "required": []
                    }
                }
            },
            "calculator": {
                "type": "function",
                "function": {
                    "name": "calculator",
                    "description": "Perform basic mathematical calculations",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "operation": {
                                "type": "string",
                                "enum": ["add", "subtract", "multiply", "divide"]
                            },
                            "a": {"type": "number"},
                            "b": {"type": "number"}
                        },
                        "required": ["operation", "a", "b"]
                    }
                }
            }
        }

        # Planning schema with tool enum and required fields
        self.plan_schema = {
            "type": "object",
            "properties": {
                "understanding": {"type": "string"},
                "steps": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "step_id": {"type": "string"},
                            "tool": {"type": "string", "enum": ["get_current_time", "calculator"]},
                            "action": {"type": "string"},
                            "depends_on": {"type": "array", "items": {"type": "string"}}
                        },
                        "required": ["step_id", "tool", "action", "depends_on"]
                    }
                }
            },
            "required": ["understanding", "steps"]
        }
    async def execute_query(self, user_query: str) -> Dict[str, Any]:
        """Execute query using two-phase pattern."""
        print(f"🚀 Query: {user_query}")

        # Phase 1: Planning (structured output)
        print("\n📋 Phase 1: Planning")
        plan = await self._generate_plan(user_query)
        print(f"Plan: {plan['understanding']}")
        print(f"Steps: {len(plan['steps'])}")

        # Phase 2: Execution (specific tool calls)
        print("\n⚙️ Phase 2: Execution")
        context = {}
        for i, step in enumerate(plan['steps'], 1):
            print(f"\nStep {i}: {step['tool']}")
            result = await self._execute_step(step, context, user_query, plan['understanding'])
            if result:
                context[step['step_id']] = result
                print(f"✅ Result: {result}")
            else:
                print("❌ Failed")
        return context
    async def _generate_plan(self, user_query: str) -> Dict[str, Any]:
        """Phase 1: Generate execution plan using structured output."""
        # Build simple tool descriptions (avoid complexity that might confuse the LLM)
        tool_descriptions = []
        for tool_name, tool_schema in self.tools.items():
            func = tool_schema["function"]
            # Keep it simple: just name and description
            desc = f"- {func['name']}: {func['description']}"
            tool_descriptions.append(desc)
        tools_text = "\n".join(tool_descriptions)

        planning_system_prompt = f"""You are a task planner. Break down the user's request into steps using available tools.

Available tools:
{tools_text}

Return JSON with this structure:
{{
    "understanding": "brief description of what the user wants",
    "steps": [
        {{
            "step_id": "step1",
            "tool": "tool_name_from_available_tools",
            "action": "what this step accomplishes",
            "depends_on": []
        }}
    ]
}}

Use only the exact tool names listed above. Create as many steps as needed."""

        # print(f"🔍 Planning system prompt length: {len(planning_system_prompt)} chars")
        # print(f"🔍 User query: {user_query}")
        resp_fmt = {
            "type": "json_schema",
            "json_schema": {
                "name": "ExecutionPlan",
                "schema": self.plan_schema
            }
        }
        # resp_fmt = {"type": "json_schema", "json_schema": self.plan_schema}
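        # Note: support for the "json_schema" response_format differs between
        # OpenAI-compatible backends (some accept only {"type": "json_object"},
        # others expect extra fields such as "strict"). The shape used above is
        # an assumption that matched the backend used for these tests.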
        try:
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": planning_system_prompt},
                    {"role": "user", "content": user_query}
                ],
                response_format=resp_fmt,
                temperature=0.1,
                max_tokens=500,  # Keep the plan response small
                timeout=10.0  # Explicit request timeout
            )
            print("✅ Planning response received")
        except Exception as e:
            print(f"❌ Planning failed: {e}")
            raise

        # Debug output
        print(f"Raw response: {response}")
        raw_content = response.choices[0].message.content
        print(f"Raw content: {repr(raw_content)}")
        return json.loads(raw_content)
    async def _execute_step(
        self,
        step: Dict[str, Any],
        context: Dict[str, Any],
        original_query: str,
        plan_understanding: str,
    ) -> Optional[Dict[str, Any]]:
        """Phase 2: Execute a step using a specific tool_choice."""
        # Build context message with full query context
        context_msg = f"""Original query: {original_query}
Goal: {plan_understanding}
Current step: Use {step['tool']} tool
Step ID: {step['step_id']}"""

        # Add previous results if any
        if context:
            context_msg += "\n\nPrevious results:"
            for step_id, result in context.items():
                context_msg += f"\n{step_id}: {result}"

        # Call LLM with specific tool choice
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": f"Use the {step['tool']} tool to complete the requested action. Consider the original query and any previous results."},
                {"role": "user", "content": context_msg}
            ],
            tools=[self.tools[step['tool']]],
            tool_choice={
                "type": "function",
                "function": {"name": step['tool']}
            },
            temperature=0.1
        )

        # Process tool call
        if response.choices[0].message.tool_calls:
            tool_call = response.choices[0].message.tool_calls[0]
            params = json.loads(tool_call.function.arguments)
            # Simulate tool execution
            return self._simulate_tool(step['tool'], params)
        return None
    def _simulate_tool(self, tool_name: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """Simulate tool execution for demo."""
        if tool_name == "get_current_time":
            # Get actual current time
            now = datetime.datetime.now(datetime.timezone.utc)
            format_type = params.get("format", "human")
            if format_type == "iso":
                # isoformat() on an aware datetime already includes "+00:00"; swap it for "Z"
                formatted_time = now.isoformat().replace("+00:00", "Z")
            elif format_type == "timestamp":
                formatted_time = str(int(now.timestamp()))
            else:  # human
                formatted_time = now.strftime("%Y-%m-%d %H:%M:%S UTC")
            return {
                "current_time": formatted_time,
                "format": format_type,
                "minute": now.minute,
                "second": now.second,
                "hour": now.hour
            }
        elif tool_name == "calculator":
            a = params["a"]
            b = params["b"]
            op = params["operation"]
            if op == "multiply":
                result = a * b
            elif op == "add":
                result = a + b
            elif op == "subtract":
                result = a - b
            elif op == "divide":
                result = a / b if b != 0 else "Error: Division by zero"
            else:
                # Defensive default; the schema enum should prevent this
                result = f"Error: Unknown operation '{op}'"
            return {
                "operation": op,
                "a": a,
                "b": b,
                "result": result,
                "expression": f"{a} {op} {b} = {result}"
            }
        return {"error": f"Unknown tool: {tool_name}"}
class OrchestratorModelTester:
    """Test the minimal orchestrator across all available models."""

    def __init__(self, n_runs: int = 3):
        self.n_runs = n_runs
        self.test_query = "Get the current time and multiply the minute number by the second number"
        self.results = {}

        # Setup logging
        self.timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        self.log_file = f"orchestrator_test_{self.timestamp}.log"
        self.results_file = f"orchestrator_results_{self.timestamp}.json"

        # Create log file
        with open(self.log_file, 'w') as f:
            f.write(f"Orchestrator Testing Started: {datetime.datetime.now()}\n")
            f.write(f"Test Query: {self.test_query}\n")
            f.write(f"Runs per model: {self.n_runs}\n")
            f.write("=" * 80 + "\n\n")

        print(f"📝 Logging to: {self.log_file}")
        print(f"💾 Results will be saved to: {self.results_file}")
    async def get_available_models(self) -> List[str]:
        """Get list of available models from the API."""
        try:
            api_key = os.getenv("LLM_API_KEY") or os.getenv("INFERENCE_API_KEY")
            base_url = os.getenv("INFERENCE_API_URL", "https://api.lambdalabs.com/v1")
            client = AsyncOpenAI(api_key=api_key, base_url=base_url)
            models = await client.models.list()
            model_list = [model.id for model in models.data]
            print(f"📋 Found {len(model_list)} available models")
            return model_list
        except Exception as e:
            print(f"❌ Failed to fetch models: {e}")
            return []

    def _log(self, message: str) -> None:
        """Log message to both console and file."""
        print(message)
        with open(self.log_file, 'a') as f:
            f.write(f"{datetime.datetime.now().strftime('%H:%M:%S')} - {message}\n")
    async def test_single_model_run(self, model_id: str, run_number: int) -> Dict[str, Any]:
        """Test orchestrator with a single model for one run."""
        self._log(f"\n🔄 Testing {model_id} - Run {run_number}/{self.n_runs}")
        try:
            # Create orchestrator with specific model
            orchestrator = MinimalOrchestrator()
            orchestrator.model = model_id

            start_time = time.time()
            result = await orchestrator.execute_query(self.test_query)
            end_time = time.time()

            # Validate result structure and get detailed analysis
            validation = self._validate_result(result)
            return {
                "success": validation["success"],
                "execution_time": end_time - start_time,
                "result": result,
                "validation": validation,
                "error": None
            }
        except Exception as e:
            self._log(f"❌ Run {run_number} failed: {e}")
            return {
                "success": False,
                "execution_time": 0,
                "result": None,
                "validation": {
                    "success": False,
                    "steps_completed": 0,
                    "tools_called": [],
                    "validation_errors": [f"execution_error: {str(e)}"]
                },
                "error": str(e)
            }
    def _validate_result(self, result: Dict[str, Any]) -> Dict[str, Any]:
        """Validate that the orchestrator result is correct and return detailed analysis."""
        analysis = {
            "success": False,
            "steps_completed": 0,
            "tools_called": [],
            "step1_success": False,
            "step2_success": False,
            "calculation_correct": False,
            "expected_steps": 2,
            "validation_errors": []
        }
        try:
            # Check step1 (get_current_time)
            if "step1" in result:
                analysis["steps_completed"] += 1
                analysis["tools_called"].append("get_current_time")
                step1 = result["step1"]
                if all(key in step1 for key in ["minute", "second", "current_time"]):
                    analysis["step1_success"] = True
                else:
                    analysis["validation_errors"].append("step1 missing required fields")
            else:
                analysis["validation_errors"].append("step1 not found")

            # Check step2 (calculator)
            if "step2" in result:
                analysis["steps_completed"] += 1
                analysis["tools_called"].append("calculator")
                step2 = result["step2"]
                if all(key in step2 for key in ["operation", "a", "b", "result"]):
                    analysis["step2_success"] = True
                    # Verify the calculation is correct
                    if (step2["operation"] == "multiply" and
                            "step1" in result and
                            analysis["step1_success"]):
                        step1 = result["step1"]
                        expected_result = step1["minute"] * step1["second"]
                        if step2["result"] == expected_result:
                            analysis["calculation_correct"] = True
                        else:
                            analysis["validation_errors"].append(
                                f"calculation incorrect: {step2['result']} != {expected_result}"
                            )
                    else:
                        analysis["validation_errors"].append("operation not multiply or step1 failed")
                else:
                    analysis["validation_errors"].append("step2 missing required fields")
            else:
                analysis["validation_errors"].append("step2 not found")

            # Overall success
            analysis["success"] = (analysis["step1_success"] and
                                   analysis["step2_success"] and
                                   analysis["calculation_correct"])
            return analysis
        except Exception as e:
            analysis["validation_errors"].append(f"validation exception: {str(e)}")
            return analysis
    async def test_all_models(self) -> Dict[str, Dict[str, Any]]:
        """Test orchestrator across all available models."""
        self._log(f"🚀 Starting orchestrator testing across all models ({self.n_runs} runs each)")

        models = await self.get_available_models()
        if not models:
            self._log("❌ No models available for testing")
            return {}

        all_results = {}
        for model_id in models:
            self._log(f"\n{'='*60}")
            self._log(f"🧪 Testing Model: {model_id}")
            self._log(f"{'='*60}")

            model_results = {
                "runs": [],
                "success_count": 0,
                "total_runs": self.n_runs,
                "avg_execution_time": 0,
                "consistency_score": 0,
                "errors": [],
                "tool_calling_stats": {
                    "step1_success_count": 0,
                    "step2_success_count": 0,
                    "calculation_correct_count": 0,
                    "avg_steps_completed": 0,
                    "get_current_time_calls": 0,
                    "calculator_calls": 0,
                    "common_errors": {}
                }
            }

            # Run multiple tests for this model
            for run in range(1, self.n_runs + 1):
                run_result = await self.test_single_model_run(model_id, run)
                model_results["runs"].append(run_result)
                if run_result["success"]:
                    model_results["success_count"] += 1
                else:
                    model_results["errors"].append(run_result["error"])

                # Collect tool calling statistics
                validation = run_result.get("validation", {})
                stats = model_results["tool_calling_stats"]
                if validation.get("step1_success"):
                    stats["step1_success_count"] += 1
                if validation.get("step2_success"):
                    stats["step2_success_count"] += 1
                if validation.get("calculation_correct"):
                    stats["calculation_correct_count"] += 1
                stats["avg_steps_completed"] += validation.get("steps_completed", 0)

                # Count tool calls
                tools_called = validation.get("tools_called", [])
                stats["get_current_time_calls"] += tools_called.count("get_current_time")
                stats["calculator_calls"] += tools_called.count("calculator")

                # Track common errors
                for error in validation.get("validation_errors", []):
                    if error in stats["common_errors"]:
                        stats["common_errors"][error] += 1
                    else:
                        stats["common_errors"][error] = 1

            # Calculate statistics
            successful_runs = [r for r in model_results["runs"] if r["success"]]
            if successful_runs:
                model_results["avg_execution_time"] = sum(r["execution_time"] for r in successful_runs) / len(successful_runs)
            model_results["consistency_score"] = model_results["success_count"] / self.n_runs
            model_results["tool_calling_stats"]["avg_steps_completed"] /= self.n_runs

            # Print model summary
            self._print_model_summary(model_id, model_results)
            all_results[model_id] = model_results

            # Save intermediate results after each model
            self._save_intermediate_results(all_results)

        # Print overall summary
        self._print_overall_summary(all_results)

        # Save results to file
        self._save_results(all_results)
        return all_results
    def _save_results(self, all_results: Dict[str, Dict[str, Any]]) -> None:
        """Save test results to JSON file."""
        try:
            # Add metadata
            results_with_metadata = {
                "metadata": {
                    "timestamp": self.timestamp,
                    "test_query": self.test_query,
                    "runs_per_model": self.n_runs,
                    "total_models": len(all_results),
                    "test_duration": "completed"
                },
                "results": all_results
            }
            with open(self.results_file, 'w') as f:
                json.dump(results_with_metadata, f, indent=2, default=str)
            self._log(f"💾 Results saved to: {self.results_file}")
        except Exception as e:
            self._log(f"❌ Failed to save results: {e}")
    def _save_intermediate_results(self, all_results: Dict[str, Dict[str, Any]]) -> None:
        """Save intermediate results after each model (in case the test gets interrupted)."""
        try:
            intermediate_file = f"orchestrator_intermediate_{self.timestamp}.json"
            results_with_metadata = {
                "metadata": {
                    "timestamp": self.timestamp,
                    "test_query": self.test_query,
                    "runs_per_model": self.n_runs,
                    "models_completed": len(all_results),
                    "status": "in_progress"
                },
                "results": all_results
            }
            with open(intermediate_file, 'w') as f:
                json.dump(results_with_metadata, f, indent=2, default=str)
        except Exception:
            pass  # Don't fail the test if the intermediate save fails
    def _print_model_summary(self, model_id: str, results: Dict[str, Any]) -> None:
        """Print summary for a single model."""
        success_rate = results["consistency_score"] * 100
        avg_time = results["avg_execution_time"]
        stats = results["tool_calling_stats"]
        status = "✅" if success_rate == 100 else "⚠️" if success_rate > 0 else "❌"

        print(f"\n{status} {model_id}:")
        print(f"   Success Rate: {success_rate:.1f}% ({results['success_count']}/{results['total_runs']})")
        print(f"   Avg Time: {avg_time:.2f}s")
        print(f"   Avg Steps Completed: {stats['avg_steps_completed']:.1f}/2")

        # Tool calling breakdown
        print("   Tool Calling Breakdown:")
        print(f"      get_current_time: {stats['get_current_time_calls']}/{results['total_runs']} calls")
        print(f"      calculator: {stats['calculator_calls']}/{results['total_runs']} calls")
        print(f"      step1 success: {stats['step1_success_count']}/{results['total_runs']}")
        print(f"      step2 success: {stats['step2_success_count']}/{results['total_runs']}")
        print(f"      calculation correct: {stats['calculation_correct_count']}/{results['total_runs']}")

        # Show most common errors
        if stats["common_errors"]:
            print("   Most Common Issues:")
            sorted_errors = sorted(stats["common_errors"].items(), key=lambda x: x[1], reverse=True)
            for error, count in sorted_errors[:3]:  # Show top 3 errors
                print(f"      - {error}: {count} times")
    def _print_overall_summary(self, all_results: Dict[str, Dict[str, Any]]) -> None:
        """Print overall summary across all models."""
        print(f"\n{'='*80}")
        print("🎯 ORCHESTRATOR TESTING SUMMARY")
        print(f"{'='*80}")

        # Categorize models by performance
        perfect_models = []
        partial_models = []
        failed_models = []
        for model_id, results in all_results.items():
            consistency = results["consistency_score"]
            if consistency == 1.0:
                perfect_models.append(model_id)
            elif consistency > 0:
                partial_models.append(model_id)
            else:
                failed_models.append(model_id)

        print(f"\n✅ Perfect Orchestration (100% success): {len(perfect_models)} models")
        for model in perfect_models:
            avg_time = all_results[model]["avg_execution_time"]
            print(f"   - {model} (avg: {avg_time:.2f}s)")

        print(f"\n⚠️ Partial Orchestration (>0% success): {len(partial_models)} models")
        for model in partial_models:
            success_rate = all_results[model]["consistency_score"] * 100
            avg_time = all_results[model]["avg_execution_time"]
            print(f"   - {model} ({success_rate:.1f}% success, avg: {avg_time:.2f}s)")

        print(f"\n❌ Failed Orchestration (0% success): {len(failed_models)} models")
        for model in failed_models:
            print(f"   - {model}")

        # Overall statistics
        total_models = len(all_results)
        orchestration_capable = len(perfect_models) + len(partial_models)

        # Aggregate tool calling statistics
        total_runs = sum(r["total_runs"] for r in all_results.values())
        total_get_time_calls = sum(r["tool_calling_stats"]["get_current_time_calls"] for r in all_results.values())
        total_calc_calls = sum(r["tool_calling_stats"]["calculator_calls"] for r in all_results.values())
        total_step1_success = sum(r["tool_calling_stats"]["step1_success_count"] for r in all_results.values())
        total_step2_success = sum(r["tool_calling_stats"]["step2_success_count"] for r in all_results.values())

        print("\n📊 Overall Statistics:")
        print(f"   Total Models Tested: {total_models}")
        print(f"   Total Test Runs: {total_runs}")
        print(f"   Orchestration Capable: {orchestration_capable}/{total_models} ({orchestration_capable/total_models*100:.1f}%)")
        print(f"   Perfect Orchestration: {len(perfect_models)}/{total_models} ({len(perfect_models)/total_models*100:.1f}%)")

        print("\n🔧 Tool Calling Statistics Across All Models:")
        print(f"   get_current_time calls: {total_get_time_calls}/{total_runs} ({total_get_time_calls/total_runs*100:.1f}%)")
        print(f"   calculator calls: {total_calc_calls}/{total_runs} ({total_calc_calls/total_runs*100:.1f}%)")
        print(f"   Step 1 success rate: {total_step1_success}/{total_runs} ({total_step1_success/total_runs*100:.1f}%)")
        print(f"   Step 2 success rate: {total_step2_success}/{total_runs} ({total_step2_success/total_runs*100:.1f}%)")

        # Show models by tool calling capability
        print("\n🎯 Models by Tool Calling Capability:")
        for model_id, results in sorted(all_results.items(), key=lambda x: x[1]["consistency_score"], reverse=True):
            stats = results["tool_calling_stats"]
            get_time_rate = stats["get_current_time_calls"] / results["total_runs"] * 100
            calc_rate = stats["calculator_calls"] / results["total_runs"] * 100
            print(f"   {model_id}: get_time={get_time_rate:.0f}%, calc={calc_rate:.0f}%, success={results['consistency_score']*100:.0f}%")
# Test functions
async def test_minimal_orchestrator():
    """Test the minimal orchestrator with the time query."""
    orchestrator = MinimalOrchestrator()
    query = "Get the current time and multiply the minute number by the second number"
    result = await orchestrator.execute_query(query)
    print(f"\n🎯 Final Result: {result}")


async def test_all_models():
    """Test orchestrator across all available models."""
    tester = OrchestratorModelTester(n_runs=3)
    results = await tester.test_all_models()
    return results
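
# Usage (assumes LLM_API_KEY or INFERENCE_API_KEY is set, and optionally
# INFERENCE_API_URL; the file name below is illustrative):
#   python minimal_orchestrator.py            # single orchestrator test
#   python minimal_orchestrator.py test-all   # test every available model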
if __name__ == "__main__":
    import asyncio
    import sys

    if len(sys.argv) > 1 and sys.argv[1] == "test-all":
        print("🧪 Running comprehensive model testing...")
        asyncio.run(test_all_models())
    else:
        print("🧪 Running single orchestrator test...")
        asyncio.run(test_minimal_orchestrator())