|
#!/usr/bin/env -S uv run --script |
|
# /// script |
|
# requires-python = ">=3.9" |
|
# dependencies = [ |
|
# "typer[all]", |
|
# "rich", |
|
# "redis", |
|
# "loguru", |
|
# "python-dotenv", |
|
# "psutil", |
|
# ] |
|
# /// |
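# The block above is PEP 723 inline script metadata; `uv run` reads it to resolve
# and provide the declared dependencies before executing the script.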
|
|
|
""" |
|
Claude Dispatcher - Concurrent task dispatcher for Claude using tmux and Redis. |
|
|
|
This script demonstrates a robust concurrent task dispatcher that manages multiple |
|
Claude processes in isolated tmux sessions, using Redis for atomic slot management |
|
and self-healing job execution. |
|
|
|
Key features: |
|
- Atomic slot claiming using Redis SETNX |
|
- Self-healing jobs that clean up after themselves |
|
- Real-time monitoring of job status |
|
- Graceful handling of failures and timeouts |
|
- Force cleanup for stuck slots |
|
|
|
AGENT VERIFICATION INSTRUCTIONS: |
|
- Run this script with no arguments to execute working_usage() |
|
- The working_usage() function MUST complete successfully |
|
- This verifies the dispatcher can manage concurrent Claude tasks |
|
- DO NOT assume the script works without running it |
|
- Check that all assertions pass in working_usage() |
|
|
|
Third-party Documentation: |
|
- Redis Commands: https://redis.io/commands/ |
|
- tmux Manual: https://man7.org/linux/man-pages/man1/tmux.1.html |
|
- Claude CLI: https://docs.anthropic.com/en/docs/claude-cli |
|
|
|
Example Usage: |
|
# Basic setup and run |
|
uv run ./claude_dispatcher.py # Run working examples |
|
uv run ./claude_dispatcher.py validate # Check prerequisites |
|
uv run ./claude_dispatcher.py setup # Create tmux slots |
|
uv run ./claude_dispatcher.py run # Dispatch jobs |
|
uv run ./claude_dispatcher.py status # Monitor status |
|
|
|
Expected Results: |
|
    - validate: All 7 prerequisite checks pass
|
- setup: Creates 5 tmux sessions (claude_slot_1 through claude_slot_5) |
|
    - run: Dispatches 6 jobs, with the first 5 starting immediately
|
- status: Shows slot status with job information |
|
""" |
|
|
|
from __future__ import annotations |
|
|
|
import asyncio |
|
import os |
|
import shlex |
|
import signal |
|
import subprocess |
|
import sys |
|
import time |
|
import psutil |
|
from datetime import datetime, timedelta |
|
from pathlib import Path |
|
from typing import List, Optional, Dict, Any, Tuple |
|
|
|
# Third-party imports |
|
import typer |
|
from rich.console import Console |
|
from rich.table import Table |
|
from rich.panel import Panel |
|
from rich.prompt import Prompt, Confirm |
|
import redis |
|
from loguru import logger |
|
from dotenv import load_dotenv, find_dotenv |
|
|
|
# Configure logging |
|
logger.remove() # Remove default handler |
|
logger.add( |
|
sys.stderr, |
|
level="INFO", |
|
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>" |
|
) |
|
|
|
# Load environment variables |
|
load_dotenv(find_dotenv()) |
|
|
|
console = Console() |
|
|
|
# --- Configuration --- |
|
PREFIX = "claude_slot" |
|
DEFAULT_SLOTS = 5 |
|
DEFAULT_TIMEOUT = 600 |
|
DEFAULT_CMD = "claude" |
|
REDIS_HOST = os.getenv("REDIS_HOST", "localhost") |
|
REDIS_PORT = int(os.getenv("REDIS_PORT", "6379")) |
|
|
|
# Resource limits |
|
MAX_MEMORY_PERCENT = float(os.getenv("MAX_MEMORY_PERCENT", "80")) |
|
MAX_CPU_PERCENT = float(os.getenv("MAX_CPU_PERCENT", "80")) |
|
MIN_FREE_MEMORY_MB = int(os.getenv("MIN_FREE_MEMORY_MB", "500")) |
|
|
|
# Redis key TTLs (in seconds) |
|
SLOT_KEY_TTL = int(os.getenv("SLOT_KEY_TTL", "3600")) # 1 hour |
|
JOB_INFO_TTL = int(os.getenv("JOB_INFO_TTL", "7200")) # 2 hours |
|
|
|
# Health check intervals |
|
HEALTH_CHECK_INTERVAL = int(os.getenv("HEALTH_CHECK_INTERVAL", "60")) # 1 minute |
|
STUCK_SLOT_THRESHOLD = int(os.getenv("STUCK_SLOT_THRESHOLD", "1800")) # 30 minutes |
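# Illustrative .env overrides (all optional; the values below are hypothetical):
#   REDIS_HOST=127.0.0.1
#   REDIS_PORT=6379
#   MAX_MEMORY_PERCENT=75
#   STUCK_SLOT_THRESHOLD=900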
|
|
|
app = typer.Typer( |
|
no_args_is_help=True, |
|
help="A concurrent task dispatcher for Claude using tmux and Redis.", |
|
) |
|
|
|
# --- Global Client with Connection Retry --- |
|
class RedisClient: |
|
"""Redis client with automatic retry.""" |
|
|
|
def __init__(self): |
|
self._client = None |
|
self._connect() |
|
|
|
def _connect(self): |
|
"""Create Redis connection with retry logic.""" |
|
max_retries = 3 |
|
backoff = 0.5 |
|
|
|
for attempt in range(max_retries): |
|
try: |
|
self._client = redis.Redis( |
|
host=REDIS_HOST, |
|
port=REDIS_PORT, |
|
decode_responses=True, |
|
socket_connect_timeout=5, |
|
socket_timeout=5, |
|
) |
|
self._client.ping() |
|
logger.success(f"Connected to Redis at {REDIS_HOST}:{REDIS_PORT}") |
|
return |
|
except redis.RedisError as e: |
|
if attempt == max_retries - 1: |
|
console.print(f"[bold red]✗ Fatal:[/bold red] Could not connect to Redis after {max_retries} attempts. {e}") |
|
raise typer.Exit(code=1) |
|
logger.warning(f"Redis connection attempt {attempt + 1} failed, retrying in {backoff}s...") |
|
time.sleep(backoff) |
|
backoff *= 2 |
|
|
|
def execute_with_retry(self, func, *args, **kwargs): |
|
"""Execute Redis command with automatic retry on connection failure.""" |
|
max_retries = 3 |
|
backoff = 0.1 |
|
|
|
for attempt in range(max_retries): |
|
try: |
|
return func(*args, **kwargs) |
|
except (redis.ConnectionError, redis.TimeoutError) as e: |
|
if attempt == max_retries - 1: |
|
logger.error(f"Redis operation failed after {max_retries} attempts: {e}") |
|
raise |
|
logger.warning(f"Redis operation failed, retrying in {backoff}s (attempt {attempt + 1}/{max_retries})") |
|
time.sleep(backoff) |
|
backoff *= 2 |
|
# Try to reconnect |
|
try: |
|
self._connect() |
|
                except Exception:
|
pass |
|
|
|
def __getattr__(self, name): |
|
"""Proxy all Redis commands through retry logic.""" |
|
client_attr = getattr(self._client, name) |
|
if callable(client_attr): |
|
return lambda *args, **kwargs: self.execute_with_retry(client_attr, *args, **kwargs) |
|
return client_attr |
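# Any attribute access on the wrapper (e.g. redis_client.set("k", "v")) is resolved
# by __getattr__ and, when callable, routed through execute_with_retry() above.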
|
|
|
# Initialize Redis client |
|
try: |
|
redis_client = RedisClient() |
|
except Exception as e: |
|
console.print(f"[bold red]✗ Fatal:[/bold red] Could not initialize Redis client. {e}") |
|
sys.exit(1) |
|
|
|
# --- Interrupt Handler --- |
|
_shutdown_event = asyncio.Event() |
|
|
|
def signal_handler(sig, frame): |
|
"""Handle interrupt signals gracefully.""" |
|
logger.warning(f"Received signal {sig}, initiating graceful shutdown...") |
|
_shutdown_event.set() |
|
|
|
# Register signal handlers |
|
signal.signal(signal.SIGINT, signal_handler) |
|
signal.signal(signal.SIGTERM, signal_handler) |
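# With these handlers installed, Ctrl-C / SIGTERM set _shutdown_event instead of
# raising KeyboardInterrupt, letting the _dispatch_one_job() loops exit gracefully.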
|
|
|
# ------------------------------------------------------------------ # |
|
# Core Helpers |
|
# ------------------------------------------------------------------ # |
|
def _slot_key(n: int) -> str: |
|
"""Returns the Redis key for a given slot.""" |
|
return f"{PREFIX}:{n}" |
|
|
|
def _session_name(n: int) -> str: |
|
"""Returns the tmux session name for a given slot.""" |
|
return f"{PREFIX}_{n}" |
|
|
|
def _pane(n: int) -> str: |
|
"""Returns the tmux pane identifier for a given slot.""" |
|
return _session_name(n) |
|
|
|
def _job_info_key(job_id: int) -> str: |
|
"""Returns the Redis key for job information.""" |
|
return f"{PREFIX}:job:{job_id}" |
|
|
|
async def _store_job_info(job_id: int, prompt: str, slot: int) -> None: |
|
"""Store job information in Redis with TTL.""" |
|
info = { |
|
"prompt": prompt[:100] + "..." if len(prompt) > 100 else prompt, |
|
"slot": slot, |
|
"start_time": datetime.now().isoformat(), |
|
"status": "running" |
|
} |
|
redis_client.hset(_job_info_key(job_id), mapping=info) |
|
redis_client.expire(_job_info_key(job_id), JOB_INFO_TTL) |
|
|
|
async def _get_slot_status(n: int) -> Dict[str, Any]: |
|
"""Get the current status of a slot with health info.""" |
|
slot_key = _slot_key(n) |
|
value = redis_client.get(slot_key) |
|
|
|
status = {"slot": n, "status": "idle", "job_id": None, "job_info": None, "health": "unknown"} |
|
|
|
if value: |
|
status["status"] = "busy" |
|
if value.startswith("busy:job_"): |
|
job_id = int(value.split("_")[1]) |
|
status["job_id"] = job_id |
|
job_info = redis_client.hgetall(_job_info_key(job_id)) |
|
if job_info: |
|
status["job_info"] = job_info |
|
# Check if slot is stuck |
|
start_time = datetime.fromisoformat(job_info.get("start_time", datetime.now().isoformat())) |
|
duration = (datetime.now() - start_time).total_seconds() |
|
if duration > STUCK_SLOT_THRESHOLD: |
|
status["health"] = "stuck" |
|
else: |
|
status["health"] = "healthy" |
|
|
|
# Check if tmux session is actually running |
|
try: |
|
proc = await asyncio.create_subprocess_exec( |
|
"tmux", "has-session", "-t", _session_name(n), |
|
stdout=asyncio.subprocess.DEVNULL, |
|
stderr=asyncio.subprocess.DEVNULL |
|
) |
|
tmux_exists = await proc.wait() == 0 |
|
|
|
if not tmux_exists and status["status"] == "busy": |
|
status["health"] = "orphaned" |
|
|
|
# Get last output if session exists |
|
if tmux_exists: |
|
proc = await asyncio.create_subprocess_exec( |
|
"tmux", "capture-pane", "-t", _pane(n), "-p", |
|
stdout=asyncio.subprocess.PIPE, |
|
stderr=asyncio.subprocess.DEVNULL |
|
) |
|
stdout, _ = await proc.communicate() |
|
if stdout: |
|
last_line = stdout.decode().strip().split('\n')[-1] if stdout else "" |
|
status["last_output"] = last_line[:80] + "..." if len(last_line) > 80 else last_line |
|
    except Exception:
|
status["last_output"] = "N/A" |
|
|
|
return status |
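# Illustrative return value for a busy, healthy slot (values are hypothetical; note
# that Redis hash fields come back as strings because decode_responses=True):
#   {"slot": 2, "status": "busy", "job_id": 7,
#    "job_info": {"prompt": "...", "slot": "2", "start_time": "...", "status": "running"},
#    "health": "healthy", "last_output": "..."}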
|
|
|
def check_system_resources() -> Tuple[bool, str]: |
|
"""Check if system has sufficient resources.""" |
|
try: |
|
# Check memory |
|
memory = psutil.virtual_memory() |
|
memory_percent = memory.percent |
|
free_memory_mb = memory.available / (1024 * 1024) |
|
|
|
if memory_percent > MAX_MEMORY_PERCENT: |
|
return False, f"Memory usage too high: {memory_percent:.1f}% (limit: {MAX_MEMORY_PERCENT}%)" |
|
|
|
if free_memory_mb < MIN_FREE_MEMORY_MB: |
|
return False, f"Insufficient free memory: {free_memory_mb:.0f}MB (minimum: {MIN_FREE_MEMORY_MB}MB)" |
|
|
|
# Check CPU |
|
cpu_percent = psutil.cpu_percent(interval=0.1) |
|
if cpu_percent > MAX_CPU_PERCENT: |
|
return False, f"CPU usage too high: {cpu_percent:.1f}% (limit: {MAX_CPU_PERCENT}%)" |
|
|
|
return True, f"Resources OK - Memory: {memory_percent:.1f}%, CPU: {cpu_percent:.1f}%" |
|
|
|
except Exception as e: |
|
logger.error(f"Error checking system resources: {e}") |
|
return True, "Resource check failed, proceeding anyway" |
|
|
|
def cleanup_zombie_processes(): |
|
"""Clean up any zombie processes.""" |
|
try: |
|
zombies = [] |
|
for proc in psutil.process_iter(['pid', 'name', 'status']): |
|
if proc.info['status'] == psutil.STATUS_ZOMBIE: |
|
zombies.append(proc.info['pid']) |
|
|
|
if zombies: |
|
logger.warning(f"Found {len(zombies)} zombie processes, attempting cleanup") |
|
for pid in zombies: |
|
try: |
|
os.waitpid(pid, os.WNOHANG) |
|
                    except OSError:
|
pass |
|
|
|
except Exception as e: |
|
logger.error(f"Error cleaning up zombie processes: {e}") |
|
|
|
async def _try_claim_and_launch( |
|
n: int, job_id: int, prompt: str, timeout: int, claude_bin: str |
|
) -> bool: |
|
"""Atomically tries to claim a slot in Redis and launch a self-cleaning job.""" |
|
# Check resources before claiming |
|
resources_ok, resource_msg = check_system_resources() |
|
if not resources_ok: |
|
logger.warning(f"Resource check failed for job {job_id}: {resource_msg}") |
|
return False |
|
|
|
slot_key = _slot_key(n) |
|
# Use SET with NX and EX for atomic claim with TTL |
|
if not redis_client.set(slot_key, f"busy:job_{job_id}", nx=True, ex=SLOT_KEY_TTL): |
|
return False # Slot was already claimed. |
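    # The claim above is the Redis equivalent of (illustrative values, slot 3 / job 7):
    #   SET claude_slot:3 "busy:job_7" NX EX 3600
    # NX makes the claim atomic, so only one dispatcher coroutine can win a given slot.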
|
|
|
try: |
|
logger.info(f"Job #{job_id} claiming slot {n}") |
|
await _store_job_info(job_id, prompt, n) |
|
|
|
        # Ensure the slot and job keys are cleaned up even if the job is interrupted.
|
claude_cmd = f"{claude_bin} -p {shlex.quote(prompt)} --output-format stream-json" |
|
cleanup_cmd = f"redis-cli -h {REDIS_HOST} -p {REDIS_PORT} DEL {slot_key} {_job_info_key(job_id)}" |
|
|
|
        # Use a trap so cleanup runs on EXIT, INT, TERM, and run everything in a
        # subshell (not `exec`, and not an inner `bash -c '...'`) so the pane's shell
        # stays alive and the shlex-quoted prompt is parsed correctly by that shell.
        full_cmd = (
            f"(trap '{cleanup_cmd}' EXIT INT TERM; "
            f"timeout {timeout} {claude_cmd}; "
            f"exit_code=$?; {cleanup_cmd}; exit $exit_code)"
        )
|
|
|
proc = await asyncio.create_subprocess_exec( |
|
"tmux", "send-keys", "-t", _pane(n), full_cmd, "Enter", |
|
stderr=asyncio.subprocess.PIPE, |
|
) |
|
# We don't need to read output, just ensure the send-keys command succeeded. |
|
await proc.wait() |
|
|
|
if proc.returncode != 0: |
|
stderr = (await proc.stderr.read()).decode().strip() |
|
console.print(f"[red]✗ Launch Error ({_pane(n)}):[/red] {stderr}") |
|
logger.error(f"tmux send-keys failed for slot {n}: {stderr}") |
|
raise RuntimeError(f"tmux send-keys failed: {stderr}") |
|
|
|
console.print(f"[green]✓[/green] Job #{job_id} launched into slot {n}.") |
|
logger.success(f"Job #{job_id} successfully launched in slot {n}") |
|
return True |
|
except Exception as e: |
|
console.print(f"[bold red]✗ Critical launch failure in slot {n}:[/bold red] {e}") |
|
logger.error(f"Critical failure launching job {job_id} in slot {n}: {e}") |
|
redis_client.delete(slot_key) # Clean up the claim on failure. |
|
redis_client.delete(_job_info_key(job_id)) |
|
return False |
|
|
|
async def _dispatch_one_job( |
|
job_id: int, prompt: str, timeout: int, claude_bin: str, max_slots: int |
|
) -> None: |
|
"""A worker that polls for an idle slot and launches a single job.""" |
|
console.print(f"[dim]Job #{job_id} ('{prompt[:30]}...') waiting for slot...[/dim]") |
|
logger.info(f"Job #{job_id} starting dispatch") |
|
backoff = 0.25 |
|
attempts = 0 |
|
while not _shutdown_event.is_set(): |
|
attempts += 1 |
|
|
|
# Clean up zombies periodically |
|
if attempts % 20 == 0: |
|
cleanup_zombie_processes() |
|
|
|
for slot_num in range(1, max_slots + 1): |
|
if await _try_claim_and_launch(slot_num, job_id, prompt, timeout, claude_bin): |
|
return |
|
|
|
# Check for shutdown before sleeping |
|
if _shutdown_event.is_set(): |
|
logger.info(f"Job #{job_id} cancelled due to shutdown") |
|
return |
|
|
|
try: |
|
await asyncio.wait_for( |
|
_shutdown_event.wait(), |
|
timeout=backoff |
|
) |
|
# If we reach here, shutdown was requested |
|
logger.info(f"Job #{job_id} cancelled due to shutdown") |
|
return |
|
except asyncio.TimeoutError: |
|
# Normal timeout, continue polling |
|
pass |
|
|
|
backoff = min(backoff * 1.5, 5.0) # Exponential backoff up to 5s. |
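        # The polling delay grows roughly 0.25s -> 0.38s -> 0.56s -> 0.84s ..., capped at 5s.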
|
if attempts % 10 == 0: |
|
logger.debug(f"Job #{job_id} still waiting after {attempts} attempts") |
|
|
|
# ------------------------------------------------------------------ # |
|
# Validation Helpers |
|
# ------------------------------------------------------------------ # |
|
async def _validate_dependencies(max_slots: int, claude_bin: str): |
|
"""Runs all pre-flight checks concurrently.""" |
|
console.print("[dim]Running pre-flight checks...[/dim]") |
|
logger.info("Starting dependency validation") |
|
|
|
async def _check_tmux_session(n: int) -> bool: |
|
proc = await asyncio.create_subprocess_exec("tmux", "has-session", "-t", _session_name(n)) |
|
return await proc.wait() == 0 |
|
|
|
async def _check_claude_bin() -> bool: |
|
try: |
|
# First check if the binary exists and is executable |
|
proc = await asyncio.create_subprocess_exec( |
|
"which", claude_bin, |
|
stdout=asyncio.subprocess.PIPE, |
|
stderr=asyncio.subprocess.DEVNULL |
|
) |
|
await proc.wait() |
|
if proc.returncode != 0: |
|
return False |
|
|
|
# Then try to run it with --version |
|
proc = await asyncio.create_subprocess_exec( |
|
claude_bin, "--version", |
|
stdout=asyncio.subprocess.PIPE, |
|
stderr=asyncio.subprocess.PIPE |
|
) |
|
await proc.wait() |
|
return proc.returncode == 0 |
|
except FileNotFoundError: |
|
return False |
|
|
|
claude_ok_task = asyncio.create_task(_check_claude_bin()) |
|
tmux_ok_tasks = [_check_tmux_session(n) for n in range(1, max_slots + 1)] |
|
|
|
results = await asyncio.gather(claude_ok_task, *tmux_ok_tasks) |
|
claude_ok, *tmux_sessions_ok = results |
|
|
|
if not claude_ok: |
|
console.print(f"[red]✗ Prerequisite failed:[/red] `{claude_bin}` not found or not executable.") |
|
console.print("[yellow]Hint:[/yellow] Make sure 'claude' is installed and in your PATH, or use --claude to specify the path") |
|
raise typer.Exit(code=1) |
|
|
|
missing_sessions = [] |
|
for n, exists in enumerate(tmux_sessions_ok, 1): |
|
if not exists: |
|
missing_sessions.append(n) |
|
|
|
if missing_sessions: |
|
console.print(f"[red]✗ Missing tmux sessions:[/red] {', '.join(f'{PREFIX}_{n}' for n in missing_sessions)}") |
|
console.print("[yellow]Hint:[/yellow] Run `uv run ./claude_dispatcher.py setup` first") |
|
raise typer.Exit(code=1) |
|
|
|
console.print("[green]✓[/green] All checks passed.") |
|
logger.success("All dependencies validated successfully") |
|
|
|
# ------------------------------------------------------------------ # |
|
# Health Command |
|
# ------------------------------------------------------------------ # |
|
@app.command() |
|
def health( |
|
slots: int = typer.Option(DEFAULT_SLOTS, "--slots", "-s", min=1, help="Number of slots to check."), |
|
): |
|
"""Run health checks and show system status.""" |
|
async def _run_health_check(): |
|
console.print("\n[bold cyan]System Health Check[/bold cyan]\n") |
|
|
|
# Check Redis |
|
try: |
|
redis_client.ping() |
|
console.print("Redis: ✅ Connected") |
|
except Exception as e: |
|
console.print(f"Redis: ❌ Error - {e}") |
|
|
|
# Check resources |
|
resources_ok, resource_msg = check_system_resources() |
|
status_emoji = "✅" if resources_ok else "❌" |
|
console.print(f"Resources: {status_emoji} {resource_msg}") |
|
|
|
# Check tmux sessions |
|
healthy_sessions = 0 |
|
for n in range(1, slots + 1): |
|
proc = await asyncio.create_subprocess_exec( |
|
"tmux", "has-session", "-t", _session_name(n), |
|
stdout=asyncio.subprocess.DEVNULL, |
|
stderr=asyncio.subprocess.DEVNULL |
|
) |
|
if await proc.wait() == 0: |
|
healthy_sessions += 1 |
|
|
|
console.print(f"Tmux Sessions: {healthy_sessions}/{slots} active") |
|
|
|
# Check slots |
|
table = Table(title="Slot Health Status") |
|
table.add_column("Slot", style="cyan") |
|
table.add_column("Status", style="green") |
|
table.add_column("Health", style="yellow") |
|
table.add_column("Job ID", style="blue") |
|
table.add_column("Duration", style="magenta") |
|
|
|
stuck_count = 0 |
|
orphaned_count = 0 |
|
|
|
for n in range(1, slots + 1): |
|
status = await _get_slot_status(n) |
|
|
|
if status["health"] == "stuck": |
|
stuck_count += 1 |
|
elif status["health"] == "orphaned": |
|
orphaned_count += 1 |
|
|
|
health_emoji = { |
|
"healthy": "✅", |
|
"stuck": "🔴", |
|
"orphaned": "⚠️", |
|
"unknown": "❓" |
|
}.get(status["health"], "❓") |
|
|
|
duration = "-" |
|
if status.get("job_info"): |
|
start_time = datetime.fromisoformat(status["job_info"].get("start_time", datetime.now().isoformat())) |
|
duration = f"{(datetime.now() - start_time).total_seconds():.0f}s" |
|
|
|
table.add_row( |
|
str(n), |
|
status["status"], |
|
f"{health_emoji} {status['health']}", |
|
str(status.get("job_id", "-")), |
|
duration |
|
) |
|
|
|
console.print(table) |
|
|
|
if stuck_count > 0: |
|
console.print(f"\n⚠️ [yellow]Warning: {stuck_count} stuck slots detected[/yellow]") |
|
console.print("Run 'cleanup --force' to recover stuck slots") |
|
|
|
if orphaned_count > 0: |
|
console.print(f"\n⚠️ [yellow]Warning: {orphaned_count} orphaned slots detected[/yellow]") |
|
|
|
asyncio.run(_run_health_check()) |
|
|
|
# ------------------------------------------------------------------ # |
|
# Validation Command |
|
# ------------------------------------------------------------------ # |
|
@app.command() |
|
def validate(): |
|
"""Validate all prerequisites are installed and configured correctly.""" |
|
console.print("\n[bold cyan]Claude Dispatcher Setup Validation[/bold cyan]\n") |
|
|
|
test_results = [] |
|
|
|
def add_result(component: str, test: str, passed: bool, message: str = ""): |
|
"""Add a test result.""" |
|
test_results.append({ |
|
"component": component, |
|
"test": test, |
|
"passed": passed, |
|
"message": message |
|
}) |
|
|
|
status = "[green]✓ PASS[/green]" if passed else "[red]✗ FAIL[/red]" |
|
console.print(f"{status} {component}: {test} {message}") |
|
|
|
# Python version |
|
version = sys.version_info |
|
passed = version >= (3, 9) |
|
message = f"(Python {version.major}.{version.minor}.{version.micro})" |
|
add_result("Python", "Version >= 3.9", passed, message) |
|
|
|
# Required packages |
|
try: |
|
import psutil |
|
add_result("psutil", "Installation", True, f"(v{psutil.__version__})") |
|
except ImportError: |
|
add_result("psutil", "Installation", False, "(Not installed)") |
|
|
|
# uv installed |
|
try: |
|
result = subprocess.run(["uv", "--version"], capture_output=True, text=True) |
|
passed = result.returncode == 0 |
|
version_str = result.stdout.strip() if passed else "Not found" |
|
add_result("uv", "Installation", passed, f"({version_str})") |
|
except FileNotFoundError: |
|
add_result("uv", "Installation", False, "(Not found in PATH)") |
|
|
|
# tmux installed |
|
try: |
|
result = subprocess.run(["tmux", "-V"], capture_output=True, text=True) |
|
passed = result.returncode == 0 |
|
version_str = result.stdout.strip() if passed else "Not found" |
|
add_result("tmux", "Installation", passed, f"({version_str})") |
|
except FileNotFoundError: |
|
add_result("tmux", "Installation", False, "(Not found in PATH)") |
|
|
|
# Redis server |
|
try: |
|
r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True) |
|
r.ping() |
|
|
|
# Test basic operations |
|
test_key = "claude_dispatcher_test" |
|
r.set(test_key, "test_value") |
|
value = r.get(test_key) |
|
r.delete(test_key) |
|
|
|
passed = value == "test_value" |
|
add_result("Redis", "Server connection", passed, f"({REDIS_HOST}:{REDIS_PORT})") |
|
except redis.RedisError as e: |
|
add_result("Redis", "Server connection", False, f"({str(e)})") |
|
|
|
# redis-cli |
|
try: |
|
result = subprocess.run(["redis-cli", "--version"], capture_output=True, text=True) |
|
passed = result.returncode == 0 |
|
version_str = result.stdout.strip().split()[1] if passed else "Not found" |
|
add_result("redis-cli", "Installation", passed, f"(v{version_str})") |
|
except FileNotFoundError: |
|
add_result("redis-cli", "Installation", False, "(Not found in PATH)") |
|
|
|
# claude binary |
|
try: |
|
result = subprocess.run(["which", "claude"], capture_output=True, text=True) |
|
found = result.returncode == 0 |
|
|
|
if not found: |
|
add_result("claude", "Binary in PATH", False, "(Not found)") |
|
else: |
|
result = subprocess.run(["claude", "--version"], capture_output=True, text=True) |
|
if result.returncode == 0: |
|
version_str = result.stdout.strip() |
|
add_result("claude", "Binary executable", True, f"({version_str})") |
|
else: |
|
error = result.stderr.strip() |
|
add_result("claude", "Binary executable", False, f"(Error: {error[:50]}...)") |
|
except FileNotFoundError: |
|
add_result("claude", "Binary in PATH", False, "(which command not found)") |
|
|
|
# Summary table |
|
console.print("\n[bold]Test Summary[/bold]") |
|
table = Table() |
|
table.add_column("Component", style="cyan") |
|
table.add_column("Test", style="white") |
|
table.add_column("Result", style="white") |
|
table.add_column("Details", style="dim") |
|
|
|
passed_count = 0 |
|
total_count = len(test_results) |
|
|
|
for result in test_results: |
|
status = "[green]PASS[/green]" if result["passed"] else "[red]FAIL[/red]" |
|
table.add_row( |
|
result["component"], |
|
result["test"], |
|
status, |
|
result["message"] |
|
) |
|
if result["passed"]: |
|
passed_count += 1 |
|
|
|
console.print(table) |
|
|
|
# Overall result |
|
console.print(f"\n[bold]Overall: {passed_count}/{total_count} tests passed[/bold]") |
|
|
|
if passed_count == total_count: |
|
console.print("\n[bold green]✓ All tests passed! System is ready.[/bold green]") |
|
console.print("\nNext steps:") |
|
console.print("1. Run: [cyan]uv run ./claude_dispatcher.py setup[/cyan]") |
|
console.print("2. Run: [cyan]uv run ./claude_dispatcher.py run[/cyan]") |
|
else: |
|
console.print("\n[bold red]✗ Some tests failed. Please fix the issues above.[/bold red]") |
|
|
|
# Provide hints for common issues |
|
console.print("\n[yellow]Common fixes:[/yellow]") |
|
if not any(r["component"] == "Redis" and r["passed"] for r in test_results): |
|
console.print("- Start Redis: [cyan]redis-server[/cyan]") |
|
if not any(r["component"] == "uv" and r["passed"] for r in test_results): |
|
console.print("- Install uv: [cyan]curl -LsSf https://astral.sh/uv/install.sh | sh[/cyan]") |
|
if not any(r["component"] == "claude" and r["passed"] for r in test_results): |
|
console.print("- Install Claude CLI or update PATH") |
|
|
|
raise typer.Exit(code=1) |
|
|
|
# ------------------------------------------------------------------ # |
|
# CLI Commands |
|
# ------------------------------------------------------------------ # |
|
@app.command() |
|
def setup( |
|
slots: int = typer.Option(DEFAULT_SLOTS, "--slots", "-s", min=1, help="Number of tmux slots to create."), |
|
): |
|
"""Creates (or verifies) tmux sessions and cleans their Redis state.""" |
|
async def _async_setup(): |
|
console.print(f"Setting up {slots} tmux slots...") |
|
logger.info(f"Setting up {slots} tmux slots") |
|
|
|
created = 0 |
|
existing = 0 |
|
|
|
for n in range(1, slots + 1): |
|
session = _session_name(n) |
|
pane = _pane(n) |
|
# Use `has-session` to make setup idempotent and non-destructive. |
|
proc = await asyncio.create_subprocess_exec( |
|
"tmux", "has-session", "-t", session, |
|
stdout=asyncio.subprocess.DEVNULL, |
|
stderr=asyncio.subprocess.DEVNULL |
|
) |
|
if await proc.wait() != 0: |
|
proc = await asyncio.create_subprocess_exec( |
|
"tmux", "new-session", "-d", "-s", session, |
|
stdout=asyncio.subprocess.DEVNULL, |
|
stderr=asyncio.subprocess.PIPE |
|
) |
|
if await proc.wait() != 0: |
|
if proc.stderr: |
|
stderr = await proc.stderr.read() |
|
console.print(f"[red]✗ Failed to create session {session}: {stderr.decode()}") |
|
continue |
|
|
|
# Don't use exec to keep the shell alive |
|
proc = await asyncio.create_subprocess_exec( |
|
"tmux", "send-keys", "-t", session, "echo 'Slot ready'", "Enter", |
|
stdout=asyncio.subprocess.DEVNULL, |
|
stderr=asyncio.subprocess.PIPE |
|
) |
|
if await proc.wait() != 0: |
|
if proc.stderr: |
|
stderr = await proc.stderr.read() |
|
console.print(f"[red]✗ Failed to initialize session {session}: {stderr.decode()}") |
|
continue |
|
|
|
console.print(f"[green]✓[/green] Session `{session}` created.") |
|
created += 1 |
|
else: |
|
console.print(f"[yellow]→[/yellow] Session `{session}` already exists.") |
|
existing += 1 |
|
# Always reset the Redis key to ensure a clean state. |
|
redis_client.delete(_slot_key(n)) |
|
|
|
console.print(f"[bold green]✓ Setup complete.[/bold green] Created: {created}, Existing: {existing}, All slots are idle.") |
|
logger.success(f"Setup complete: {created} created, {existing} existing") |
|
|
|
asyncio.run(_async_setup()) |
|
|
|
@app.command() |
|
def status( |
|
slots: int = typer.Option(DEFAULT_SLOTS, "--slots", "-s", min=1, help="Number of slots to check."), |
|
watch: bool = typer.Option(False, "--watch", "-w", help="Continuously watch status."), |
|
): |
|
"""Show the current status of all slots.""" |
|
async def _show_status(): |
|
while True: |
|
table = Table(title=f"Claude Dispatcher Status - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") |
|
table.add_column("Slot", style="cyan") |
|
table.add_column("Status", style="green") |
|
table.add_column("Job ID", style="yellow") |
|
table.add_column("Prompt", style="blue") |
|
table.add_column("Start Time", style="magenta") |
|
table.add_column("Last Output", style="dim") |
|
|
|
statuses = await asyncio.gather(*[_get_slot_status(n) for n in range(1, slots + 1)]) |
|
|
|
for status in statuses: |
|
slot_num = status["slot"] |
|
state = status["status"] |
|
job_id = status.get("job_id", "-") |
|
last_output = status.get("last_output", "") |
|
|
|
if status.get("job_info"): |
|
info = status["job_info"] |
|
prompt = info.get("prompt", "N/A") |
|
start_time = info.get("start_time", "N/A") |
|
if start_time != "N/A": |
|
start_dt = datetime.fromisoformat(start_time) |
|
start_time = start_dt.strftime("%H:%M:%S") |
|
else: |
|
prompt = "-" |
|
start_time = "-" |
|
|
|
status_emoji = "🟢" if state == "idle" else "🔴" |
|
table.add_row( |
|
f"{slot_num}", |
|
f"{status_emoji} {state}", |
|
str(job_id) if job_id else "-", |
|
prompt, |
|
start_time, |
|
last_output |
|
) |
|
|
|
console.clear() |
|
console.print(table) |
|
|
|
if not watch: |
|
break |
|
|
|
await asyncio.sleep(2) |
|
|
|
asyncio.run(_show_status()) |
|
|
|
@app.command() |
|
def cleanup( |
|
slots: int = typer.Option(DEFAULT_SLOTS, "--slots", "-s", min=1, help="Number of slots to clean."), |
|
force: bool = typer.Option(False, "--force", "-f", help="Force cleanup without confirmation."), |
|
kill_stuck: bool = typer.Option(False, "--kill-stuck", help="Kill stuck tmux sessions."), |
|
): |
|
"""Force cleanup of all Redis locks and handle stuck/orphaned slots.""" |
|
async def _async_cleanup(): |
|
if not force: |
|
confirm = typer.confirm("This will clear all slot locks. Continue?") |
|
if not confirm: |
|
console.print("[yellow]Cleanup cancelled.[/yellow]") |
|
return |
|
|
|
console.print(f"Cleaning up {slots} slots...") |
|
cleared = 0 |
|
stuck_killed = 0 |
|
orphaned_cleaned = 0 |
|
|
|
for n in range(1, slots + 1): |
|
slot_key = _slot_key(n) |
|
status = await _get_slot_status(n) |
|
|
|
# Handle stuck slots |
|
if kill_stuck and status["health"] == "stuck": |
|
logger.info(f"Killing stuck tmux session for slot {n}") |
|
subprocess.run( |
|
["tmux", "kill-pane", "-t", _pane(n)], |
|
capture_output=True |
|
) |
|
stuck_killed += 1 |
|
|
|
# Handle orphaned slots |
|
if status["health"] == "orphaned": |
|
logger.info(f"Cleaning orphaned slot {n}") |
|
orphaned_cleaned += 1 |
|
|
|
if redis_client.exists(slot_key): |
|
value = redis_client.get(slot_key) |
|
redis_client.delete(slot_key) |
|
console.print(f"[green]✓[/green] Cleared slot {n} (was: {value})") |
|
cleared += 1 |
|
|
|
# Also clear associated job info |
|
if value and value.startswith("busy:job_"): |
|
job_id = int(value.split("_")[1]) |
|
redis_client.delete(_job_info_key(job_id)) |
|
else: |
|
console.print(f"[dim]Slot {n} was already idle[/dim]") |
|
|
|
# Clean up all old job info keys |
|
pattern = f"{PREFIX}:job:*" |
|
job_keys = list(redis_client.scan_iter(match=pattern)) |
|
if job_keys: |
|
redis_client.delete(*job_keys) |
|
console.print(f"[green]✓[/green] Cleaned up {len(job_keys)} job info keys") |
|
|
|
console.print(f"[bold green]✓ Cleanup complete.[/bold green]") |
|
console.print(f" Cleared: {cleared} slots") |
|
if kill_stuck: |
|
console.print(f" Killed: {stuck_killed} stuck sessions") |
|
if orphaned_cleaned > 0: |
|
console.print(f" Cleaned: {orphaned_cleaned} orphaned slots") |
|
|
|
logger.info(f"Cleanup complete: cleared {cleared} slots, killed {stuck_killed} stuck, cleaned {orphaned_cleaned} orphaned") |
|
|
|
asyncio.run(_async_cleanup()) |
|
|
|
@app.command() |
|
def run( |
|
prompts: Optional[List[str]] = typer.Argument(None, help="Prompts to dispatch. Uses built-ins if omitted."), |
|
timeout: int = typer.Option(DEFAULT_TIMEOUT, "--timeout", "-t", help="Timeout in seconds for each job."), |
|
claude_bin: str = typer.Option(DEFAULT_CMD, "--claude", help="Path to the Claude binary."), |
|
max_slots: int = typer.Option(DEFAULT_SLOTS, "--slots", "-s", min=1, help="Number of slots to use."), |
|
): |
|
"""Dispatches prompts into the tmux slot pool concurrently.""" |
|
# This wrapper is the key fix for using async functions in a Typer command. |
|
async def _async_run(): |
|
await _validate_dependencies(max_slots, claude_bin) |
|
|
|
prompt_list = prompts |
|
if not prompt_list: |
|
console.print("[dim]Using built-in example prompts...[/dim]") |
|
prompt_list = [ |
|
"What is 2 + 2?", |
|
"Explain asyncio vs threading in Python.", |
|
"Write a minimal async TCP echo server with tests and CI.", |
|
"Convert 42 °C to °F.", |
|
"Summarize 'Dune' in 150 words.", |
|
"Create a Flask API with JWT, SQLAlchemy, Docker, and OpenAPI.", |
|
] |
|
|
|
logger.info(f"Dispatching {len(prompt_list)} jobs across {max_slots} slots") |
|
start_time = datetime.now() |
|
|
|
tasks = [ |
|
asyncio.create_task(_dispatch_one_job(i, p, timeout, claude_bin, max_slots)) |
|
for i, p in enumerate(prompt_list, 1) |
|
] |
|
|
|
try: |
|
# Wait for all tasks with interrupt handling |
|
done, pending = await asyncio.wait( |
|
tasks, |
|
return_when=asyncio.FIRST_EXCEPTION |
|
) |
|
|
|
# Check if any task raised an exception |
|
for task in done: |
|
if task.exception(): |
|
raise task.exception() |
|
|
|
# If shutdown was requested, cancel pending tasks |
|
if _shutdown_event.is_set(): |
|
console.print("\n[yellow]Gracefully cancelling pending jobs...[/yellow]") |
|
for task in pending: |
|
task.cancel() |
|
await asyncio.gather(*pending, return_exceptions=True) |
|
console.print("[yellow]Shutdown complete.[/yellow]") |
|
return |
|
|
|
# Wait for remaining tasks |
|
if pending: |
|
await asyncio.gather(*pending) |
|
|
|
except KeyboardInterrupt: |
|
console.print("\n[yellow]Received interrupt, shutting down gracefully...[/yellow]") |
|
_shutdown_event.set() |
|
|
|
# Cancel all pending tasks |
|
for task in tasks: |
|
if not task.done(): |
|
task.cancel() |
|
|
|
# Wait for cancellation to complete |
|
await asyncio.gather(*tasks, return_exceptions=True) |
|
console.print("[yellow]Shutdown complete.[/yellow]") |
|
return |
|
|
|
elapsed = (datetime.now() - start_time).total_seconds() |
|
console.print(f"[bold green]✓ All jobs have been dispatched.[/bold green] Total time: {elapsed:.2f}s") |
|
logger.success(f"All {len(prompt_list)} jobs dispatched in {elapsed:.2f}s") |
|
|
|
try: |
|
asyncio.run(_async_run()) |
|
except KeyboardInterrupt: |
|
console.print("\n[yellow]Interrupted by user.[/yellow]") |
|
sys.exit(0) |
|
|
|
# ------------------------------------------------------------------ # |
|
# Demo Commands |
|
# ------------------------------------------------------------------ # |
|
def print_section(title: str, content: str = None): |
|
"""Print a formatted section.""" |
|
if content: |
|
console.print(Panel(content, title=f"[bold cyan]{title}[/bold cyan]", border_style="cyan")) |
|
else: |
|
console.print(f"\n[bold cyan]{'='*60}[/bold cyan]") |
|
console.print(f"[bold cyan]{title}[/bold cyan]") |
|
console.print(f"[bold cyan]{'='*60}[/bold cyan]\n") |
|
|
|
async def run_demo_command(cmd: str, title: str = None): |
|
"""Run a command and show output.""" |
|
if title: |
|
console.print(f"\n[yellow]→ {title}[/yellow]") |
|
console.print(f"[dim]$ {cmd}[/dim]") |
|
|
|
proc = await asyncio.create_subprocess_shell( |
|
cmd, |
|
stdout=asyncio.subprocess.PIPE, |
|
stderr=asyncio.subprocess.STDOUT |
|
) |
|
|
|
while True: |
|
line = await proc.stdout.readline() |
|
if not line: |
|
break |
|
console.print(line.decode().rstrip()) |
|
|
|
await proc.wait() |
|
return proc.returncode == 0 |
|
|
|
@app.command() |
|
def demo(): |
|
"""Run interactive demo scenarios to showcase the dispatcher.""" |
|
print_section("Claude Dispatcher Interactive Demo") |
|
|
|
scenarios = { |
|
"1": ("Basic Setup and Run", demo_basic), |
|
"2": ("Job Queueing (More Jobs than Slots)", demo_queueing), |
|
"3": ("Cleanup and Recovery", demo_recovery), |
|
"4": ("Advanced Configuration", demo_advanced), |
|
} |
|
|
|
while True: |
|
console.print("\n[bold]Available Demos:[/bold]") |
|
for key, (name, _) in scenarios.items(): |
|
console.print(f"{key}. {name}") |
|
console.print("5. Exit") |
|
|
|
choice = Prompt.ask("\nSelect a demo", choices=["1", "2", "3", "4", "5"]) |
|
|
|
if choice == "5": |
|
console.print("\n[green]Thanks for trying the Claude Dispatcher demo![/green]") |
|
break |
|
|
|
if choice in scenarios: |
|
name, func = scenarios[choice] |
|
print_section(f"Scenario {choice}: {name}") |
|
func() |
|
|
|
if not Confirm.ask("\nWould you like to try another demo?"): |
|
break |
|
|
|
def demo_basic(): |
|
"""Basic Demo: Setup and Simple Run""" |
|
async def run(): |
|
print_section("Step 1: Validate Prerequisites", |
|
"First, let's make sure everything is installed correctly.") |
|
|
|
if not await run_demo_command("uv run ./claude_dispatcher.py validate", "Running setup validation"): |
|
console.print("[red]Prerequisites check failed! Please fix issues before continuing.[/red]") |
|
return |
|
|
|
input("\nPress Enter to continue...") |
|
|
|
# Setup slots |
|
print_section("Step 2: Create Tmux Slots", |
|
"Now we'll create 3 tmux sessions that will act as our worker slots.") |
|
|
|
await run_demo_command("uv run ./claude_dispatcher.py setup --slots 3", "Creating 3 slots") |
|
|
|
input("\nPress Enter to continue...") |
|
|
|
# Check status |
|
print_section("Step 3: Check Status", |
|
"Let's see the status of our slots - they should all be idle.") |
|
|
|
await run_demo_command("uv run ./claude_dispatcher.py status --slots 3", "Checking slot status") |
|
|
|
input("\nPress Enter to continue...") |
|
|
|
# Run simple jobs |
|
print_section("Step 4: Run Simple Jobs", |
|
"Now we'll dispatch 3 simple jobs - one for each slot.") |
|
|
|
jobs = [ |
|
'"What is 2 + 2?"', |
|
'"List 3 colors"', |
|
'"Say hello"' |
|
] |
|
|
|
await run_demo_command( |
|
f"uv run ./claude_dispatcher.py run --slots 3 --timeout 30 {' '.join(jobs)}", |
|
"Dispatching 3 simple jobs" |
|
) |
|
|
|
console.print("\n[green]✓ Basic demo completed![/green]") |
|
console.print("\nYou can check the tmux sessions with: [cyan]tmux ls[/cyan]") |
|
console.print("Attach to a session with: [cyan]tmux attach -t claude_slot_1[/cyan]") |
|
|
|
asyncio.run(run()) |
|
|
|
def demo_queueing(): |
|
"""Concurrent Demo: More Jobs than Slots""" |
|
async def run(): |
|
console.print(Panel( |
|
"This demo shows what happens when we have more jobs than available slots.\n" |
|
"We'll create 3 slots but dispatch 6 jobs. The first 3 will start immediately,\n" |
|
"and the remaining 3 will wait for slots to become available.", |
|
title="[bold cyan]Overview[/bold cyan]", |
|
border_style="cyan" |
|
)) |
|
|
|
input("\nPress Enter to start...") |
|
|
|
# Setup |
|
await run_demo_command("uv run ./claude_dispatcher.py setup --slots 3", "Setting up 3 slots") |
|
|
|
console.print("\n[yellow]Tip: Open another terminal and run:[/yellow]") |
|
console.print("[cyan]watch -n 2 'uv run ./claude_dispatcher.py status --slots 3'[/cyan]") |
|
console.print("[dim]to see live status updates![/dim]") |
|
|
|
input("\nPress Enter to dispatch 6 jobs...") |
|
|
|
# Dispatch 6 jobs with mixed durations |
|
jobs = [ |
|
'"Count to 5 slowly"', |
|
'"What is the meaning of life?"', |
|
'"Write a haiku about coding"', |
|
'"Explain quantum computing in one sentence"', |
|
'"List prime numbers under 20"', |
|
'"Tell a programming joke"' |
|
] |
|
|
|
start_time = time.time() |
|
await run_demo_command( |
|
f"uv run ./claude_dispatcher.py run --slots 3 --timeout 60 {' '.join(jobs)}", |
|
"Dispatching 6 jobs to 3 slots" |
|
) |
|
elapsed = time.time() - start_time |
|
|
|
console.print(f"\n[green]✓ All jobs dispatched in {elapsed:.1f} seconds[/green]") |
|
console.print("\nNotice how jobs 4-6 had to wait for slots to become available!") |
|
|
|
asyncio.run(run()) |
|
|
|
def demo_recovery(): |
|
"""Recovery Demo: Cleanup and Recovery""" |
|
async def run(): |
|
console.print(Panel( |
|
"This demo shows the cleanup and recovery features:\n" |
|
"- Force cleanup of stuck slots\n" |
|
"- Recovery from interrupted jobs\n" |
|
"- Self-healing mechanisms", |
|
title="[bold cyan]Overview[/bold cyan]", |
|
border_style="cyan" |
|
)) |
|
|
|
input("\nPress Enter to start...") |
|
|
|
# Setup |
|
await run_demo_command("uv run ./claude_dispatcher.py setup --slots 2", "Setting up 2 slots") |
|
|
|
# Simulate stuck job |
|
console.print("\n[yellow]Simulating a stuck slot...[/yellow]") |
|
proc = await asyncio.create_subprocess_shell( |
|
"redis-cli SET claude_slot:1 'busy:job_999'", |
|
stdout=asyncio.subprocess.DEVNULL |
|
) |
|
await proc.wait() |
|
|
|
# Show stuck status |
|
await run_demo_command("uv run ./claude_dispatcher.py status --slots 2", "Status shows slot 1 is stuck") |
|
|
|
input("\nPress Enter to cleanup...") |
|
|
|
# Cleanup |
|
await run_demo_command("uv run ./claude_dispatcher.py cleanup --slots 2 --force", "Force cleanup all slots") |
|
|
|
# Verify cleanup |
|
await run_demo_command("uv run ./claude_dispatcher.py status --slots 2", "All slots are now idle") |
|
|
|
console.print("\n[green]✓ Cleanup demo completed![/green]") |
|
|
|
asyncio.run(run()) |
|
|
|
def demo_advanced(): |
|
"""Advanced Demo: Custom Configuration""" |
|
async def run(): |
|
console.print(Panel( |
|
"This demo shows advanced features:\n" |
|
"- Custom timeouts\n" |
|
"- Different Claude binary paths\n" |
|
"- Real-time monitoring\n" |
|
"- Performance testing", |
|
title="[bold cyan]Overview[/bold cyan]", |
|
border_style="cyan" |
|
)) |
|
|
|
# Get user input |
|
slots = Prompt.ask("How many slots would you like?", default="4") |
|
timeout = Prompt.ask("Timeout per job (seconds)?", default="120") |
|
|
|
# Setup |
|
await run_demo_command(f"uv run ./claude_dispatcher.py setup --slots {slots}", f"Setting up {slots} slots") |
|
|
|
# Create performance test jobs |
|
console.print("\n[yellow]Creating performance test jobs...[/yellow]") |
|
job_count = int(slots) * 2 |
|
jobs = [] |
|
for i in range(job_count): |
|
complexity = ["simple", "moderate", "complex"][i % 3] |
|
jobs.append(f'"Job {i+1}: Simulate {complexity} task"') |
|
|
|
console.print(f"Created {job_count} jobs for {slots} slots") |
|
|
|
# Dispatch |
|
start_time = time.time() |
|
await run_demo_command( |
|
f"uv run ./claude_dispatcher.py run --slots {slots} --timeout {timeout} {' '.join(jobs)}", |
|
f"Dispatching {job_count} jobs" |
|
) |
|
elapsed = time.time() - start_time |
|
|
|
console.print(f"\n[green]Performance: {job_count} jobs dispatched in {elapsed:.1f}s[/green]") |
|
console.print(f"Average: {elapsed/job_count:.2f}s per job dispatch") |
|
|
|
asyncio.run(run()) |
|
|
|
|
|
# ============================================ |
|
# USAGE FUNCTIONS (Following template pattern) |
|
# ============================================ |
|
|
|
async def working_usage(): |
|
""" |
|
Known working examples that demonstrate dispatcher functionality. |
|
This function contains stable, tested code that reliably works. |
|
|
|
CRITICAL FOR AGENTS: |
|
- This function MUST verify that the dispatcher produces expected results |
|
- Use assertions to validate outputs match expectations |
|
- Return True only if ALL tests pass |
|
- This is how agents verify the dispatcher actually works |
|
""" |
|
logger.info("=== Running Working Usage Examples ===") |
|
|
|
try: |
|
# Test 1: Validate prerequisites |
|
logger.info("Test 1: Validating prerequisites") |
|
|
|
# Check Python version |
|
assert sys.version_info >= (3, 9), "Python 3.9+ required" |
|
|
|
# Check Redis connection |
|
test_redis = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True) |
|
test_redis.ping() |
|
test_key = "dispatcher_test" |
|
test_redis.set(test_key, "working") |
|
value = test_redis.get(test_key) |
|
test_redis.delete(test_key) |
|
assert value == "working", "Redis test failed" |
|
logger.success("✓ Prerequisites validated") |
|
|
|
# Test 2: Create and verify tmux sessions |
|
logger.info("Test 2: Testing tmux session management") |
|
test_slots = 2 |
|
|
|
# Create test sessions |
|
for n in range(1, test_slots + 1): |
|
session = f"test_{PREFIX}_{n}" |
|
# Kill if exists |
|
subprocess.run(["tmux", "kill-session", "-t", session], |
|
capture_output=True) |
|
# Create new |
|
result = subprocess.run(["tmux", "new-session", "-d", "-s", session], |
|
capture_output=True) |
|
assert result.returncode == 0, f"Failed to create session {session}" |
|
|
|
# Verify sessions exist |
|
for n in range(1, test_slots + 1): |
|
session = f"test_{PREFIX}_{n}" |
|
result = subprocess.run(["tmux", "has-session", "-t", session], |
|
capture_output=True) |
|
assert result.returncode == 0, f"Session {session} not found" |
|
|
|
# Cleanup test sessions |
|
for n in range(1, test_slots + 1): |
|
session = f"test_{PREFIX}_{n}" |
|
subprocess.run(["tmux", "kill-session", "-t", session], |
|
capture_output=True) |
|
|
|
logger.success("✓ Tmux session management working") |
|
|
|
# Test 3: Verify atomic slot claiming |
|
logger.info("Test 3: Testing atomic slot claiming") |
|
|
|
slot_key = f"{PREFIX}:test:1" |
|
|
|
# Ensure clean state |
|
test_redis.delete(slot_key) |
|
|
|
# First claim should succeed |
|
result1 = test_redis.set(slot_key, "busy:job_1", nx=True) |
|
assert result1 is True, "First claim should succeed" |
|
|
|
# Second claim should fail |
|
result2 = test_redis.set(slot_key, "busy:job_2", nx=True) |
|
assert result2 is None or result2 is False, "Second claim should fail" |
|
|
|
# Cleanup |
|
test_redis.delete(slot_key) |
|
|
|
# After cleanup, claim should succeed |
|
result3 = test_redis.set(slot_key, "busy:job_3", nx=True) |
|
assert result3 is True, "Claim after cleanup should succeed" |
|
test_redis.delete(slot_key) |
|
|
|
logger.success("✓ Atomic slot claiming verified") |
|
|
|
# Test 4: Verify status reporting |
|
logger.info("Test 4: Testing status reporting") |
|
|
|
# Create test job info |
|
job_id = 9999 |
|
job_info = { |
|
"prompt": "Test prompt", |
|
"slot": 1, |
|
"start_time": datetime.now().isoformat(), |
|
"status": "running" |
|
} |
|
job_key = f"{PREFIX}:job:{job_id}" |
|
test_redis.hset(job_key, mapping=job_info) |
|
|
|
# Verify retrieval |
|
stored_info = test_redis.hgetall(job_key) |
|
assert stored_info["prompt"] == "Test prompt", "Job info storage failed" |
|
assert stored_info["slot"] == "1", "Slot number mismatch" |
|
assert stored_info["status"] == "running", "Status mismatch" |
|
|
|
# Cleanup |
|
test_redis.delete(job_key) |
|
|
|
logger.success("✓ Status reporting verified") |
|
|
|
logger.success("=== All Working Usage Tests Passed! ===") |
|
return True |
|
|
|
except AssertionError as e: |
|
logger.error(f"Test assertion failed: {e}") |
|
return False |
|
except Exception as e: |
|
logger.error(f"Test failed with error: {e}") |
|
logger.exception("Full traceback:") |
|
return False |
|
|
|
|
|
async def debug_function(): |
|
""" |
|
Debug function for testing new features or debugging issues. |
|
Update this frequently while developing/debugging. |
|
""" |
|
logger.info("=== Running Debug Function ===") |
|
|
|
# Test specific dispatcher functionality |
|
logger.debug("Testing dispatcher components...") |
|
|
|
# Example: Test concurrent job dispatch simulation |
|
async def simulate_job(job_id: int, delay: float): |
|
logger.debug(f"Job {job_id} starting (delay: {delay}s)") |
|
await asyncio.sleep(delay) |
|
logger.debug(f"Job {job_id} completed") |
|
return job_id |
|
|
|
# Run multiple jobs concurrently |
|
jobs = [ |
|
simulate_job(1, 0.5), |
|
simulate_job(2, 0.3), |
|
simulate_job(3, 0.7), |
|
] |
|
|
|
results = await asyncio.gather(*jobs) |
|
logger.debug(f"All jobs completed: {results}") |
|
|
|
# Test Redis connection pooling |
|
if redis_client: |
|
for i in range(5): |
|
key = f"debug_test_{i}" |
|
redis_client.setex(key, 10, f"value_{i}") |
|
value = redis_client.get(key) |
|
logger.debug(f"Redis test {i}: {value}") |
|
redis_client.delete(key) |
|
|
|
return True |
|
|
|
|
|
async def stress_test(): |
|
""" |
|
Run stress tests for the dispatcher system. |
|
Tests concurrent job handling, timeouts, and recovery. |
|
""" |
|
logger.info("=== Running Stress Tests ===") |
|
|
|
# Stress test configuration |
|
stress_config = { |
|
"name": "dispatcher_stress_test", |
|
"description": "Test dispatcher under load", |
|
"tests": [ |
|
{ |
|
"name": "concurrent_claims", |
|
"description": "Test many concurrent slot claims", |
|
"concurrent_jobs": 20, |
|
"slots": 5, |
|
"iterations": 3 |
|
}, |
|
{ |
|
"name": "timeout_handling", |
|
"description": "Test timeout and cleanup", |
|
"jobs_with_timeouts": 10, |
|
"timeout_seconds": 2 |
|
} |
|
] |
|
} |
|
|
|
logger.info(f"Running stress test: {stress_config['name']}") |
|
|
|
# Test 1: Concurrent claims |
|
logger.info("Test 1: Concurrent slot claiming") |
|
|
|
async def try_claim_slot(job_id: int, slot_num: int) -> bool: |
|
"""Simulate claiming a slot.""" |
|
key = f"{PREFIX}:stress:{slot_num}" |
|
result = redis_client.set(key, f"job_{job_id}", nx=True, ex=5) |
|
if result: |
|
await asyncio.sleep(0.1) # Simulate work |
|
redis_client.delete(key) |
|
return result |
|
|
|
success_count = 0 |
|
tasks = [] |
|
for i in range(20): |
|
slot = (i % 5) + 1 |
|
tasks.append(try_claim_slot(i, slot)) |
|
|
|
results = await asyncio.gather(*tasks) |
|
success_count = sum(1 for r in results if r) |
|
|
|
logger.info(f"Concurrent claims: {success_count}/20 succeeded") |
|
assert success_count >= 5, "At least 5 claims should succeed" |
|
|
|
logger.success("=== Stress Tests Completed ===") |
|
return True |
|
|
|
|
|
if __name__ == "__main__": |
|
""" |
|
Script entry point with triple-mode execution. |
|
|
|
Usage: |
|
        uv run ./claude_dispatcher.py            # Runs working_usage()
        uv run ./claude_dispatcher.py debug      # Runs debug_function()
        uv run ./claude_dispatcher.py stress     # Runs stress_test()
        uv run ./claude_dispatcher.py <command>  # Runs a Typer CLI command
|
|
|
This pattern provides: |
|
1. Stable working examples that always run |
|
2. Debug playground for testing without breaking working code |
|
3. Stress testing for load scenarios |
|
4. Full CLI functionality via typer |
|
""" |
|
|
|
# Check if running in test mode |
|
if len(sys.argv) == 1: |
|
# No arguments - run working usage |
|
logger.info("Running in WORKING mode...") |
|
success = asyncio.run(working_usage()) |
|
        sys.exit(0 if success else 1)
|
elif len(sys.argv) == 2: |
|
arg = sys.argv[1] |
|
if arg == "debug": |
|
logger.info("Running in DEBUG mode...") |
|
success = asyncio.run(debug_function()) |
|
            sys.exit(0 if success else 1)
|
elif arg == "stress": |
|
logger.info("Running in STRESS TEST mode...") |
|
success = asyncio.run(stress_test()) |
|
            sys.exit(0 if success else 1)
|
|
|
# Otherwise, run the typer CLI |
|
app() |