""" | |
Minimal orchestrator implementation demonstrating two-phase execution pattern. | |
Uses the same LLM for both planning and execution phases. | |
""" | |
import json | |
import re | |
import datetime | |
import os | |
import time | |
from typing import Dict, Any, List | |
from openai import AsyncOpenAI | |
# Load environment variables (dotenv not needed for this test) | |


class MinimalOrchestrator:
    """Minimal orchestrator using the two-phase pattern with the same LLM."""

    def __init__(self):
        api_key = os.getenv("LLM_API_KEY") or os.getenv("INFERENCE_API_KEY")
        base_url = os.getenv("INFERENCE_API_URL", "https://api.lambdalabs.com/v1")
        if not api_key:
            raise ValueError("LLM_API_KEY or INFERENCE_API_KEY environment variable is required")
        self.client = AsyncOpenAI(
            api_key=api_key,
            base_url=base_url,
        )
        # self.model = "llama3.1-70b-instruct-fp8"  # Known to work with structured output
        self.model = "llama-4-maverick-17b-128e-instruct-fp8"  # Try a different model that's better at structured output

        # Available tools
        self.tools = {
            "get_current_time": {
                "type": "function",
                "function": {
                    "name": "get_current_time",
                    "description": "Get the current date and time",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "format": {
                                "type": "string",
                                "enum": ["iso", "human", "timestamp"],
                                "default": "human"
                            }
                        },
                        "required": []
                    }
                }
            },
            "calculator": {
                "type": "function",
                "function": {
                    "name": "calculator",
                    "description": "Perform basic mathematical calculations",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "operation": {
                                "type": "string",
                                "enum": ["add", "subtract", "multiply", "divide"]
                            },
                            "a": {"type": "number"},
                            "b": {"type": "number"}
                        },
                        "required": ["operation", "a", "b"]
                    }
                }
            }
        }

        # Planning schema with a tool enum and required fields
        self.plan_schema = {
            "type": "object",
            "properties": {
                "understanding": {"type": "string"},
                "steps": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "step_id": {"type": "string"},
                            "tool": {"type": "string", "enum": ["get_current_time", "calculator"]},
                            "action": {"type": "string"},
                            "depends_on": {"type": "array", "items": {"type": "string"}}
                        },
                        "required": ["step_id", "tool", "action", "depends_on"]
                    }
                }
            },
            "required": ["understanding", "steps"]
        }
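
    # Illustrative plan for the test query "Get the current time and multiply the
    # minute number by the second number". This is an example of what Phase 1 is
    # expected to return under the schema above, not captured model output:
    # {
    #     "understanding": "Get the current time, then multiply the minute by the second",
    #     "steps": [
    #         {"step_id": "step1", "tool": "get_current_time", "action": "Fetch the current time", "depends_on": []},
    #         {"step_id": "step2", "tool": "calculator", "action": "Multiply minute by second", "depends_on": ["step1"]}
    #     ]
    # }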

    async def execute_query(self, user_query: str) -> Dict[str, Any]:
        """Execute a query using the two-phase pattern."""
        print(f"🚀 Query: {user_query}")

        # Phase 1: Planning (structured output)
        print("\n📋 Phase 1: Planning")
        plan = await self._generate_plan(user_query)
        print(f"Plan: {plan['understanding']}")
        print(f"Steps: {len(plan['steps'])}")

        # Phase 2: Execution (specific tool calls)
        print("\n⚙️ Phase 2: Execution")
        context = {}
        for i, step in enumerate(plan['steps'], 1):
            print(f"\nStep {i}: {step['tool']}")
            result = await self._execute_step(step, context, user_query, plan['understanding'])
            if result:
                context[step['step_id']] = result
                print(f"✅ Result: {result}")
            else:
                print("❌ Failed")
        return context

    async def _generate_plan(self, user_query: str) -> Dict[str, Any]:
        """Phase 1: Generate an execution plan using structured output."""
        # Build simple tool descriptions (avoid complexity that might confuse the LLM)
        tool_descriptions = []
        for tool_name, tool_schema in self.tools.items():
            func = tool_schema["function"]
            # Keep it simple: just name and description
            desc = f"- {func['name']}: {func['description']}"
            tool_descriptions.append(desc)
        tools_text = "\n".join(tool_descriptions)

        planning_system_prompt = f"""You are a task planner. Break down the user's request into steps using available tools.

Available tools:
{tools_text}

Return JSON with this structure:
{{
    "understanding": "brief description of what the user wants",
    "steps": [
        {{
            "step_id": "step1",
            "tool": "tool_name_from_available_tools",
            "action": "what this step accomplishes",
            "depends_on": []
        }}
    ]
}}

Use only the exact tool names listed above. Create as many steps as needed."""

        # print(f"🔍 Planning system prompt length: {len(planning_system_prompt)} chars")
        # print(f"🔍 User query: {user_query}")

        resp_fmt = {
            "type": "json_schema",
            "json_schema": {
                "name": "ExecutionPlan",
                "schema": self.plan_schema
            }
        }
        # resp_fmt = {"type": "json_schema", "json_schema": self.plan_schema}

        try:
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": planning_system_prompt},
                    {"role": "user", "content": user_query}
                ],
                response_format=resp_fmt,
                temperature=0.1,
                max_tokens=500,  # Keep the plan small
                timeout=10.0     # Explicit timeout
            )
            print("✅ Planning response received")
        except Exception as e:
            print(f"❌ Planning failed: {e}")
            raise

        # Debug output
        print(f"Raw response: {response}")
        raw_content = response.choices[0].message.content
        print(f"Raw content: {repr(raw_content)}")
        return json.loads(raw_content)

    async def _execute_step(self, step: Dict[str, Any], context: Dict[str, Any], original_query: str, plan_understanding: str) -> Optional[Dict[str, Any]]:
        """Phase 2: Execute a step using a specific tool_choice."""
        # Build a context message carrying the full query context
        context_msg = f"""Original query: {original_query}
Goal: {plan_understanding}
Current step: Use {step['tool']} tool
Step ID: {step['step_id']}"""

        # Add previous results, if any
        if context:
            context_msg += "\n\nPrevious results:"
            for step_id, result in context.items():
                context_msg += f"\n{step_id}: {result}"

        # Call the LLM with a specific tool choice
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": f"Use the {step['tool']} tool to complete the requested action. Consider the original query and any previous results."},
                {"role": "user", "content": context_msg}
            ],
            tools=[self.tools[step['tool']]],
            tool_choice={
                "type": "function",
                "function": {"name": step['tool']}
            },
            temperature=0.1
        )

        # Process the tool call
        if response.choices[0].message.tool_calls:
            tool_call = response.choices[0].message.tool_calls[0]
            params = json.loads(tool_call.function.arguments)
            # Simulate tool execution
            return self._simulate_tool(step['tool'], params)
        return None

    def _simulate_tool(self, tool_name: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """Simulate tool execution for the demo."""
        if tool_name == "get_current_time":
            # Get the actual current time
            now = datetime.datetime.now(datetime.timezone.utc)
            format_type = params.get("format", "human")
            if format_type == "iso":
                # now is timezone-aware, so isoformat() already carries the UTC offset
                formatted_time = now.isoformat()
            elif format_type == "timestamp":
                formatted_time = str(int(now.timestamp()))
            else:  # human
                formatted_time = now.strftime("%Y-%m-%d %H:%M:%S UTC")
            return {
                "current_time": formatted_time,
                "format": format_type,
                "minute": now.minute,
                "second": now.second,
                "hour": now.hour
            }
        elif tool_name == "calculator":
            a = params["a"]
            b = params["b"]
            op = params["operation"]
            if op == "multiply":
                result = a * b
            elif op == "add":
                result = a + b
            elif op == "subtract":
                result = a - b
            elif op == "divide":
                result = a / b if b != 0 else "Error: Division by zero"
            else:
                result = f"Error: Unknown operation: {op}"
            return {
                "operation": op,
                "a": a,
                "b": b,
                "result": result,
                "expression": f"{a} {op} {b} = {result}"
            }
        return {"error": f"Unknown tool: {tool_name}"}


class OrchestratorModelTester:
    """Test the minimal orchestrator across all available models."""

    def __init__(self, n_runs: int = 3):
        self.n_runs = n_runs
        self.test_query = "Get the current time and multiply the minute number by the second number"
        self.results = {}

        # Set up logging
        self.timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        self.log_file = f"orchestrator_test_{self.timestamp}.log"
        self.results_file = f"orchestrator_results_{self.timestamp}.json"

        # Create the log file
        with open(self.log_file, 'w') as f:
            f.write(f"Orchestrator Testing Started: {datetime.datetime.now()}\n")
            f.write(f"Test Query: {self.test_query}\n")
            f.write(f"Runs per model: {self.n_runs}\n")
            f.write("=" * 80 + "\n\n")

        print(f"📝 Logging to: {self.log_file}")
        print(f"💾 Results will be saved to: {self.results_file}")

    async def get_available_models(self) -> List[str]:
        """Get the list of available models from the API."""
        try:
            api_key = os.getenv("LLM_API_KEY") or os.getenv("INFERENCE_API_KEY")
            base_url = os.getenv("INFERENCE_API_URL", "https://api.lambdalabs.com/v1")
            client = AsyncOpenAI(api_key=api_key, base_url=base_url)
            models = await client.models.list()
            model_list = [model.id for model in models.data]
            print(f"📋 Found {len(model_list)} available models")
            return model_list
        except Exception as e:
            print(f"❌ Failed to fetch models: {e}")
            return []

    def _log(self, message: str) -> None:
        """Log a message to both the console and the log file."""
        print(message)
        with open(self.log_file, 'a') as f:
            f.write(f"{datetime.datetime.now().strftime('%H:%M:%S')} - {message}\n")

    async def test_single_model_run(self, model_id: str, run_number: int) -> Dict[str, Any]:
        """Test the orchestrator with a single model for one run."""
        self._log(f"\n🔄 Testing {model_id} - Run {run_number}/{self.n_runs}")
        try:
            # Create an orchestrator with the specific model
            orchestrator = MinimalOrchestrator()
            orchestrator.model = model_id

            start_time = time.time()
            result = await orchestrator.execute_query(self.test_query)
            end_time = time.time()

            # Validate the result structure and get a detailed analysis
            validation = self._validate_result(result)
            return {
                "success": validation["success"],
                "execution_time": end_time - start_time,
                "result": result,
                "validation": validation,
                "error": None
            }
        except Exception as e:
            self._log(f"❌ Run {run_number} failed: {e}")
            return {
                "success": False,
                "execution_time": 0,
                "result": None,
                "validation": {
                    "success": False,
                    "steps_completed": 0,
                    "tools_called": [],
                    "validation_errors": [f"execution_error: {str(e)}"]
                },
                "error": str(e)
            }

    def _validate_result(self, result: Dict[str, Any]) -> Dict[str, Any]:
        """Validate that the orchestrator result is correct and return a detailed analysis."""
        analysis = {
            "success": False,
            "steps_completed": 0,
            "tools_called": [],
            "step1_success": False,
            "step2_success": False,
            "calculation_correct": False,
            "expected_steps": 2,
            "validation_errors": []
        }
        try:
            # Check step1 (get_current_time)
            if "step1" in result:
                analysis["steps_completed"] += 1
                analysis["tools_called"].append("get_current_time")
                step1 = result["step1"]
                if all(key in step1 for key in ["minute", "second", "current_time"]):
                    analysis["step1_success"] = True
                else:
                    analysis["validation_errors"].append("step1 missing required fields")
            else:
                analysis["validation_errors"].append("step1 not found")

            # Check step2 (calculator)
            if "step2" in result:
                analysis["steps_completed"] += 1
                analysis["tools_called"].append("calculator")
                step2 = result["step2"]
                if all(key in step2 for key in ["operation", "a", "b", "result"]):
                    analysis["step2_success"] = True
                    # Verify that the calculation is correct
                    if (step2["operation"] == "multiply" and
                            "step1" in result and
                            analysis["step1_success"]):
                        step1 = result["step1"]
                        expected_result = step1["minute"] * step1["second"]
                        if step2["result"] == expected_result:
                            analysis["calculation_correct"] = True
                        else:
                            analysis["validation_errors"].append(
                                f"calculation incorrect: {step2['result']} != {expected_result}"
                            )
                    else:
                        analysis["validation_errors"].append("operation not multiply or step1 failed")
                else:
                    analysis["validation_errors"].append("step2 missing required fields")
            else:
                analysis["validation_errors"].append("step2 not found")

            # Overall success
            analysis["success"] = (analysis["step1_success"] and
                                   analysis["step2_success"] and
                                   analysis["calculation_correct"])
            return analysis
        except Exception as e:
            analysis["validation_errors"].append(f"validation exception: {str(e)}")
            return analysis

    async def test_all_models(self) -> Dict[str, Dict[str, Any]]:
        """Test the orchestrator across all available models."""
        self._log(f"🚀 Starting orchestrator testing across all models ({self.n_runs} runs each)")

        models = await self.get_available_models()
        if not models:
            self._log("❌ No models available for testing")
            return {}

        all_results = {}
        for model_id in models:
            self._log(f"\n{'='*60}")
            self._log(f"🧪 Testing Model: {model_id}")
            self._log(f"{'='*60}")

            model_results = {
                "runs": [],
                "success_count": 0,
                "total_runs": self.n_runs,
                "avg_execution_time": 0,
                "consistency_score": 0,
                "errors": [],
                "tool_calling_stats": {
                    "step1_success_count": 0,
                    "step2_success_count": 0,
                    "calculation_correct_count": 0,
                    "avg_steps_completed": 0,
                    "get_current_time_calls": 0,
                    "calculator_calls": 0,
                    "common_errors": {}
                }
            }

            # Run multiple tests for this model
            for run in range(1, self.n_runs + 1):
                run_result = await self.test_single_model_run(model_id, run)
                model_results["runs"].append(run_result)
                if run_result["success"]:
                    model_results["success_count"] += 1
                else:
                    model_results["errors"].append(run_result["error"])

                # Collect tool calling statistics
                validation = run_result.get("validation", {})
                stats = model_results["tool_calling_stats"]
                if validation.get("step1_success"):
                    stats["step1_success_count"] += 1
                if validation.get("step2_success"):
                    stats["step2_success_count"] += 1
                if validation.get("calculation_correct"):
                    stats["calculation_correct_count"] += 1
                stats["avg_steps_completed"] += validation.get("steps_completed", 0)

                # Count tool calls
                tools_called = validation.get("tools_called", [])
                stats["get_current_time_calls"] += tools_called.count("get_current_time")
                stats["calculator_calls"] += tools_called.count("calculator")

                # Track common errors
                for error in validation.get("validation_errors", []):
                    if error in stats["common_errors"]:
                        stats["common_errors"][error] += 1
                    else:
                        stats["common_errors"][error] = 1

            # Calculate statistics
            successful_runs = [r for r in model_results["runs"] if r["success"]]
            if successful_runs:
                model_results["avg_execution_time"] = sum(r["execution_time"] for r in successful_runs) / len(successful_runs)
            model_results["consistency_score"] = model_results["success_count"] / self.n_runs
            model_results["tool_calling_stats"]["avg_steps_completed"] /= self.n_runs

            # Print the model summary
            self._print_model_summary(model_id, model_results)
            all_results[model_id] = model_results

            # Save intermediate results after each model
            self._save_intermediate_results(all_results)

        # Print the overall summary
        self._print_overall_summary(all_results)

        # Save results to file
        self._save_results(all_results)
        return all_results

    def _save_results(self, all_results: Dict[str, Dict[str, Any]]) -> None:
        """Save test results to a JSON file."""
        try:
            # Add metadata
            results_with_metadata = {
                "metadata": {
                    "timestamp": self.timestamp,
                    "test_query": self.test_query,
                    "runs_per_model": self.n_runs,
                    "total_models": len(all_results),
                    "test_duration": "completed"
                },
                "results": all_results
            }
            with open(self.results_file, 'w') as f:
                json.dump(results_with_metadata, f, indent=2, default=str)
            self._log(f"💾 Results saved to: {self.results_file}")
        except Exception as e:
            self._log(f"❌ Failed to save results: {e}")

    def _save_intermediate_results(self, all_results: Dict[str, Dict[str, Any]]) -> None:
        """Save intermediate results after each model (in case the test gets interrupted)."""
        try:
            intermediate_file = f"orchestrator_intermediate_{self.timestamp}.json"
            results_with_metadata = {
                "metadata": {
                    "timestamp": self.timestamp,
                    "test_query": self.test_query,
                    "runs_per_model": self.n_runs,
                    "models_completed": len(all_results),
                    "status": "in_progress"
                },
                "results": all_results
            }
            with open(intermediate_file, 'w') as f:
                json.dump(results_with_metadata, f, indent=2, default=str)
        except Exception:
            pass  # Don't fail the test if the intermediate save fails

    def _print_model_summary(self, model_id: str, results: Dict[str, Any]) -> None:
        """Print the summary for a single model."""
        success_rate = results["consistency_score"] * 100
        avg_time = results["avg_execution_time"]
        stats = results["tool_calling_stats"]
        status = "✅" if success_rate == 100 else "⚠️" if success_rate > 0 else "❌"

        print(f"\n{status} {model_id}:")
        print(f"  Success Rate: {success_rate:.1f}% ({results['success_count']}/{results['total_runs']})")
        print(f"  Avg Time: {avg_time:.2f}s")
        print(f"  Avg Steps Completed: {stats['avg_steps_completed']:.1f}/2")

        # Tool calling breakdown
        print("  Tool Calling Breakdown:")
        print(f"    get_current_time: {stats['get_current_time_calls']}/{results['total_runs']} calls")
        print(f"    calculator: {stats['calculator_calls']}/{results['total_runs']} calls")
        print(f"    step1 success: {stats['step1_success_count']}/{results['total_runs']}")
        print(f"    step2 success: {stats['step2_success_count']}/{results['total_runs']}")
        print(f"    calculation correct: {stats['calculation_correct_count']}/{results['total_runs']}")

        # Show the most common errors
        if stats["common_errors"]:
            print("  Most Common Issues:")
            sorted_errors = sorted(stats["common_errors"].items(), key=lambda x: x[1], reverse=True)
            for error, count in sorted_errors[:3]:  # Show the top 3 errors
                print(f"    - {error}: {count} times")

    def _print_overall_summary(self, all_results: Dict[str, Dict[str, Any]]) -> None:
        """Print the overall summary across all models."""
        print(f"\n{'='*80}")
        print("🎯 ORCHESTRATOR TESTING SUMMARY")
        print(f"{'='*80}")

        # Categorize models by performance
        perfect_models = []
        partial_models = []
        failed_models = []
        for model_id, results in all_results.items():
            consistency = results["consistency_score"]
            if consistency == 1.0:
                perfect_models.append(model_id)
            elif consistency > 0:
                partial_models.append(model_id)
            else:
                failed_models.append(model_id)

        print(f"\n✅ Perfect Orchestration (100% success): {len(perfect_models)} models")
        for model in perfect_models:
            avg_time = all_results[model]["avg_execution_time"]
            print(f"  - {model} (avg: {avg_time:.2f}s)")

        print(f"\n⚠️ Partial Orchestration (>0% success): {len(partial_models)} models")
        for model in partial_models:
            success_rate = all_results[model]["consistency_score"] * 100
            avg_time = all_results[model]["avg_execution_time"]
            print(f"  - {model} ({success_rate:.1f}% success, avg: {avg_time:.2f}s)")

        print(f"\n❌ Failed Orchestration (0% success): {len(failed_models)} models")
        for model in failed_models:
            print(f"  - {model}")

        # Overall statistics
        total_models = len(all_results)
        orchestration_capable = len(perfect_models) + len(partial_models)

        # Aggregate tool calling statistics
        total_runs = sum(r["total_runs"] for r in all_results.values())
        total_get_time_calls = sum(r["tool_calling_stats"]["get_current_time_calls"] for r in all_results.values())
        total_calc_calls = sum(r["tool_calling_stats"]["calculator_calls"] for r in all_results.values())
        total_step1_success = sum(r["tool_calling_stats"]["step1_success_count"] for r in all_results.values())
        total_step2_success = sum(r["tool_calling_stats"]["step2_success_count"] for r in all_results.values())

        print("\n📊 Overall Statistics:")
        print(f"  Total Models Tested: {total_models}")
        print(f"  Total Test Runs: {total_runs}")
        print(f"  Orchestration Capable: {orchestration_capable}/{total_models} ({orchestration_capable/total_models*100:.1f}%)")
        print(f"  Perfect Orchestration: {len(perfect_models)}/{total_models} ({len(perfect_models)/total_models*100:.1f}%)")

        print("\n🔧 Tool Calling Statistics Across All Models:")
        print(f"  get_current_time calls: {total_get_time_calls}/{total_runs} ({total_get_time_calls/total_runs*100:.1f}%)")
        print(f"  calculator calls: {total_calc_calls}/{total_runs} ({total_calc_calls/total_runs*100:.1f}%)")
        print(f"  Step 1 success rate: {total_step1_success}/{total_runs} ({total_step1_success/total_runs*100:.1f}%)")
        print(f"  Step 2 success rate: {total_step2_success}/{total_runs} ({total_step2_success/total_runs*100:.1f}%)")

        # Show models by tool calling capability
        print("\n🎯 Models by Tool Calling Capability:")
        for model_id, results in sorted(all_results.items(), key=lambda x: x[1]["consistency_score"], reverse=True):
            stats = results["tool_calling_stats"]
            get_time_rate = stats["get_current_time_calls"] / results["total_runs"] * 100
            calc_rate = stats["calculator_calls"] / results["total_runs"] * 100
            print(f"  {model_id}: get_time={get_time_rate:.0f}%, calc={calc_rate:.0f}%, success={results['consistency_score']*100:.0f}%")


# Test functions
async def test_minimal_orchestrator():
    """Test the minimal orchestrator with the time query."""
    orchestrator = MinimalOrchestrator()
    query = "Get the current time and multiply the minute number by the second number"
    result = await orchestrator.execute_query(query)
    print(f"\n🎯 Final Result: {result}")


async def test_all_models():
    """Test the orchestrator across all available models."""
    tester = OrchestratorModelTester(n_runs=3)
    results = await tester.test_all_models()
    return results


if __name__ == "__main__":
    import asyncio
    import sys

    if len(sys.argv) > 1 and sys.argv[1] == "test-all":
        print("🧪 Running comprehensive model testing...")
        asyncio.run(test_all_models())
    else:
        print("🧪 Running single orchestrator test...")
        asyncio.run(test_minimal_orchestrator())
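
# Example usage (a sketch; the script filename and API key value below are placeholders,
# and the endpoint is assumed to be the OpenAI-compatible URL the code defaults to):
#
#   export LLM_API_KEY="..."                                  # or INFERENCE_API_KEY
#   export INFERENCE_API_URL="https://api.lambdalabs.com/v1"  # default if unset
#
#   python minimal_orchestrator.py            # single orchestrator run with the default model
#   python minimal_orchestrator.py test-all   # test every available model, 3 runs each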