@grahama1970
Last active July 26, 2025 17:06
Claude Tmux Dispatcher (proof of concept): The Claude Dashboard project is a real-time monitoring tool for managing Claude AI code instances via tmux slots and hook events. Built with React, Tailwind CSS, Shadcn/UI, and FastAPI, it provides live views of tmux outputs, event filtering, chat transcripts, and system health. It integrates tightly wi…

Claude Dashboard

A real-time monitoring dashboard for Claude code instances managed by claude_dispatcher.py, built with React, Tailwind CSS, Shadcn/UI, and FastAPI. It provides live observability of tmux slots, Claude hook events, and system health, inspired by the disler/claude-code-hooks-multi-agent-observability repository. The dashboard runs locally, leveraging uv for dependency management and WebSockets for real-time updates.

Key Features

  • Real-Time Monitoring: Displays live tmux slot statuses, Claude hook events (e.g., PreToolUse, Notification), and system health (CPU, memory, Redis) via WebSockets.
  • Live Tmux Output Streaming: Streams tmux capture-pane output for each slot in real-time, viewable in a modal.
  • Advanced Event Filtering: Filters hook events by event type, session ID, source app, and content on the Event Timeline page.
  • Chat Transcript Modal: Views Claude conversation history for Notification events, with formatted prompt/response pairs.
  • Concurrent Job Management: Integrates with claude_dispatcher.py to dispatch and monitor jobs in tmux slots.
  • Self-Contained Backend: Uses uv run --script with embedded dependencies in main.py for easy setup.
  • Responsive UI: Built with React, Tailwind CSS, and Shadcn/UI for a modern, user-friendly interface.
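The event filtering listed above can be pictured as a predicate applied to each hook event. The following is a minimal sketch, not the dashboard's actual code: the field names (`hook_event_type`, `session_id`, `source_app`, `payload`) are assumptions modeled on the filter criteria named in the feature list.

```python
import json
from typing import Any, Dict, Iterable, List, Optional


def filter_events(
    events: Iterable[Dict[str, Any]],
    event_type: Optional[str] = None,
    session_id: Optional[str] = None,
    source_app: Optional[str] = None,
    content: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """Return the events that match every filter that was supplied."""
    needle = content.lower() if content else None
    matched = []
    for event in events:
        if event_type and event.get("hook_event_type") != event_type:
            continue
        if session_id and event.get("session_id") != session_id:
            continue
        if source_app and event.get("source_app") != source_app:
            continue
        # Content filter: case-insensitive substring match on the serialized payload.
        if needle and needle not in json.dumps(event.get("payload", {})).lower():
            continue
        matched.append(event)
    return matched
```

Filters left as `None` are ignored, so calling `filter_events(events, event_type="PreToolUse")` narrows by type only.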

System Requirements

Before setting up, ensure the following are installed and available in your PATH:

  1. Node.js (v18+): For running the React frontend.
  2. Python 3.9+: For the FastAPI backend and Claude hooks.
  3. Redis: For state management (redis-server and redis-cli).
  4. uv: High-performance Python package manager (Installation).
  5. tmux: For managing Claude slots.
  6. Claude Code CLI: Anthropic’s CLI for Claude, with ANTHROPIC_API_KEY set in your environment.
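A quick way to confirm the requirements above is to look each tool up on the PATH. This is a small sketch under the assumption that the executables are named `node`, `python3`, `redis-server`, `redis-cli`, `uv`, `tmux`, and `claude`; adjust the names if your installation differs.

```python
import shutil
from typing import Iterable, List

# Assumed executable names for the tools listed in System Requirements.
REQUIRED_TOOLS = ("node", "python3", "redis-server", "redis-cli", "uv", "tmux", "claude")


def find_missing_tools(tools: Iterable[str] = REQUIRED_TOOLS) -> List[str]:
    """Return the tools that cannot be found on the current PATH."""
    return [tool for tool in tools if shutil.which(tool) is None]


if __name__ == "__main__":
    missing = find_missing_tools()
    print("Missing tools:", ", ".join(missing) if missing else "none")
```

The dispatcher's own `validate` command performs a more thorough check, including a Redis round trip.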

Project Setup

Follow these steps to set up and run the Claude Dashboard locally.

1. Install System Dependencies

  • Node.js:
    sudo apt install nodejs  # Ubuntu

  • Python 3.9+:
    sudo apt install python3  # Ubuntu

  • Redis:
    sudo apt install redis-server  # Ubuntu
    redis-server  # Start Redis

  • uv:
    curl -LsSf https://astral.sh/uv/install.sh | sh

  • tmux:
    sudo apt install tmux  # Ubuntu

  • Claude Code CLI: Install per Anthropic’s instructions. Set the API key:
    export ANTHROPIC_API_KEY="your-key-here"

2. Clone or Create Project Structure

Create the project directory and ensure the following structure:

    claude-dashboard/
    ├── .claude/
    │   ├── hooks/
    │   │   └── send_event.py
    │   └── settings.json
    ├── backend/
    │   └── main.py
    ├── frontend/
    │   ├── src/
    │   │   ├── components/ui/  # Shadcn/UI components
    │   │   ├── index.tsx
    │   │   └── index.css
    │   ├── package.json
    │   └── vite.config.ts
    ├── claude_dispatcher.py
    └── README.md

Copy claude_dispatcher.py from proof_of_concept/claude_dispatcher/. Use the provided send_event.py, settings.json, main.py, index.tsx, index.css, vite.config.ts, and package.json from the implementation.

3. Setup Frontend

Install dependencies and start the React development server:

    cd frontend
    npm install @shadcn/ui@latest  # Install Shadcn/UI components (Button, Card, Table, Input, Textarea, Dialog, Select, Alert)
    npm install
    npm run dev

Access the dashboard at http://localhost:5173.

4. Setup Backend

Run the FastAPI backend, which uses uv to manage dependencies:

    cd backend
    uv run main.py

The server runs on http://localhost:4000, handling API and WebSocket endpoints.

5. Configure Claude Hooks

Ensure .claude/hooks/send_event.py and .claude/settings.json are in place (as provided). Convert relative paths to absolute paths in settings.json:

    claude /convert_paths_absolute
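To make the path conversion concrete, here is a hedged sketch of what rewriting relative hook paths to absolute ones could look like. It is not the `/convert_paths_absolute` command itself. It assumes the settings layout is `hooks` → event name → list of matchers → list of hooks with a `command` string, and that relative paths in those commands start with `.claude/`.

```python
import copy
from pathlib import Path
from typing import Any, Dict


def absolutize_hook_commands(settings: Dict[str, Any], project_root: str) -> Dict[str, Any]:
    """Return a copy of `settings` with `.claude/...` paths in hook commands made absolute."""
    root = Path(project_root).resolve()
    result = copy.deepcopy(settings)  # leave the caller's dict untouched
    for matchers in result.get("hooks", {}).values():
        for matcher in matchers:
            for hook in matcher.get("hooks", []):
                parts = hook.get("command", "").split(" ")
                # Only tokens that begin with ".claude/" are treated as relative paths.
                hook["command"] = " ".join(
                    str(root / part) if part.startswith(".claude/") else part
                    for part in parts
                )
    return result
```

Load `.claude/settings.json` with `json.load`, pass the resulting dict and the project directory to the function, and write the result back with `json.dump`.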

6. Update claude_dispatcher.py

Ensure claude_dispatcher.py sets a session_id for each job to correlate with Claude hook events:

    import uuid

    def _launch_job(slot, prompt):
        session_id = str(uuid.uuid4())
        redis_client.hset(f"claude_slot:{slot}", "session_id", session_id)
        # Existing job launch logic
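One caveat when applying this step: the dispatcher in this gist stores a plain string lock at `claude_slot:{slot}`, and Redis rejects hash writes to a key that holds a string. A possible workaround, sketched below, keeps the session ID in a separate hash. The `:meta` key suffix and both function names are hypothetical, and `client` is only assumed to behave like `redis.Redis` (`hset` with `mapping=`, and `expire`).

```python
import uuid
from typing import Any, Dict, Tuple

PREFIX = "claude_slot"  # same prefix the dispatcher uses for its slot keys


def session_record(slot: int) -> Tuple[str, Dict[str, str]]:
    """Build a (key, mapping) pair holding a fresh session ID for a slot."""
    key = f"{PREFIX}:{slot}:meta"  # hypothetical key, kept apart from the lock key
    return key, {"session_id": str(uuid.uuid4())}


def tag_slot_with_session(client: Any, slot: int, ttl_seconds: int = 3600) -> str:
    """Store a new session ID for `slot` and return it.

    `client` is assumed to expose hset(name, mapping=...) and expire(name, seconds),
    as redis.Redis does.
    """
    key, mapping = session_record(slot)
    client.hset(key, mapping=mapping)
    client.expire(key, ttl_seconds)  # let stale session records age out
    return mapping["session_id"]
```

The returned ID can then be passed along to whatever launches the job so that hook events carry the same value.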

7. Initialize Tmux Slots

Set up tmux slots for Claude jobs:

    uv run claude_dispatcher.py setup --slots 5

8. Test the Dashboard

  • Start Redis: redis-server
  • Run the backend: cd backend && uv run main.py
  • Run the frontend: cd frontend && npm run dev
  • Open http://localhost:5173 and check each page:
      • Actions: Run a job (e.g., prompt “Write a Python script”).
      • Event Timeline: Filter events by type (e.g., PreToolUse), session ID, source app, or content. Click “View Chat” for Notification events to see chat history.
      • Slots: Check slot statuses and click “View Live Output” to stream tmux output.
      • Overview: View system health and recent event activity (bar chart).

Usage

Running Jobs

Dispatch Claude jobs via the Actions page or CLI:

    uv run claude_dispatcher.py run "Write a Python script" "Summarize a news article"
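When more prompts are dispatched than there are slots, each job takes the first free slot and the rest wait. The sketch below illustrates that claim step with a plain dict standing in for Redis. It is a simplification: the dispatcher relies on Redis `SET` with `NX` so the check-and-write is atomic across processes, which a dict does not provide. The function name is hypothetical.

```python
from typing import Dict, Optional


def claim_slot(locks: Dict[str, str], job_id: int, max_slots: int) -> Optional[int]:
    """Claim the first free slot for `job_id`, or return None if every slot is busy.

    `locks` stands in for Redis; writing only when the key is absent mimics SET ... NX.
    """
    for n in range(1, max_slots + 1):
        key = f"claude_slot:{n}"
        if key not in locks:
            locks[key] = f"busy:job_{job_id}"
            return n
    return None


if __name__ == "__main__":
    locks: Dict[str, str] = {}
    # Six jobs, five slots: the sixth job has to wait for a slot to be released.
    print([claim_slot(locks, job, 5) for job in range(1, 7)])
```

A waiting job simply retries until a finished job deletes its key, which is what the dispatcher's polling loop with backoff does.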

Monitoring

  • Event Timeline: View and filter Claude hook events (e.g., PreToolUse, Notification) in real-time.
  • Slots: Monitor slot statuses and stream live tmux output for running jobs.
  • Overview: Check system health (Redis, CPU, memory) and recent activity (bar chart).
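Streaming tmux output usually means polling `tmux capture-pane` and forwarding only what changed since the last snapshot. The helper below is one way to compute that difference. It is a heuristic sketch, not the backend's implementation, and it can misjudge the overlap when a pane contains many identical lines.

```python
from typing import List


def new_lines(previous: List[str], current: List[str]) -> List[str]:
    """Return the lines of `current` that follow its longest overlap with the tail of `previous`."""
    max_overlap = min(len(previous), len(current))
    # Try the largest overlap first so unchanged snapshots yield no output.
    for size in range(max_overlap, 0, -1):
        if previous[-size:] == current[:size]:
            return current[size:]
    # No overlap at all: treat the whole snapshot as new.
    return current
```

Each snapshot would come from splitting the captured pane text on newlines. Only the returned lines need to be pushed over the WebSocket.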

Cleanup

Clear stuck or orphaned slots:

    uv run claude_dispatcher.py cleanup

Troubleshooting

  • Tmux Output Not Streaming: Ensure tmux is installed and slots are named claude_slot_N (e.g., claude_slot_1). Verify the backend is running.
  • Hook Events Missing: Check .claude/settings.json for absolute paths and ensure ANTHROPIC_API_KEY is set.
  • Redis Connection Issues: Confirm redis-server is running and accessible at localhost:6379.
  • Frontend Errors: Verify all dependencies are installed (npm install) and Shadcn/UI components are set up.
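Two of the checks above are easy to script. The sketch below, with hypothetical helper names, finds which `claude_slot_N` sessions are absent from a list of session names and tests whether a TCP port such as Redis's 6379 accepts connections. The session names are assumed to come from something like `tmux list-sessions -F '#{session_name}'`.

```python
import re
import socket
from typing import Iterable, List

SLOT_NAME = re.compile(r"^claude_slot_(\d+)$")


def missing_slots(session_names: Iterable[str], expected: int) -> List[int]:
    """Return the slot numbers in 1..expected that have no matching tmux session name."""
    found = set()
    for name in session_names:
        match = SLOT_NAME.match(name)
        if match:
            found.add(int(match.group(1)))
    return [n for n in range(1, expected + 1) if n not in found]


def port_open(host: str = "localhost", port: int = 6379, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

An open port only shows that something is listening. The dispatcher's `validate` command goes further and performs a real Redis set/get round trip.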

Files in this Project

  • claude_dispatcher.py: Dispatcher script for managing Claude jobs in tmux slots.
  • .claude/hooks/send_event.py: Claude hook script for sending events to the backend.
  • .claude/settings.json: Claude hook configuration.
  • backend/main.py: FastAPI backend with WebSocket endpoints for events and tmux streaming.
  • frontend/src/index.tsx: React dashboard with UI components.
  • frontend/src/index.css: Tailwind CSS styles.
  • frontend/vite.config.ts: Vite configuration for local development.
  • frontend/package.json: Frontend dependencies.
  • README.md: This documentation.
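Since send_event.py is not reproduced here, the following sketch shows roughly what a hook script that forwards events to the backend might do. The endpoint path `/events`, the envelope field names, and both function names are assumptions. Only the host and port (localhost:4000) come from the setup steps above.

```python
import json
import urllib.request
from typing import Any, Dict

BACKEND_URL = "http://localhost:4000/events"  # hypothetical endpoint path


def build_event(hook_input: Dict[str, Any], event_type: str, source_app: str) -> Dict[str, Any]:
    """Wrap the JSON a hook receives in an envelope the dashboard could filter on."""
    return {
        "hook_event_type": event_type,
        "source_app": source_app,
        "session_id": hook_input.get("session_id", "unknown"),
        "payload": hook_input,
    }


def send_event(event: Dict[str, Any], url: str = BACKEND_URL, timeout: float = 2.0) -> bool:
    """POST the event as JSON; return False instead of raising if the backend is unreachable."""
    request = urllib.request.Request(
        url,
        data=json.dumps(event).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return 200 <= response.status < 300
    except OSError:
        # Covers refused connections, timeouts, and HTTP errors, so a
        # dashboard outage never breaks the Claude session that fired the hook.
        return False
```

A hook entry point would read the hook's JSON from standard input with `json.load(sys.stdin)`, call `build_event`, and pass the result to `send_event`.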

License

This project is licensed under the MIT License.

#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.9"
# dependencies = [
# "typer[all]",
# "rich",
# "redis",
# "loguru",
# "python-dotenv",
# "psutil",
# ]
# ///
"""
Claude Dispatcher - Concurrent task dispatcher for Claude using tmux and Redis.

This script demonstrates a robust concurrent task dispatcher that manages multiple
Claude processes in isolated tmux sessions, using Redis for atomic slot management
and self-healing job execution.

Key features:
- Atomic slot claiming using Redis SET with NX
- Self-healing jobs that clean up after themselves
- Real-time monitoring of job status
- Graceful handling of failures and timeouts
- Force cleanup for stuck slots

AGENT VERIFICATION INSTRUCTIONS:
- Run this script with no arguments to execute working_usage()
- The working_usage() function MUST complete successfully
- This verifies the dispatcher can manage concurrent Claude tasks
- DO NOT assume the script works without running it
- Check that all assertions pass in working_usage()

Third-party Documentation:
- Redis Commands: https://redis.io/commands/
- tmux Manual: https://man7.org/linux/man-pages/man1/tmux.1.html
- Claude CLI: https://docs.anthropic.com/en/docs/claude-cli

Example Usage:
    # Basic setup and run
    uv run ./claude_dispatcher.py           # Run working examples
    uv run ./claude_dispatcher.py validate  # Check prerequisites
    uv run ./claude_dispatcher.py setup     # Create tmux slots
    uv run ./claude_dispatcher.py run       # Dispatch jobs
    uv run ./claude_dispatcher.py status    # Monitor status

Expected Results:
- validate: All 7 prerequisite tests pass
- setup: Creates 5 tmux sessions (claude_slot_1 through claude_slot_5)
- run: Dispatches 6 jobs, with first 5 running immediately
- status: Shows slot status with job information
"""
from __future__ import annotations
import asyncio
import os
import shlex
import signal
import subprocess
import sys
import time
import psutil
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Optional, Dict, Any, Tuple
# Third-party imports
import typer
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from rich.prompt import Prompt, Confirm
import redis
from loguru import logger
from dotenv import load_dotenv, find_dotenv
# Configure logging
logger.remove() # Remove default handler
logger.add(
sys.stderr,
level="INFO",
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>"
)
# Load environment variables
load_dotenv(find_dotenv())
console = Console()
# --- Configuration ---
PREFIX = "claude_slot"
DEFAULT_SLOTS = 5
DEFAULT_TIMEOUT = 600
DEFAULT_CMD = "claude"
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
# Resource limits
MAX_MEMORY_PERCENT = float(os.getenv("MAX_MEMORY_PERCENT", "80"))
MAX_CPU_PERCENT = float(os.getenv("MAX_CPU_PERCENT", "80"))
MIN_FREE_MEMORY_MB = int(os.getenv("MIN_FREE_MEMORY_MB", "500"))
# Redis key TTLs (in seconds)
SLOT_KEY_TTL = int(os.getenv("SLOT_KEY_TTL", "3600")) # 1 hour
JOB_INFO_TTL = int(os.getenv("JOB_INFO_TTL", "7200")) # 2 hours
# Health check intervals
HEALTH_CHECK_INTERVAL = int(os.getenv("HEALTH_CHECK_INTERVAL", "60")) # 1 minute
STUCK_SLOT_THRESHOLD = int(os.getenv("STUCK_SLOT_THRESHOLD", "1800")) # 30 minutes
app = typer.Typer(
    no_args_is_help=True,
    help="A concurrent task dispatcher for Claude using tmux and Redis.",
)
# --- Global Client with Connection Retry ---
class RedisClient:
    """Redis client with automatic retry."""

    def __init__(self):
        self._client = None
        self._connect()

    def _connect(self):
        """Create Redis connection with retry logic."""
        max_retries = 3
        backoff = 0.5
        for attempt in range(max_retries):
            try:
                self._client = redis.Redis(
                    host=REDIS_HOST,
                    port=REDIS_PORT,
                    decode_responses=True,
                    socket_connect_timeout=5,
                    socket_timeout=5,
                )
                self._client.ping()
                logger.success(f"Connected to Redis at {REDIS_HOST}:{REDIS_PORT}")
                return
            except redis.RedisError as e:
                if attempt == max_retries - 1:
                    console.print(f"[bold red]✗ Fatal:[/bold red] Could not connect to Redis after {max_retries} attempts. {e}")
                    raise typer.Exit(code=1)
                logger.warning(f"Redis connection attempt {attempt + 1} failed, retrying in {backoff}s...")
                time.sleep(backoff)
                backoff *= 2

    def execute_with_retry(self, func, *args, **kwargs):
        """Execute Redis command with automatic retry on connection failure."""
        max_retries = 3
        backoff = 0.1
        for attempt in range(max_retries):
            try:
                return func(*args, **kwargs)
            except (redis.ConnectionError, redis.TimeoutError) as e:
                if attempt == max_retries - 1:
                    logger.error(f"Redis operation failed after {max_retries} attempts: {e}")
                    raise
                logger.warning(f"Redis operation failed, retrying in {backoff}s (attempt {attempt + 1}/{max_retries})")
                time.sleep(backoff)
                backoff *= 2
                # Try to reconnect; a failure here is retried on the next attempt.
                try:
                    self._connect()
                except Exception:
                    pass

    def __getattr__(self, name):
        """Proxy all Redis commands through retry logic."""
        client_attr = getattr(self._client, name)
        if callable(client_attr):
            return lambda *args, **kwargs: self.execute_with_retry(client_attr, *args, **kwargs)
        return client_attr


# Initialize Redis client
try:
    redis_client = RedisClient()
except Exception as e:
    console.print(f"[bold red]✗ Fatal:[/bold red] Could not initialize Redis client. {e}")
    sys.exit(1)
# --- Interrupt Handler ---
_shutdown_event = asyncio.Event()


def signal_handler(sig, frame):
    """Handle interrupt signals gracefully."""
    logger.warning(f"Received signal {sig}, initiating graceful shutdown...")
    _shutdown_event.set()


# Register signal handlers
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# ------------------------------------------------------------------ #
# Core Helpers
# ------------------------------------------------------------------ #
def _slot_key(n: int) -> str:
    """Returns the Redis key for a given slot."""
    return f"{PREFIX}:{n}"


def _session_name(n: int) -> str:
    """Returns the tmux session name for a given slot."""
    return f"{PREFIX}_{n}"


def _pane(n: int) -> str:
    """Returns the tmux pane identifier for a given slot."""
    return _session_name(n)


def _job_info_key(job_id: int) -> str:
    """Returns the Redis key for job information."""
    return f"{PREFIX}:job:{job_id}"


async def _store_job_info(job_id: int, prompt: str, slot: int) -> None:
    """Store job information in Redis with TTL."""
    info = {
        "prompt": prompt[:100] + "..." if len(prompt) > 100 else prompt,
        "slot": slot,
        "start_time": datetime.now().isoformat(),
        "status": "running"
    }
    redis_client.hset(_job_info_key(job_id), mapping=info)
    redis_client.expire(_job_info_key(job_id), JOB_INFO_TTL)


async def _get_slot_status(n: int) -> Dict[str, Any]:
    """Get the current status of a slot with health info."""
    slot_key = _slot_key(n)
    value = redis_client.get(slot_key)
    status = {"slot": n, "status": "idle", "job_id": None, "job_info": None, "health": "unknown"}
    if value:
        status["status"] = "busy"
        if value.startswith("busy:job_"):
            job_id = int(value.split("_")[1])
            status["job_id"] = job_id
            job_info = redis_client.hgetall(_job_info_key(job_id))
            if job_info:
                status["job_info"] = job_info
                # Check if slot is stuck
                start_time = datetime.fromisoformat(job_info.get("start_time", datetime.now().isoformat()))
                duration = (datetime.now() - start_time).total_seconds()
                if duration > STUCK_SLOT_THRESHOLD:
                    status["health"] = "stuck"
                else:
                    status["health"] = "healthy"
    # Check if tmux session is actually running
    try:
        proc = await asyncio.create_subprocess_exec(
            "tmux", "has-session", "-t", _session_name(n),
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.DEVNULL
        )
        tmux_exists = await proc.wait() == 0
        if not tmux_exists and status["status"] == "busy":
            status["health"] = "orphaned"
        # Get last output if session exists
        if tmux_exists:
            proc = await asyncio.create_subprocess_exec(
                "tmux", "capture-pane", "-t", _pane(n), "-p",
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.DEVNULL
            )
            stdout, _ = await proc.communicate()
            if stdout:
                last_line = stdout.decode().strip().split('\n')[-1]
                status["last_output"] = last_line[:80] + "..." if len(last_line) > 80 else last_line
    except Exception:
        # tmux missing or not runnable: report no output rather than failing the status call.
        status["last_output"] = "N/A"
    return status


def check_system_resources() -> Tuple[bool, str]:
    """Check if system has sufficient resources."""
    try:
        # Check memory
        memory = psutil.virtual_memory()
        memory_percent = memory.percent
        free_memory_mb = memory.available / (1024 * 1024)
        if memory_percent > MAX_MEMORY_PERCENT:
            return False, f"Memory usage too high: {memory_percent:.1f}% (limit: {MAX_MEMORY_PERCENT}%)"
        if free_memory_mb < MIN_FREE_MEMORY_MB:
            return False, f"Insufficient free memory: {free_memory_mb:.0f}MB (minimum: {MIN_FREE_MEMORY_MB}MB)"
        # Check CPU
        cpu_percent = psutil.cpu_percent(interval=0.1)
        if cpu_percent > MAX_CPU_PERCENT:
            return False, f"CPU usage too high: {cpu_percent:.1f}% (limit: {MAX_CPU_PERCENT}%)"
        return True, f"Resources OK - Memory: {memory_percent:.1f}%, CPU: {cpu_percent:.1f}%"
    except Exception as e:
        logger.error(f"Error checking system resources: {e}")
        return True, "Resource check failed, proceeding anyway"


def cleanup_zombie_processes():
    """Clean up any zombie processes."""
    try:
        zombies = []
        for proc in psutil.process_iter(['pid', 'name', 'status']):
            if proc.info['status'] == psutil.STATUS_ZOMBIE:
                zombies.append(proc.info['pid'])
        if zombies:
            logger.warning(f"Found {len(zombies)} zombie processes, attempting cleanup")
            for pid in zombies:
                try:
                    os.waitpid(pid, os.WNOHANG)
                except OSError:
                    # Not a child of this process (or already reaped): nothing to do.
                    pass
    except Exception as e:
        logger.error(f"Error cleaning up zombie processes: {e}")


async def _try_claim_and_launch(
    n: int, job_id: int, prompt: str, timeout: int, claude_bin: str
) -> bool:
    """Atomically tries to claim a slot in Redis and launch a self-cleaning job."""
    # Check resources before claiming
    resources_ok, resource_msg = check_system_resources()
    if not resources_ok:
        logger.warning(f"Resource check failed for job {job_id}: {resource_msg}")
        return False
    slot_key = _slot_key(n)
    # Use SET with NX and EX for atomic claim with TTL
    if not redis_client.set(slot_key, f"busy:job_{job_id}", nx=True, ex=SLOT_KEY_TTL):
        return False  # Slot was already claimed.
    try:
        logger.info(f"Job #{job_id} claiming slot {n}")
        await _store_job_info(job_id, prompt, n)
        claude_cmd = f"{claude_bin} -p {shlex.quote(prompt)} --output-format stream-json"
        cleanup_cmd = f"redis-cli -h {REDIS_HOST} -p {REDIS_PORT} DEL {slot_key} {_job_info_key(job_id)}"
        # Use trap to ensure cleanup runs on EXIT, INT, TERM.
        # Don't use exec to avoid killing the tmux pane.
        inner_script = (
            f"trap {shlex.quote(cleanup_cmd)} EXIT INT TERM; "
            f"timeout {timeout} {claude_cmd}; exit_code=$?; "
            f"{cleanup_cmd}; exit $exit_code"
        )
        # Quote the whole script once so a prompt that itself contains quotes or
        # spaces cannot terminate the argument to `bash -c` early.
        full_cmd = f"bash -c {shlex.quote(inner_script)}"
        proc = await asyncio.create_subprocess_exec(
            "tmux", "send-keys", "-t", _pane(n), full_cmd, "Enter",
            stderr=asyncio.subprocess.PIPE,
        )
        # We don't need to read output, just ensure the send-keys command succeeded.
        await proc.wait()
        if proc.returncode != 0:
            stderr = (await proc.stderr.read()).decode().strip()
            console.print(f"[red]✗ Launch Error ({_pane(n)}):[/red] {stderr}")
            logger.error(f"tmux send-keys failed for slot {n}: {stderr}")
            raise RuntimeError(f"tmux send-keys failed: {stderr}")
        console.print(f"[green]✓[/green] Job #{job_id} launched into slot {n}.")
        logger.success(f"Job #{job_id} successfully launched in slot {n}")
        return True
    except Exception as e:
        console.print(f"[bold red]✗ Critical launch failure in slot {n}:[/bold red] {e}")
        logger.error(f"Critical failure launching job {job_id} in slot {n}: {e}")
        redis_client.delete(slot_key)  # Clean up the claim on failure.
        redis_client.delete(_job_info_key(job_id))
        return False


async def _dispatch_one_job(
    job_id: int, prompt: str, timeout: int, claude_bin: str, max_slots: int
) -> None:
    """A worker that polls for an idle slot and launches a single job."""
    console.print(f"[dim]Job #{job_id} ('{prompt[:30]}...') waiting for slot...[/dim]")
    logger.info(f"Job #{job_id} starting dispatch")
    backoff = 0.25
    attempts = 0
    while not _shutdown_event.is_set():
        attempts += 1
        # Clean up zombies periodically
        if attempts % 20 == 0:
            cleanup_zombie_processes()
        for slot_num in range(1, max_slots + 1):
            if await _try_claim_and_launch(slot_num, job_id, prompt, timeout, claude_bin):
                return
        # Check for shutdown before sleeping
        if _shutdown_event.is_set():
            logger.info(f"Job #{job_id} cancelled due to shutdown")
            return
        try:
            await asyncio.wait_for(
                _shutdown_event.wait(),
                timeout=backoff
            )
            # If we reach here, shutdown was requested
            logger.info(f"Job #{job_id} cancelled due to shutdown")
            return
        except asyncio.TimeoutError:
            # Normal timeout, continue polling
            pass
        backoff = min(backoff * 1.5, 5.0)  # Exponential backoff up to 5s.
        if attempts % 10 == 0:
            logger.debug(f"Job #{job_id} still waiting after {attempts} attempts")


# ------------------------------------------------------------------ #
# Validation Helpers
# ------------------------------------------------------------------ #
async def _validate_dependencies(max_slots: int, claude_bin: str):
    """Runs all pre-flight checks concurrently."""
    console.print("[dim]Running pre-flight checks...[/dim]")
    logger.info("Starting dependency validation")

    async def _check_tmux_session(n: int) -> bool:
        proc = await asyncio.create_subprocess_exec("tmux", "has-session", "-t", _session_name(n))
        return await proc.wait() == 0

    async def _check_claude_bin() -> bool:
        try:
            # First check if the binary exists and is executable
            proc = await asyncio.create_subprocess_exec(
                "which", claude_bin,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.DEVNULL
            )
            await proc.wait()
            if proc.returncode != 0:
                return False
            # Then try to run it with --version
            proc = await asyncio.create_subprocess_exec(
                claude_bin, "--version",
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            await proc.wait()
            return proc.returncode == 0
        except FileNotFoundError:
            return False

    claude_ok_task = asyncio.create_task(_check_claude_bin())
    tmux_ok_tasks = [_check_tmux_session(n) for n in range(1, max_slots + 1)]
    results = await asyncio.gather(claude_ok_task, *tmux_ok_tasks)
    claude_ok, *tmux_sessions_ok = results
    if not claude_ok:
        console.print(f"[red]✗ Prerequisite failed:[/red] `{claude_bin}` not found or not executable.")
        console.print("[yellow]Hint:[/yellow] Make sure 'claude' is installed and in your PATH, or use --claude to specify the path")
        raise typer.Exit(code=1)
    missing_sessions = []
    for n, exists in enumerate(tmux_sessions_ok, 1):
        if not exists:
            missing_sessions.append(n)
    if missing_sessions:
        console.print(f"[red]✗ Missing tmux sessions:[/red] {', '.join(f'{PREFIX}_{n}' for n in missing_sessions)}")
        console.print("[yellow]Hint:[/yellow] Run `uv run ./claude_dispatcher.py setup` first")
        raise typer.Exit(code=1)
    console.print("[green]✓[/green] All checks passed.")
    logger.success("All dependencies validated successfully")


# ------------------------------------------------------------------ #
# Health Command
# ------------------------------------------------------------------ #
@app.command()
def health(
    slots: int = typer.Option(DEFAULT_SLOTS, "--slots", "-s", min=1, help="Number of slots to check."),
):
    """Run health checks and show system status."""

    async def _run_health_check():
        console.print("\n[bold cyan]System Health Check[/bold cyan]\n")
        # Check Redis
        try:
            redis_client.ping()
            console.print("Redis: ✅ Connected")
        except Exception as e:
            console.print(f"Redis: ❌ Error - {e}")
        # Check resources
        resources_ok, resource_msg = check_system_resources()
        status_emoji = "✅" if resources_ok else "❌"
        console.print(f"Resources: {status_emoji} {resource_msg}")
        # Check tmux sessions
        healthy_sessions = 0
        for n in range(1, slots + 1):
            proc = await asyncio.create_subprocess_exec(
                "tmux", "has-session", "-t", _session_name(n),
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.DEVNULL
            )
            if await proc.wait() == 0:
                healthy_sessions += 1
        console.print(f"Tmux Sessions: {healthy_sessions}/{slots} active")
        # Check slots
        table = Table(title="Slot Health Status")
        table.add_column("Slot", style="cyan")
        table.add_column("Status", style="green")
        table.add_column("Health", style="yellow")
        table.add_column("Job ID", style="blue")
        table.add_column("Duration", style="magenta")
        stuck_count = 0
        orphaned_count = 0
        for n in range(1, slots + 1):
            status = await _get_slot_status(n)
            if status["health"] == "stuck":
                stuck_count += 1
            elif status["health"] == "orphaned":
                orphaned_count += 1
            health_emoji = {
                "healthy": "✅",
                "stuck": "🔴",
                "orphaned": "⚠️",
                "unknown": "❓"
            }.get(status["health"], "❓")
            duration = "-"
            if status.get("job_info"):
                start_time = datetime.fromisoformat(status["job_info"].get("start_time", datetime.now().isoformat()))
                duration = f"{(datetime.now() - start_time).total_seconds():.0f}s"
            # "job_id" is always present (None when idle), so test the value, not the key.
            job_id = status.get("job_id")
            table.add_row(
                str(n),
                status["status"],
                f"{health_emoji} {status['health']}",
                str(job_id) if job_id is not None else "-",
                duration
            )
        console.print(table)
        if stuck_count > 0:
            console.print(f"\n⚠️ [yellow]Warning: {stuck_count} stuck slots detected[/yellow]")
            console.print("Run 'cleanup --force' to recover stuck slots")
        if orphaned_count > 0:
            console.print(f"\n⚠️ [yellow]Warning: {orphaned_count} orphaned slots detected[/yellow]")

    asyncio.run(_run_health_check())


# ------------------------------------------------------------------ #
# Validation Command
# ------------------------------------------------------------------ #
@app.command()
def validate():
    """Validate all prerequisites are installed and configured correctly."""
    console.print("\n[bold cyan]Claude Dispatcher Setup Validation[/bold cyan]\n")
    test_results = []

    def add_result(component: str, test: str, passed: bool, message: str = ""):
        """Add a test result."""
        test_results.append({
            "component": component,
            "test": test,
            "passed": passed,
            "message": message
        })
        status = "[green]✓ PASS[/green]" if passed else "[red]✗ FAIL[/red]"
        console.print(f"{status} {component}: {test} {message}")

    # Python version
    version = sys.version_info
    passed = version >= (3, 9)
    message = f"(Python {version.major}.{version.minor}.{version.micro})"
    add_result("Python", "Version >= 3.9", passed, message)
    # Required packages
    try:
        import psutil
        add_result("psutil", "Installation", True, f"(v{psutil.__version__})")
    except ImportError:
        add_result("psutil", "Installation", False, "(Not installed)")
    # uv installed
    try:
        result = subprocess.run(["uv", "--version"], capture_output=True, text=True)
        passed = result.returncode == 0
        version_str = result.stdout.strip() if passed else "Not found"
        add_result("uv", "Installation", passed, f"({version_str})")
    except FileNotFoundError:
        add_result("uv", "Installation", False, "(Not found in PATH)")
    # tmux installed
    try:
        result = subprocess.run(["tmux", "-V"], capture_output=True, text=True)
        passed = result.returncode == 0
        version_str = result.stdout.strip() if passed else "Not found"
        add_result("tmux", "Installation", passed, f"({version_str})")
    except FileNotFoundError:
        add_result("tmux", "Installation", False, "(Not found in PATH)")
    # Redis server
    try:
        r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)
        r.ping()
        # Test basic operations
        test_key = "claude_dispatcher_test"
        r.set(test_key, "test_value")
        value = r.get(test_key)
        r.delete(test_key)
        passed = value == "test_value"
        add_result("Redis", "Server connection", passed, f"({REDIS_HOST}:{REDIS_PORT})")
    except redis.RedisError as e:
        add_result("Redis", "Server connection", False, f"({str(e)})")
    # redis-cli
    try:
        result = subprocess.run(["redis-cli", "--version"], capture_output=True, text=True)
        passed = result.returncode == 0
        version_str = result.stdout.strip().split()[1] if passed else "Not found"
        add_result("redis-cli", "Installation", passed, f"(v{version_str})")
    except FileNotFoundError:
        add_result("redis-cli", "Installation", False, "(Not found in PATH)")
    # claude binary
    try:
        result = subprocess.run(["which", "claude"], capture_output=True, text=True)
        found = result.returncode == 0
        if not found:
            add_result("claude", "Binary in PATH", False, "(Not found)")
        else:
            result = subprocess.run(["claude", "--version"], capture_output=True, text=True)
            if result.returncode == 0:
                version_str = result.stdout.strip()
                add_result("claude", "Binary executable", True, f"({version_str})")
            else:
                error = result.stderr.strip()
                add_result("claude", "Binary executable", False, f"(Error: {error[:50]}...)")
    except FileNotFoundError:
        add_result("claude", "Binary in PATH", False, "(which command not found)")
    # Summary table
    console.print("\n[bold]Test Summary[/bold]")
    table = Table()
    table.add_column("Component", style="cyan")
    table.add_column("Test", style="white")
    table.add_column("Result", style="white")
    table.add_column("Details", style="dim")
    passed_count = 0
    total_count = len(test_results)
    for result in test_results:
        status = "[green]PASS[/green]" if result["passed"] else "[red]FAIL[/red]"
        table.add_row(
            result["component"],
            result["test"],
            status,
            result["message"]
        )
        if result["passed"]:
            passed_count += 1
    console.print(table)
    # Overall result
    console.print(f"\n[bold]Overall: {passed_count}/{total_count} tests passed[/bold]")
    if passed_count == total_count:
        console.print("\n[bold green]✓ All tests passed! System is ready.[/bold green]")
        console.print("\nNext steps:")
        console.print("1. Run: [cyan]uv run ./claude_dispatcher.py setup[/cyan]")
        console.print("2. Run: [cyan]uv run ./claude_dispatcher.py run[/cyan]")
    else:
        console.print("\n[bold red]✗ Some tests failed. Please fix the issues above.[/bold red]")
        # Provide hints for common issues
        console.print("\n[yellow]Common fixes:[/yellow]")
        if not any(r["component"] == "Redis" and r["passed"] for r in test_results):
            console.print("- Start Redis: [cyan]redis-server[/cyan]")
        if not any(r["component"] == "uv" and r["passed"] for r in test_results):
            console.print("- Install uv: [cyan]curl -LsSf https://astral.sh/uv/install.sh | sh[/cyan]")
        if not any(r["component"] == "claude" and r["passed"] for r in test_results):
            console.print("- Install Claude CLI or update PATH")
        raise typer.Exit(code=1)


# ------------------------------------------------------------------ #
# CLI Commands
# ------------------------------------------------------------------ #
@app.command()
def setup(
    slots: int = typer.Option(DEFAULT_SLOTS, "--slots", "-s", min=1, help="Number of tmux slots to create."),
):
    """Creates (or verifies) tmux sessions and cleans their Redis state."""

    async def _async_setup():
        console.print(f"Setting up {slots} tmux slots...")
        logger.info(f"Setting up {slots} tmux slots")
        created = 0
        existing = 0
        for n in range(1, slots + 1):
            session = _session_name(n)
            pane = _pane(n)
            # Use `has-session` to make setup idempotent and non-destructive.
            proc = await asyncio.create_subprocess_exec(
                "tmux", "has-session", "-t", session,
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.DEVNULL
            )
            if await proc.wait() != 0:
                proc = await asyncio.create_subprocess_exec(
                    "tmux", "new-session", "-d", "-s", session,
                    stdout=asyncio.subprocess.DEVNULL,
                    stderr=asyncio.subprocess.PIPE
                )
                if await proc.wait() != 0:
                    if proc.stderr:
                        stderr = await proc.stderr.read()
                        console.print(f"[red]✗ Failed to create session {session}: {stderr.decode()}")
                    continue
                # Don't use exec to keep the shell alive
                proc = await asyncio.create_subprocess_exec(
                    "tmux", "send-keys", "-t", session, "echo 'Slot ready'", "Enter",
                    stdout=asyncio.subprocess.DEVNULL,
                    stderr=asyncio.subprocess.PIPE
                )
                if await proc.wait() != 0:
                    if proc.stderr:
                        stderr = await proc.stderr.read()
                        console.print(f"[red]✗ Failed to initialize session {session}: {stderr.decode()}")
                    continue
                console.print(f"[green]✓[/green] Session `{session}` created.")
                created += 1
            else:
                console.print(f"[yellow]→[/yellow] Session `{session}` already exists.")
                existing += 1
            # Always reset the Redis key to ensure a clean state.
            redis_client.delete(_slot_key(n))
        console.print(f"[bold green]✓ Setup complete.[/bold green] Created: {created}, Existing: {existing}, All slots are idle.")
        logger.success(f"Setup complete: {created} created, {existing} existing")

    asyncio.run(_async_setup())


@app.command()
def status(
    slots: int = typer.Option(DEFAULT_SLOTS, "--slots", "-s", min=1, help="Number of slots to check."),
    watch: bool = typer.Option(False, "--watch", "-w", help="Continuously watch status."),
):
    """Show the current status of all slots."""

    async def _show_status():
        while True:
            table = Table(title=f"Claude Dispatcher Status - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            table.add_column("Slot", style="cyan")
            table.add_column("Status", style="green")
            table.add_column("Job ID", style="yellow")
            table.add_column("Prompt", style="blue")
            table.add_column("Start Time", style="magenta")
            table.add_column("Last Output", style="dim")
            statuses = await asyncio.gather(*[_get_slot_status(n) for n in range(1, slots + 1)])
            for status in statuses:
                slot_num = status["slot"]
                state = status["status"]
                job_id = status.get("job_id", "-")
                last_output = status.get("last_output", "")
                if status.get("job_info"):
                    info = status["job_info"]
                    prompt = info.get("prompt", "N/A")
                    start_time = info.get("start_time", "N/A")
                    if start_time != "N/A":
                        start_dt = datetime.fromisoformat(start_time)
                        start_time = start_dt.strftime("%H:%M:%S")
                else:
                    prompt = "-"
                    start_time = "-"
                status_emoji = "🟢" if state == "idle" else "🔴"
                table.add_row(
                    f"{slot_num}",
                    f"{status_emoji} {state}",
                    str(job_id) if job_id else "-",
                    prompt,
                    start_time,
                    last_output
                )
            console.clear()
            console.print(table)
            if not watch:
                break
            await asyncio.sleep(2)

    asyncio.run(_show_status())
@app.command()
def cleanup(
    slots: int = typer.Option(DEFAULT_SLOTS, "--slots", "-s", min=1, help="Number of slots to clean."),
    force: bool = typer.Option(False, "--force", "-f", help="Force cleanup without confirmation."),
    kill_stuck: bool = typer.Option(False, "--kill-stuck", help="Kill stuck tmux sessions."),
):
    """Force cleanup of all Redis locks and handle stuck/orphaned slots."""
    async def _async_cleanup():
        if not force:
            confirm = typer.confirm("This will clear all slot locks. Continue?")
            if not confirm:
                console.print("[yellow]Cleanup cancelled.[/yellow]")
                return
        console.print(f"Cleaning up {slots} slots...")
        cleared = 0
        stuck_killed = 0
        orphaned_cleaned = 0
        for n in range(1, slots + 1):
            slot_key = _slot_key(n)
            status = await _get_slot_status(n)
            # Handle stuck slots
            if kill_stuck and status["health"] == "stuck":
                logger.info(f"Killing stuck tmux session for slot {n}")
                subprocess.run(
                    ["tmux", "kill-pane", "-t", _pane(n)],
                    capture_output=True
                )
                stuck_killed += 1
            # Handle orphaned slots
            if status["health"] == "orphaned":
                logger.info(f"Cleaning orphaned slot {n}")
                orphaned_cleaned += 1
            if redis_client.exists(slot_key):
                value = redis_client.get(slot_key)
                redis_client.delete(slot_key)
                console.print(f"[green]✓[/green] Cleared slot {n} (was: {value})")
                cleared += 1
                # Also clear associated job info
                if value and value.startswith("busy:job_"):
                    job_id = int(value.split("_")[1])
                    redis_client.delete(_job_info_key(job_id))
            else:
                console.print(f"[dim]Slot {n} was already idle[/dim]")
        # Clean up all old job info keys
        pattern = f"{PREFIX}:job:*"
        job_keys = list(redis_client.scan_iter(match=pattern))
        if job_keys:
            redis_client.delete(*job_keys)
            console.print(f"[green]✓[/green] Cleaned up {len(job_keys)} job info keys")
        console.print("[bold green]✓ Cleanup complete.[/bold green]")
        console.print(f" Cleared: {cleared} slots")
        if kill_stuck:
            console.print(f" Killed: {stuck_killed} stuck sessions")
        if orphaned_cleaned > 0:
            console.print(f" Cleaned: {orphaned_cleaned} orphaned slots")
        logger.info(f"Cleanup complete: cleared {cleared} slots, killed {stuck_killed} stuck, cleaned {orphaned_cleaned} orphaned")
    asyncio.run(_async_cleanup())
@app.command()
def run(
    prompts: Optional[List[str]] = typer.Argument(None, help="Prompts to dispatch. Uses built-ins if omitted."),
    timeout: int = typer.Option(DEFAULT_TIMEOUT, "--timeout", "-t", help="Timeout in seconds for each job."),
    claude_bin: str = typer.Option(DEFAULT_CMD, "--claude", help="Path to the Claude binary."),
    max_slots: int = typer.Option(DEFAULT_SLOTS, "--slots", "-s", min=1, help="Number of slots to use."),
):
    """Dispatches prompts into the tmux slot pool concurrently."""
    # This wrapper is the key fix for using async functions in a Typer command.
    async def _async_run():
        await _validate_dependencies(max_slots, claude_bin)
        prompt_list = prompts
        if not prompt_list:
            console.print("[dim]Using built-in example prompts...[/dim]")
            prompt_list = [
                "What is 2 + 2?",
                "Explain asyncio vs threading in Python.",
                "Write a minimal async TCP echo server with tests and CI.",
                "Convert 42 °C to °F.",
                "Summarize 'Dune' in 150 words.",
                "Create a Flask API with JWT, SQLAlchemy, Docker, and OpenAPI.",
            ]
        logger.info(f"Dispatching {len(prompt_list)} jobs across {max_slots} slots")
        start_time = datetime.now()
        tasks = [
            asyncio.create_task(_dispatch_one_job(i, p, timeout, claude_bin, max_slots))
            for i, p in enumerate(prompt_list, 1)
        ]
        try:
            # Wait for all tasks with interrupt handling
            done, pending = await asyncio.wait(
                tasks,
                return_when=asyncio.FIRST_EXCEPTION
            )
            # Check if any task raised an exception
            for task in done:
                if task.exception():
                    raise task.exception()
            # If shutdown was requested, cancel pending tasks
            if _shutdown_event.is_set():
                console.print("\n[yellow]Gracefully cancelling pending jobs...[/yellow]")
                for task in pending:
                    task.cancel()
                await asyncio.gather(*pending, return_exceptions=True)
                console.print("[yellow]Shutdown complete.[/yellow]")
                return
            # Wait for remaining tasks
            if pending:
                await asyncio.gather(*pending)
        except KeyboardInterrupt:
            console.print("\n[yellow]Received interrupt, shutting down gracefully...[/yellow]")
            _shutdown_event.set()
            # Cancel all pending tasks
            for task in tasks:
                if not task.done():
                    task.cancel()
            # Wait for cancellation to complete
            await asyncio.gather(*tasks, return_exceptions=True)
            console.print("[yellow]Shutdown complete.[/yellow]")
            return
        elapsed = (datetime.now() - start_time).total_seconds()
        console.print(f"[bold green]✓ All jobs have been dispatched.[/bold green] Total time: {elapsed:.2f}s")
        logger.success(f"All {len(prompt_list)} jobs dispatched in {elapsed:.2f}s")
    try:
        asyncio.run(_async_run())
    except KeyboardInterrupt:
        console.print("\n[yellow]Interrupted by user.[/yellow]")
        sys.exit(0)
# ------------------------------------------------------------------ #
# Demo Commands
# ------------------------------------------------------------------ #
def print_section(title: str, content: Optional[str] = None):
    """Print a formatted section: a panel when content is given, otherwise a banner."""
    if content:
        console.print(Panel(content, title=f"[bold cyan]{title}[/bold cyan]", border_style="cyan"))
    else:
        console.print(f"\n[bold cyan]{'='*60}[/bold cyan]")
        console.print(f"[bold cyan]{title}[/bold cyan]")
        console.print(f"[bold cyan]{'='*60}[/bold cyan]\n")
async def run_demo_command(cmd: str, title: Optional[str] = None) -> bool:
    """Run a shell command, stream its output line by line, and return True on exit code 0."""
    if title:
        console.print(f"\n[yellow]→ {title}[/yellow]")
    console.print(f"[dim]$ {cmd}[/dim]")
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT  # Merge stderr into stdout so both are streamed
    )
    while True:
        line = await proc.stdout.readline()
        if not line:
            break
        console.print(line.decode().rstrip())
    await proc.wait()
    return proc.returncode == 0
@app.command()
def demo():
    """Run interactive demo scenarios to showcase the dispatcher."""
    print_section("Claude Dispatcher Interactive Demo")
    scenarios = {
        "1": ("Basic Setup and Run", demo_basic),
        "2": ("Job Queueing (More Jobs than Slots)", demo_queueing),
        "3": ("Cleanup and Recovery", demo_recovery),
        "4": ("Advanced Configuration", demo_advanced),
    }
    while True:
        console.print("\n[bold]Available Demos:[/bold]")
        for key, (name, _) in scenarios.items():
            console.print(f"{key}. {name}")
        console.print("5. Exit")
        choice = Prompt.ask("\nSelect a demo", choices=["1", "2", "3", "4", "5"])
        if choice == "5":
            console.print("\n[green]Thanks for trying the Claude Dispatcher demo![/green]")
            break
        if choice in scenarios:
            name, func = scenarios[choice]
            print_section(f"Scenario {choice}: {name}")
            func()
        if not Confirm.ask("\nWould you like to try another demo?"):
            break
def demo_basic():
    """Basic Demo: Setup and Simple Run"""
    async def run():
        print_section("Step 1: Validate Prerequisites",
                      "First, let's make sure everything is installed correctly.")
        if not await run_demo_command("uv run ./claude_dispatcher.py validate", "Running setup validation"):
            console.print("[red]Prerequisites check failed! Please fix issues before continuing.[/red]")
            return
        input("\nPress Enter to continue...")
        # Setup slots
        print_section("Step 2: Create Tmux Slots",
                      "Now we'll create 3 tmux sessions that will act as our worker slots.")
        await run_demo_command("uv run ./claude_dispatcher.py setup --slots 3", "Creating 3 slots")
        input("\nPress Enter to continue...")
        # Check status
        print_section("Step 3: Check Status",
                      "Let's see the status of our slots - they should all be idle.")
        await run_demo_command("uv run ./claude_dispatcher.py status --slots 3", "Checking slot status")
        input("\nPress Enter to continue...")
        # Run simple jobs
        print_section("Step 4: Run Simple Jobs",
                      "Now we'll dispatch 3 simple jobs - one for each slot.")
        jobs = [
            '"What is 2 + 2?"',
            '"List 3 colors"',
            '"Say hello"'
        ]
        await run_demo_command(
            f"uv run ./claude_dispatcher.py run --slots 3 --timeout 30 {' '.join(jobs)}",
            "Dispatching 3 simple jobs"
        )
        console.print("\n[green]✓ Basic demo completed![/green]")
        console.print("\nYou can check the tmux sessions with: [cyan]tmux ls[/cyan]")
        console.print("Attach to a session with: [cyan]tmux attach -t claude_slot_1[/cyan]")
    asyncio.run(run())
def demo_queueing():
    """Concurrent Demo: More Jobs than Slots"""
    async def run():
        console.print(Panel(
            "This demo shows what happens when we have more jobs than available slots.\n"
            "We'll create 3 slots but dispatch 6 jobs. The first 3 will start immediately,\n"
            "and the remaining 3 will wait for slots to become available.",
            title="[bold cyan]Overview[/bold cyan]",
            border_style="cyan"
        ))
        input("\nPress Enter to start...")
        # Setup
        await run_demo_command("uv run ./claude_dispatcher.py setup --slots 3", "Setting up 3 slots")
        console.print("\n[yellow]Tip: Open another terminal and run:[/yellow]")
        console.print("[cyan]watch -n 2 'uv run ./claude_dispatcher.py status --slots 3'[/cyan]")
        console.print("[dim]to see live status updates![/dim]")
        input("\nPress Enter to dispatch 6 jobs...")
        # Dispatch 6 jobs with mixed durations
        jobs = [
            '"Count to 5 slowly"',
            '"What is the meaning of life?"',
            '"Write a haiku about coding"',
            '"Explain quantum computing in one sentence"',
            '"List prime numbers under 20"',
            '"Tell a programming joke"'
        ]
        start_time = time.time()
        await run_demo_command(
            f"uv run ./claude_dispatcher.py run --slots 3 --timeout 60 {' '.join(jobs)}",
            "Dispatching 6 jobs to 3 slots"
        )
        elapsed = time.time() - start_time
        console.print(f"\n[green]✓ All jobs dispatched in {elapsed:.1f} seconds[/green]")
        console.print("\nNotice how jobs 4-6 had to wait for slots to become available!")
    asyncio.run(run())
def demo_recovery():
    """Recovery Demo: Cleanup and Recovery"""
    async def run():
        console.print(Panel(
            "This demo shows the cleanup and recovery features:\n"
            "- Force cleanup of stuck slots\n"
            "- Recovery from interrupted jobs\n"
            "- Self-healing mechanisms",
            title="[bold cyan]Overview[/bold cyan]",
            border_style="cyan"
        ))
        input("\nPress Enter to start...")
        # Setup
        await run_demo_command("uv run ./claude_dispatcher.py setup --slots 2", "Setting up 2 slots")
        # Simulate stuck job
        console.print("\n[yellow]Simulating a stuck slot...[/yellow]")
        proc = await asyncio.create_subprocess_shell(
            "redis-cli SET claude_slot:1 'busy:job_999'",
            stdout=asyncio.subprocess.DEVNULL
        )
        await proc.wait()
        # Show stuck status
        await run_demo_command("uv run ./claude_dispatcher.py status --slots 2", "Status shows slot 1 is stuck")
        input("\nPress Enter to cleanup...")
        # Cleanup
        await run_demo_command("uv run ./claude_dispatcher.py cleanup --slots 2 --force", "Force cleanup all slots")
        # Verify cleanup
        await run_demo_command("uv run ./claude_dispatcher.py status --slots 2", "All slots are now idle")
        console.print("\n[green]✓ Cleanup demo completed![/green]")
    asyncio.run(run())
def demo_advanced():
    """Advanced Demo: Custom Configuration"""
    async def run():
        console.print(Panel(
            "This demo shows advanced features:\n"
            "- Custom timeouts\n"
            "- Different Claude binary paths\n"
            "- Real-time monitoring\n"
            "- Performance testing",
            title="[bold cyan]Overview[/bold cyan]",
            border_style="cyan"
        ))
        # Get user input and convert it to integers up front, so a non-numeric
        # answer fails here instead of being interpolated into a shell command
        slots = int(Prompt.ask("How many slots would you like?", default="4"))
        timeout = int(Prompt.ask("Timeout per job (seconds)?", default="120"))
        # Setup
        await run_demo_command(f"uv run ./claude_dispatcher.py setup --slots {slots}", f"Setting up {slots} slots")
        # Create performance test jobs
        console.print("\n[yellow]Creating performance test jobs...[/yellow]")
        job_count = slots * 2
        jobs = []
        for i in range(job_count):
            complexity = ["simple", "moderate", "complex"][i % 3]
            jobs.append(f'"Job {i+1}: Simulate {complexity} task"')
        console.print(f"Created {job_count} jobs for {slots} slots")
        # Dispatch
        start_time = time.time()
        await run_demo_command(
            f"uv run ./claude_dispatcher.py run --slots {slots} --timeout {timeout} {' '.join(jobs)}",
            f"Dispatching {job_count} jobs"
        )
        elapsed = time.time() - start_time
        console.print(f"\n[green]Performance: {job_count} jobs dispatched in {elapsed:.1f}s[/green]")
        console.print(f"Average: {elapsed/job_count:.2f}s per job dispatch")
    asyncio.run(run())
# ============================================
# USAGE FUNCTIONS (Following template pattern)
# ============================================
async def working_usage():
    """
    Known working examples that demonstrate dispatcher functionality.
    This function contains stable, tested code that reliably works.

    CRITICAL FOR AGENTS:
    - This function MUST verify that the dispatcher produces expected results
    - Use assertions to validate outputs match expectations
    - Return True only if ALL tests pass
    - This is how agents verify the dispatcher actually works
    """
    logger.info("=== Running Working Usage Examples ===")
    try:
        # Test 1: Validate prerequisites
        logger.info("Test 1: Validating prerequisites")
        # Check Python version
        assert sys.version_info >= (3, 9), "Python 3.9+ required"
        # Check Redis connection
        test_redis = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)
        test_redis.ping()
        test_key = "dispatcher_test"
        test_redis.set(test_key, "working")
        value = test_redis.get(test_key)
        test_redis.delete(test_key)
        assert value == "working", "Redis test failed"
        logger.success("✓ Prerequisites validated")
        # Test 2: Create and verify tmux sessions
        logger.info("Test 2: Testing tmux session management")
        test_slots = 2
        # Create test sessions
        for n in range(1, test_slots + 1):
            session = f"test_{PREFIX}_{n}"
            # Kill if exists
            subprocess.run(["tmux", "kill-session", "-t", session],
                           capture_output=True)
            # Create new
            result = subprocess.run(["tmux", "new-session", "-d", "-s", session],
                                    capture_output=True)
            assert result.returncode == 0, f"Failed to create session {session}"
        # Verify sessions exist
        for n in range(1, test_slots + 1):
            session = f"test_{PREFIX}_{n}"
            result = subprocess.run(["tmux", "has-session", "-t", session],
                                    capture_output=True)
            assert result.returncode == 0, f"Session {session} not found"
        # Cleanup test sessions
        for n in range(1, test_slots + 1):
            session = f"test_{PREFIX}_{n}"
            subprocess.run(["tmux", "kill-session", "-t", session],
                           capture_output=True)
        logger.success("✓ Tmux session management working")
        # Test 3: Verify atomic slot claiming
        logger.info("Test 3: Testing atomic slot claiming")
        slot_key = f"{PREFIX}:test:1"
        # Ensure clean state
        test_redis.delete(slot_key)
        # First claim should succeed
        result1 = test_redis.set(slot_key, "busy:job_1", nx=True)
        assert result1 is True, "First claim should succeed"
        # Second claim should fail
        result2 = test_redis.set(slot_key, "busy:job_2", nx=True)
        assert result2 is None or result2 is False, "Second claim should fail"
        # Cleanup
        test_redis.delete(slot_key)
        # After cleanup, claim should succeed
        result3 = test_redis.set(slot_key, "busy:job_3", nx=True)
        assert result3 is True, "Claim after cleanup should succeed"
        test_redis.delete(slot_key)
        logger.success("✓ Atomic slot claiming verified")
        # Test 4: Verify status reporting
        logger.info("Test 4: Testing status reporting")
        # Create test job info
        job_id = 9999
        job_info = {
            "prompt": "Test prompt",
            "slot": 1,
            "start_time": datetime.now().isoformat(),
            "status": "running"
        }
        job_key = f"{PREFIX}:job:{job_id}"
        test_redis.hset(job_key, mapping=job_info)
        # Verify retrieval
        stored_info = test_redis.hgetall(job_key)
        assert stored_info["prompt"] == "Test prompt", "Job info storage failed"
        assert stored_info["slot"] == "1", "Slot number mismatch"
        assert stored_info["status"] == "running", "Status mismatch"
        # Cleanup
        test_redis.delete(job_key)
        logger.success("✓ Status reporting verified")
        logger.success("=== All Working Usage Tests Passed! ===")
        return True
    except AssertionError as e:
        logger.error(f"Test assertion failed: {e}")
        return False
    except Exception as e:
        logger.error(f"Test failed with error: {e}")
        logger.exception("Full traceback:")
        return False
async def debug_function():
    """
    Debug function for testing new features or debugging issues.
    Update this frequently while developing/debugging.
    """
    logger.info("=== Running Debug Function ===")
    # Test specific dispatcher functionality
    logger.debug("Testing dispatcher components...")
    # Example: Test concurrent job dispatch simulation
    async def simulate_job(job_id: int, delay: float):
        logger.debug(f"Job {job_id} starting (delay: {delay}s)")
        await asyncio.sleep(delay)
        logger.debug(f"Job {job_id} completed")
        return job_id
    # Run multiple jobs concurrently
    jobs = [
        simulate_job(1, 0.5),
        simulate_job(2, 0.3),
        simulate_job(3, 0.7),
    ]
    results = await asyncio.gather(*jobs)
    logger.debug(f"All jobs completed: {results}")
    # Test Redis connection pooling
    if redis_client:
        for i in range(5):
            key = f"debug_test_{i}"
            redis_client.setex(key, 10, f"value_{i}")
            value = redis_client.get(key)
            logger.debug(f"Redis test {i}: {value}")
            redis_client.delete(key)
    return True
async def stress_test():
    """
    Run stress tests for the dispatcher system.
    Tests concurrent job handling, timeouts, and recovery.
    """
    logger.info("=== Running Stress Tests ===")
    # Stress test configuration
    stress_config = {
        "name": "dispatcher_stress_test",
        "description": "Test dispatcher under load",
        "tests": [
            {
                "name": "concurrent_claims",
                "description": "Test many concurrent slot claims",
                "concurrent_jobs": 20,
                "slots": 5,
                "iterations": 3
            },
            {
                "name": "timeout_handling",
                "description": "Test timeout and cleanup",
                "jobs_with_timeouts": 10,
                "timeout_seconds": 2
            }
        ]
    }
    logger.info(f"Running stress test: {stress_config['name']}")
    # Test 1: Concurrent claims
    logger.info("Test 1: Concurrent slot claiming")
    async def try_claim_slot(job_id: int, slot_num: int) -> bool:
        """Simulate claiming a slot."""
        key = f"{PREFIX}:stress:{slot_num}"
        result = redis_client.set(key, f"job_{job_id}", nx=True, ex=5)
        if result:
            await asyncio.sleep(0.1)  # Simulate work
            redis_client.delete(key)
        # redis-py returns None when NX blocks the write; normalize to a real bool
        return bool(result)
    tasks = []
    for i in range(20):
        slot = (i % 5) + 1
        tasks.append(try_claim_slot(i, slot))
    results = await asyncio.gather(*tasks)
    success_count = sum(1 for r in results if r)
    logger.info(f"Concurrent claims: {success_count}/20 succeeded")
    assert success_count >= 5, "At least 5 claims should succeed"
    logger.success("=== Stress Tests Completed ===")
    return True
if __name__ == "__main__":
    # Script entry point: three test modes plus the Typer CLI.
    #
    # Usage:
    #   python claude_dispatcher.py            # Runs working_usage()
    #   python claude_dispatcher.py debug      # Runs debug_function()
    #   python claude_dispatcher.py stress     # Runs stress_test()
    #   python claude_dispatcher.py <command>  # Runs typer CLI command
    #
    # This pattern provides:
    #   1. Stable working examples that always run
    #   2. Debug playground for testing without breaking working code
    #   3. Stress testing for load scenarios
    #   4. Full CLI functionality via typer

    # Check if running in test mode
    if len(sys.argv) == 1:
        # No arguments - run working usage
        logger.info("Running in WORKING mode...")
        success = asyncio.run(working_usage())
        sys.exit(0 if success else 1)
    elif len(sys.argv) == 2:
        arg = sys.argv[1]
        if arg == "debug":
            logger.info("Running in DEBUG mode...")
            success = asyncio.run(debug_function())
            sys.exit(0 if success else 1)
        elif arg == "stress":
            logger.info("Running in STRESS TEST mode...")
            success = asyncio.run(stress_test())
            sys.exit(0 if success else 1)
    # Otherwise, run the typer CLI
    app()
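
The queueing behaviour described in the demos (more jobs than slots, with late jobs waiting for a free slot) rests on a set-if-absent claim, which the dispatcher performs with Redis `SET ... NX`. The sketch below illustrates that idea without a Redis server. It is not the dispatcher's actual implementation: `InMemorySlots`, `run_job`, and `main` are hypothetical names, and a plain dictionary stands in for Redis.

```python
import asyncio
from typing import Dict, List, Tuple


class InMemorySlots:
    """Hypothetical stand-in for Redis: claim() mimics SET key value NX."""

    def __init__(self, count: int) -> None:
        self.count = count
        self._owners: Dict[int, str] = {}

    def claim(self, slot: int, owner: str) -> bool:
        # Set-if-absent: only the first claimant of a free slot wins.
        if slot in self._owners:
            return False
        self._owners[slot] = owner
        return True

    def release(self, slot: int) -> None:
        # Deleting the key makes the slot claimable again.
        self._owners.pop(slot, None)


async def run_job(job_id: int, slots: InMemorySlots, log: List[Tuple[int, int]]) -> None:
    """Poll until a slot can be claimed, do simulated work, then release it."""
    while True:
        for slot in range(1, slots.count + 1):
            if slots.claim(slot, f"busy:job_{job_id}"):
                try:
                    log.append((job_id, slot))
                    await asyncio.sleep(0.05)  # simulated work
                finally:
                    slots.release(slot)
                return
        await asyncio.sleep(0.01)  # every slot is busy: back off and retry


async def main(jobs: int = 6, slot_count: int = 3) -> List[Tuple[int, int]]:
    """Run `jobs` jobs over `slot_count` slots and return (job_id, slot) pairs."""
    slots = InMemorySlots(slot_count)
    log: List[Tuple[int, int]] = []
    await asyncio.gather(*(run_job(j, slots, log) for j in range(1, jobs + 1)))
    return log
```

Calling `asyncio.run(main())` returns one `(job_id, slot)` pair per job. With six jobs and three slots, the first three jobs claim slots right away and the rest retry until a slot is released, which mirrors what the queueing demo shows. A real deployment would also need an expiry on the claim (as the stress test does with `ex=5`) so a crashed job cannot hold a slot forever.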