@joelhooks
Last active March 18, 2026 16:22

cmux × pi: Multi-Agent Orchestration Rig

A spec for turning cmux into the control plane for coordinated multi-agent development — visible, steerable, auditable.

The Idea

cmux is already a terminal multiplexer with 130+ CLI methods, browser surfaces, sidebar status, notifications, and workspace management. Pi is already an extensible coding agent with RPC mode, lifecycle events, tool registration, session forking, and an EventBus. pi-tasks already provides structured task tracking with dependency DAGs, subagent spawning via EventBus RPC, auto-cascade, and file-locked shared task stores.

None of these systems know about each other. This spec bridges them.

What This Enables

You sit in one pi session (the orchestrator). You say:

"Deploy the new auth system across all three repos."

The orchestrator:

  1. Creates a task DAG via pi-tasks (4 tasks, 3 repos + integration tests)
  2. Spawns 3 worker pi sessions in separate cmux workspaces via spawn-pi
  3. Each worker gets a focused prompt, a scoped cwd, and a model assignment
  4. The fleet dashboard widget shows live status of all agents
  5. cmux sidebar/tab indicators show which workspaces need attention
  6. Workers complete → pi-tasks auto-cascades to the integration test task
  7. You can switch to any workspace and take over at any time
  8. Everything is in your terminal — not a web UI, not an abstraction layer

Design Principles

  • Visible by default. Every agent runs in a real cmux workspace you can see, read, and interact with. No invisible background processes unless you opt in.
  • Steerable. The orchestrator can steer workers mid-flight. You can take over from any agent. Workers can be aborted, retried, or redirected.
  • Composable. Each piece (cmux extension, pi-tasks, RPC bridge, fleet dashboard) works independently and compounds when combined.
  • Safe. Workers load pi-cmux (standalone package) for full visibility — never --no-extensions blindness. PI_CMUX_ROLE=worker disables subprocess spawns while keeping sidebar/notifications/tool activity. RPC workers are sandboxed. Shared task stores use file locking.

Spec Files

| File | Contents |
| --- | --- |
| 01-ARCHITECTURE.md | System architecture and component map |
| 02-SPAWN-PI.md | spawn-pi tool — launch worker agents in cmux |
| 03-FLEET-DASHBOARD.md | Live multi-agent dashboard widget |
| 04-RPC-BRIDGE.md | Headless pi --mode rpc worker pool |
| 05-CROSS-SESSION.md | Cross-workspace event relay |
| 06-TASK-INTEGRATION.md | pi-tasks as the orchestration backbone |
| 07-PATTERNS-STOLEN.md | Patterns from dmux, pi-coordination, pi-boomerang, jido_symphony |

Prior Art / Building Blocks

  • pi-tasks — Task DAG, subagent spawning, auto-cascade, shared stores, process tracking. The task model and EventBus RPC pattern are directly reusable.
  • cmux extension (joelhooks/pi-tools) — Sidebar status, lifecycle hooks, notification lifecycle, live tool activity.
  • dag-dispatch — Redis pub/sub pattern for async workflow completion.
  • codex-exec — Background process spawning with output buffering.
  • job-monitor — TUI widget for tracking active background jobs.
  • ralph-loop — Iterative agent loops with PRD-driven task selection.

Architecture

Component Map

┌─────────────────────────────────────────────────────────────────────────┐
│                              cmux (macOS app)                           │
│                                                                         │
│  ┌─ workspace:1 "orchestrator" ────────────────────────────────────┐    │
│  │  ┌─ pi (opus) ──────────────┐  ┌─ Fleet Dashboard ──────────┐  │    │
│  │  │ Tools:                    │  │ agent:1 auth   ● Running   │  │    │
│  │  │   spawn-pi                │  │ agent:2 tests  ◉ Idle      │  │    │
│  │  │   read-agent              │  │ agent:3 deploy ● Running   │  │    │
│  │  │   send-agent              │  │ rpc:1 lint     ◌ Queued    │  │    │
│  │  │   kill-agent              │  │                            │  │    │
│  │  │   TaskCreate/Execute/...  │  │ [2/4 done] ████░░░ 50%    │  │    │
│  │  │   cmux/cmux_status/notify │  └────────────────────────────┘  │    │
│  │  └──────────────────────────┘                                    │    │
│  └──────────────────────────────────────────────────────────────────┘    │
│                                                                         │
│  ┌─ workspace:2 "auth" ──────┐  ┌─ workspace:3 "tests" ──────────┐    │
│  │  pi (sonnet) — spawned     │  │  pi (sonnet) — spawned          │    │
│  │  cwd: ~/repos/auth-service │  │  cwd: ~/repos/test-suite        │    │
│  │  task: #1 "update auth"    │  │  task: #2 "run integration"     │    │
│  │  [visible, interactive]    │  │  [visible, interactive]          │    │
│  └────────────────────────────┘  └──────────────────────────────────┘    │
│                                                                         │
│  ┌─ RPC Worker Pool (invisible) ──────────────────────────────────┐    │
│  │  pi --mode rpc (haiku) × N   ← JSON-over-stdin control         │    │
│  │  High-throughput parallel tasks (lint, format, simple fixes)     │    │
│  └─────────────────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────────────┘

Layers

Layer 0: cmux (Terminal Fabric)

The physical substrate. Manages windows, workspaces, panes, surfaces. Provides: tree, read-screen, send, new-workspace, notify, mark-read/mark-unread, set-status, sidebar, browser surfaces.

Key capability for orchestration: new-workspace --cwd <path> --command <cmd> can spawn a pi session in one call. read-screen can observe any surface. send can inject text into any surface.
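The `cmux(...)` and `cmuxSafe(...)` helpers that appear throughout this spec can be thin wrappers over that CLI. A minimal sketch (assumes a `cmux` binary on PATH; `runCli`, `cmux`, and `cmuxSafe` are this spec's names, not part of cmux itself):

```typescript
import { execFileSync } from "node:child_process";

// Run a CLI and return trimmed stdout; throws on non-zero exit or missing binary.
export function runCli(bin: string, args: string[]): string {
  return execFileSync(bin, args, { encoding: "utf8" }).trim();
}

// Throwing variant: use when a failed cmux call should abort the orchestration step.
export const cmux = (...args: string[]): string => runCli("cmux", args);

// Safe variant: returns undefined instead of throwing, for best-effort calls
// like status updates and screen polls.
export function cmuxSafe(...args: string[]): string | undefined {
  try {
    return runCli("cmux", args);
  } catch {
    return undefined;
  }
}
```

The split matters for orchestration: `new-workspace` failures should surface immediately, while a missed `set-status` or `read-screen` poll should never crash the fleet loop.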

Layer 1: Pi (Agent Runtime)

Each workspace runs an independent pi process. Pi provides:

  • Interactive mode — full TUI with tools, extensions, skills
  • RPC mode (--mode rpc) — headless JSON-over-stdin control with prompt/steer/follow_up/abort/get_state/get_messages/compact/fork/new_session
  • Extension API — 30+ lifecycle events, tool registration, sendUserMessage(), sendMessage(), exec(), EventBus, custom UI (widgets, overlays, footer)
  • Session management — fork, navigate tree, switch, compact, export

Layer 2: cmux Extension (Bridge)

The existing cmux extension (joelhooks/pi-tools/cmux/cmux.ts) bridges pi ↔ cmux. Currently handles: sidebar status, lifecycle notifications, session naming, tool activity display, mark-read/unread attention cycle.

This spec extends it with: spawn-pi, read-agent, send-agent, kill-agent tools, fleet tracking state, RPC worker pool management, cross-session event relay.

Layer 3: pi-tasks (Orchestration Logic)

@tintinweb/pi-tasks provides the task model:

  • Task DAG with blocks/blockedBy dependency edges
  • Shared file-backed stores with file locking for multi-session coordination
  • Subagent spawning via EventBus RPC (subagents:rpc:spawn)
  • Auto-cascade — completed tasks trigger dependent task execution
  • Process tracking — output buffering, blocking wait, graceful stop
  • TUI widget — live task list with spinners, token counts, elapsed time

Layer 4: Orchestrator (User's Pi Session)

The human's primary session. Has all the tools from layers 2-3. Can:

  • Break down high-level directives into task DAGs
  • Spawn visible workers in cmux workspaces
  • Spawn invisible RPC workers for high-throughput tasks
  • Monitor fleet status via dashboard widget
  • Steer/abort/retry workers
  • Read worker output and make decisions
  • Take over from any worker by switching workspaces

Data Flow

User prompt
    │
    ▼
Orchestrator (pi, opus)
    │
    ├── TaskCreate × N ──→ pi-tasks store (file-backed, locked)
    │
    ├── spawn-pi ──→ cmux new-workspace ──→ Worker pi (interactive)
    │                     │                      │
    │                     │                      ├── tool_execution_start ──→ cmux set-status
    │                     │                      ├── agent_end ──→ cmux notify + mark-unread
    │                     │                      └── completion ──→ pi-tasks store update
    │                     │
    │                     └── read-screen ◀── Orchestrator polls or event-driven
    │
    ├── RPC bridge ──→ pi --mode rpc (headless)
    │                      │
    │                      ├── JSON stdout events ──→ Orchestrator
    │                      └── JSON stdin commands ◀── Orchestrator
    │
    └── Fleet Dashboard widget ◀── cmux tree + read-screen + task store

Agent Identity

Each spawned agent gets a unique ID (UUID short prefix). This ID is:

  • Set as a cmux surface/workspace tag
  • Stored in the pi-tasks task metadata (agentId)
  • Used as the owner field on the task
  • Included in cmux sidebar status
  • Used for the agentTaskMap lookup on completion

This creates a single key that traces through: cmux workspace → pi session → task store → fleet dashboard → completion handler.
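A sketch of minting that identity and the `agentTaskMap` it feeds (the types are illustrative, not cmux or pi-tasks API):

```typescript
import { randomUUID } from "node:crypto";

// One identity record, referenced by every subsystem under the same key.
interface AgentIdentity {
  agentId: string;       // short UUID prefix, e.g. "a3f7b2c1"
  workspaceRef: string;  // cmux workspace tag carrying the same ID
  taskOwner: string;     // pi-tasks `owner` field — same value again
}

export function mintAgentIdentity(workspaceRef: string): AgentIdentity {
  const agentId = randomUUID().slice(0, 8);
  return { agentId, workspaceRef, taskOwner: agentId };
}

// On completion, resolve the agent back to its task with one lookup.
export const agentTaskMap = new Map<string, string>(); // agentId → taskId
```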

spawn-pi Tool

The highest-leverage addition. Lets the orchestrator create worker agents in visible cmux workspaces with a single tool call.

Interface

pi.registerTool({
  name: "spawn_pi",
  label: "Spawn Pi Agent",
  description: "Spawn a new pi agent in a cmux workspace. The agent runs " +
    "in a visible terminal you can switch to at any time.",
  parameters: Type.Object({
    prompt: Type.String({ description: "Initial prompt for the agent" }),
    cwd: Type.Optional(Type.String({ description: "Working directory (default: current)" })),
    model: Type.Optional(Type.String({ description: "Model to use (default: claude-sonnet-4)" })),
    task_id: Type.Optional(Type.String({ description: "pi-tasks task ID to bind to this agent" })),
    workspace_name: Type.Optional(Type.String({ description: "cmux workspace label" })),
    direction: Type.Optional(Type.Union([
      Type.Literal("workspace"),  // new workspace (default)
      Type.Literal("right"),      // split right in current workspace
      Type.Literal("down"),       // split down in current workspace
    ])),
    skills: Type.Optional(Type.Array(Type.String(), {
      description: "Skill names to load (e.g., ['next-best-practices'])"
    })),
    session: Type.Optional(Type.String({
      description: "Session file to continue (for resuming previous work)"
    })),
  }),
});

Behavior

1. Create the surface

if (direction === "workspace" || !direction) {
  // New workspace
  const result = cmux("new-workspace", "--cwd", cwd);
  // Parse workspace/surface refs from result
} else {
  // Split in current workspace
  const result = cmux("new-split", direction);
}

2. Build the pi command

const piArgs = [
  "pi",
  "--model", model || "claude-sonnet-4",
  "--print",  // non-interactive: run prompt and keep running for follow-ups
];

if (skills?.length) {
  for (const skill of skills) piArgs.push("--skill", skill);
}
if (session) {
  piArgs.push("--continue", session);
}

// Prompt goes via stdin after launch, or as trailing arg
piArgs.push(JSON.stringify(prompt));

3. Send the command

cmux("send", "--surface", surfaceRef, piArgs.join(" ") + "\n");

4. Register in fleet tracker

const agentId = crypto.randomUUID().slice(0, 8);
fleet.set(agentId, {
  surfaceRef,
  workspaceRef,
  taskId: task_id,
  model,
  cwd,
  prompt: prompt.slice(0, 200),
  status: "starting",
  spawnedAt: Date.now(),
});

5. Bind to pi-tasks (if task_id provided)

if (task_id) {
  store.update(task_id, {
    status: "in_progress",
    owner: agentId,
    metadata: { agentId, surfaceRef, workspaceRef },
  });
}

6. Monitor for completion

Two strategies:

Strategy A: Screen polling (simpler, works today)

// Poll every 5s
setInterval(() => {
  const screen = cmuxSafe("read-screen", "--surface", surfaceRef, "--lines", "5");
  if (screen?.includes("waiting for input") || screen?.includes("$")) {
    fleet.get(agentId).status = "idle";
    if (task_id) store.update(task_id, { status: "completed" });
    // Notify orchestrator
    pi.sendMessage({
      customType: "agent_complete",
      content: [{ type: "text", text: `Agent ${agentId} completed task #${task_id}` }],
    });
  }
}, 5000);

Strategy B: cmux notification relay (cleaner, requires cmux hook)

Each worker's cmux extension already calls notify + mark-unread on agent_end. The orchestrator's extension listens for cmux notification events (if cmux supports event subscription — otherwise falls back to polling).

Return Value

{
  "agent_id": "a3f7b2c1",
  "workspace": "workspace:7",
  "surface": "surface:12",
  "task_id": "3",
  "status": "launched"
}

Companion Tools

read_agent — Read a worker's state

parameters: Type.Object({
  agent_id: Type.String({ description: "Agent ID from spawn_pi" }),
  lines: Type.Optional(Type.Number({ description: "Lines to read (default: 30)" })),
})

// Implementation: cmux read-screen --surface <surfaceRef> --lines <n>
// Parse output to extract: last assistant response, status, model info

send_agent — Steer a worker mid-flight

parameters: Type.Object({
  agent_id: Type.String({ description: "Agent ID" }),
  message: Type.String({ description: "Message to send" }),
})

// Implementation: cmux send --surface <surfaceRef> "<message>\n"
// For RPC workers: { type: "steer", message } via stdin

kill_agent — Abort a worker

parameters: Type.Object({
  agent_id: Type.String({ description: "Agent ID" }),
  cleanup: Type.Optional(Type.Boolean({ description: "Close the workspace too" })),
})

// Implementation: cmux send-key --surface <surfaceRef> C-c
// Then optionally: cmux close-workspace --workspace <workspaceRef>

Extension Loading for Workers

Workers are real pi sessions with curated extensions — not lobotomized --no-extensions drones. The extension system is an asset. The orchestrator decides which extensions each worker gets.

Curated Extension Sets

The spawn_pi tool accepts an extensions parameter — a list of extension paths or package names to load. The orchestrator picks the right set per task:

# Full-featured worker (most tasks)
pi -e pi-cmux/cmux.ts \
   -e pi-tasks/index.ts \
   -e agent-secrets/agent-secrets.ts \
   --model claude-sonnet-4 \
   "refactor the auth middleware"

# Lightweight worker (lint, format)
pi -e pi-cmux/cmux.ts \
   --tools read,bash \
   --model claude-haiku-4-5 \
   "lint src/ and fix all errors"

# Worker with MCQ for interactive decisions
pi -e pi-cmux/cmux.ts \
   -e mcq/index.ts \
   --model claude-sonnet-4 \
   "set up the new project — ask me about stack choices"

--no-extensions only disables auto-discovery (prevents the pi-tools megapackage from loading everything). Explicit -e paths still load.

Extension Presets

Common combinations for spawn_pi:

| Preset | Extensions | Use Case |
| --- | --- | --- |
| standard | pi-cmux, pi-tasks, agent-secrets | Most work |
| minimal | pi-cmux | Lint, format, simple fixes |
| interactive | pi-cmux, mcq | Tasks needing user choices |
| full | (all of pi-tools) | Orchestrator-grade agents |
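A preset could expand into explicit `-e` flags with a small lookup, sketched here; the extension names mirror the table above, but the exact entry-point paths are assumptions:

```typescript
// Map preset names to extension entry points (paths are illustrative).
const PRESETS: Record<string, string[]> = {
  standard: ["pi-cmux/cmux.ts", "pi-tasks/index.ts", "agent-secrets/agent-secrets.ts"],
  minimal: ["pi-cmux/cmux.ts"],
  interactive: ["pi-cmux/cmux.ts", "mcq/index.ts"],
};

// Build the `-e` argument list for a spawn_pi preset.
export function presetArgs(preset: string): string[] {
  const exts = PRESETS[preset];
  if (!exts) throw new Error(`Unknown preset: ${preset}`);
  return exts.flatMap((path) => ["-e", path]);
}
```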

Worker Mode (PI_CMUX_ROLE=worker)

pi-cmux detects PI_CMUX_ROLE=worker and adjusts:

| Feature | Orchestrator | Worker |
| --- | --- | --- |
| Sidebar status + tool activity | ✅ | ✅ |
| Notifications on agent_end | ✅ | ✅ (focus-aware) |
| mark-read/mark-unread | ✅ | ✅ |
| cmux tools | ✅ | ✅ |
| Session naming (subprocess) | ✅ | ❌ |
| Turn summary (subprocess) | ✅ | ❌ |

Worker mode keeps ALL visibility while disabling the two features that spawn child pi processes. The fork-bomb risk is eliminated without losing anything.

Additional Safety

  • The orchestrator tracks all spawned surfaces and cleans up on session_shutdown.
  • Max concurrent agent limit (configurable, default 5) prevents runaway spawning.
  • Workers can be sandboxed via --tools read,bash for restricted operations.
  • PI_CMUX_CHILD=1 env guard remains as belt-and-suspenders for haiku spawns.
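The max-concurrent-agent limit could be enforced at the top of `spawn_pi` with a simple guard, as in this sketch:

```typescript
// Refuse to spawn past the configurable fleet cap (default 5, per this spec).
export function assertSpawnAllowed(activeCount: number, maxAgents = 5): void {
  if (activeCount >= maxAgents) {
    throw new Error(
      `Fleet limit reached (${activeCount}/${maxAgents}). ` +
      `Kill or wait for an agent before spawning another.`
    );
  }
}
```

Throwing inside the tool call surfaces the limit to the orchestrator model as a tool error, so it can decide to wait or reap idle agents instead of silently queueing.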

Fleet Dashboard

A live TUI widget in the orchestrator workspace showing the state of all agents across all cmux workspaces.

What It Looks Like

┌─ Agent Fleet ─────────────────────────────────────────────────┐
│ ● auth        workspace:2  sonnet   Reading auth.ts     2m 3s │
│ ◉ tests       workspace:3  sonnet   Idle — 47/47 pass   4m 1s │
│ ● deploy      workspace:4  sonnet   Running vercel...   1m 8s │
│ ◌ rpc:lint-1  (headless)   haiku    Queued               —    │
│ ◌ rpc:lint-2  (headless)   haiku    Queued               —    │
│                                                               │
│ Tasks: 2/4 done  ████████░░░░░░░ 50%   Agents: 3 active      │
└───────────────────────────────────────────────────────────────┘

Status Icons

| Icon | Meaning |
| --- | --- |
| ● | Running — agent is streaming/executing tools |
| ◉ | Idle — agent finished, waiting for input |
| ◌ | Queued — task pending, agent not yet spawned |
| ✔ | Completed — agent finished and task marked done |
| ✖ | Failed — agent errored or was killed |

Data Sources

The dashboard combines three data sources polled on a timer:

1. Fleet tracker (in-memory)

The spawn_pi tool registers agents in a Map<string, AgentInfo>:

interface AgentInfo {
  id: string;           // Short UUID
  surfaceRef: string;   // cmux surface ref
  workspaceRef: string; // cmux workspace ref
  taskId?: string;      // pi-tasks task ID
  model: string;
  cwd: string;
  prompt: string;       // First 200 chars
  status: "starting" | "running" | "idle" | "completed" | "failed";
  spawnedAt: number;
  lastActivity?: string; // Latest tool activity from read-screen
}

2. cmux tree + per-workspace read-screen

// Get workspace layout
const tree = cmux("tree", "--all");

// For each tracked agent, read the last few lines
for (const agent of fleet.values()) {
  const screen = cmuxSafe("read-screen", "--surface", agent.surfaceRef, "--lines", "3");
  agent.lastActivity = parseActivity(screen);
  agent.status = inferStatus(screen);
}

Activity parsing heuristics:

  • If screen contains the pi spinner/streaming indicator → running
  • If screen contains the pi input prompt → idle
  • If screen contains an error trace → failed
  • Extract the most recent tool use line for lastActivity
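The `inferStatus` and `parseActivity` helpers used above could look like this sketch; the spinner, error, and tool-name markers are assumptions about pi's TUI output, not verified strings:

```typescript
type AgentStatus = "running" | "idle" | "failed";

// Heuristic status inference from the last few screen lines.
// Marker strings are assumptions about pi's TUI, tune against real output.
export function inferStatus(screen: string): AgentStatus {
  if (/error|traceback|exception/i.test(screen)) return "failed";
  if (screen.includes("⠋") || screen.includes("streaming")) return "running";
  return "idle"; // input prompt visible, nothing in flight
}

// Pull the most recent tool-use line for the dashboard's activity column.
export function parseActivity(screen: string): string | undefined {
  const lines = screen.split("\n").filter((l) => l.trim());
  return lines.reverse().find((l) => /^(bash|read|edit|write)\b/i.test(l));
}
```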

3. pi-tasks store

const tasks = store.list();
const completedCount = tasks.filter(t => t.status === "completed").length;
const totalCount = tasks.length;

Implementation

Widget registration

ctx.ui.setWidget("fleet", (tui, theme) => {
  return {
    render: () => renderFleetDashboard(tui, theme),
    invalidate: () => {},
  };
}, { placement: "aboveEditor" });

Update loop

const POLL_INTERVAL = 3000; // 3s when agents are active

let fleetTimer: ReturnType<typeof setInterval> | undefined;

function startFleetPolling() {
  if (fleetTimer) return;
  fleetTimer = setInterval(() => {
    updateFleetStatus();
    tui?.requestRender();
  }, POLL_INTERVAL);
}

function stopFleetPolling() {
  if (fleetTimer) {
    clearInterval(fleetTimer);
    fleetTimer = undefined;
  }
}

Adaptive polling

  • When agents are active: poll every 3s
  • When all agents idle/completed: poll every 15s
  • When no agents tracked: stop polling, hide widget
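The schedule above reduces to one pure function the polling loop can consult after each tick; a sketch:

```typescript
type FleetView = { active: number; tracked: number };

// Pick the next poll interval from fleet state:
// 3s while agents work, 15s when all are quiet, null (stop + hide widget) when empty.
export function pollInterval(fleet: FleetView): number | null {
  if (fleet.tracked === 0) return null;
  if (fleet.active > 0) return 3_000;
  return 15_000;
}
```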

cmux sidebar sync

In addition to the widget, update the cmux sidebar status with a compact fleet summary:

cmuxSafe("set-status", "fleet",
  `${activeCount} agents · ${completedTasks}/${totalTasks} tasks`,
  "--icon", "person.3.fill",
  "--color", allDone ? "#34C759" : "#4C8DFF"
);

This is visible from ALL workspaces — you can see fleet status without switching back to the orchestrator.

Interaction

Click-to-switch (future)

If cmux adds clickable regions or keybinding support in the sidebar, the dashboard could support clicking an agent row to switch to its workspace:

cmux("select-workspace", "--workspace", agent.workspaceRef);

Keyboard shortcuts

Register shortcuts in the orchestrator for fleet management:

pi.registerShortcut("ctrl+shift+f", {
  description: "Toggle fleet dashboard",
  handler: () => toggleFleetWidget(),
});

pi.registerShortcut("ctrl+shift+n", {
  description: "Switch to next active agent workspace",
  handler: () => {
    const next = getNextActiveAgent();
    if (next) cmux("select-workspace", "--workspace", next.workspaceRef);
  },
});

Integration with pi-tasks Widget

Both pi-tasks and the fleet dashboard use ctx.ui.setWidget(). They coexist:

  • pi-tasks widget (key: "tasks") — shows task list with dependency state
  • Fleet dashboard (key: "fleet") — shows agent/workspace state

Both render above the editor. The fleet dashboard appears above the task list when agents are active, creating a unified orchestration view:

┌─ Agent Fleet ──────────────────────────────┐  ← fleet widget
│ ● auth   ws:2  Running auth.ts      2m 3s │
│ ◉ tests  ws:3  Idle — all passing    4m 1s │
└────────────────────────────────────────────┘
● 4 tasks (1 done, 2 in progress, 1 open)      ← pi-tasks widget
  ✔ Design auth schema
  ✳ Updating auth middleware… (2m 3s · ↑ 4.1k)
  ◼ Run integration tests
  ◻ Deploy to staging › blocked by #2, #3

RPC Bridge — Headless Worker Pool

For high-throughput parallel work that doesn't need visible workspaces. pi --mode rpc processes controlled via JSON-over-stdin.

Why Both Visible and Headless?

| | Visible (spawn-pi) | Headless (RPC) |
| --- | --- | --- |
| Use case | Complex tasks needing observation | Simple parallel tasks |
| Visibility | Full cmux workspace, switchable | Invisible, output via fleet dashboard |
| Cost | One workspace per agent | No workspace overhead |
| Control | Switch to workspace, type directly | JSON commands via stdin |
| Limit | ~5-8 workspaces practical | 20+ concurrent practical |
| Examples | "Refactor auth system" | "Lint these 15 files" |

Pi's RPC Protocol (already exists)

Pi supports --mode rpc which reads JSON commands on stdin and emits JSON events on stdout. The full protocol from rpc-types.ts:

Commands (stdin → pi)

// Core prompting
{ type: "prompt", message: "fix the auth bug" }
{ type: "steer", message: "also check the tests" }      // inject while streaming
{ type: "follow_up", message: "now deploy it" }          // queue for after current turn
{ type: "abort" }

// State
{ type: "get_state" }         // → RpcSessionState
{ type: "get_messages" }      // → all messages in context

// Model control
{ type: "set_model", provider: "anthropic", modelId: "claude-haiku-4-5" }

// Session control
{ type: "new_session" }
{ type: "compact" }           // trigger context compaction
{ type: "set_session_name", name: "lint-worker" }

Events (pi → stdout)

Pi emits events as JSON lines:

// Agent lifecycle
{ type: "event", event: "agent_start" }
{ type: "event", event: "agent_end", data: { messages: [...] } }

// Streaming tokens
{ type: "event", event: "message_update", data: { text: "..." } }

// Tool execution
{ type: "event", event: "tool_execution_start", data: { toolName: "bash", args: {...} } }
{ type: "event", event: "tool_execution_end", data: { toolName: "bash", result: {...} } }

// State response
{ type: "response", command: "get_state", success: true, data: { isStreaming: false, ... } }

RPC Worker Pool

interface RpcWorker {
  id: string;
  proc: ChildProcess;
  stdin: Writable;
  taskId?: string;
  status: "idle" | "working" | "completed" | "failed";
  model: string;
  cwd: string;
  lastOutput?: string;
  spawnedAt: number;
}

class RpcWorkerPool {
  private workers = new Map<string, RpcWorker>();
  private maxWorkers: number;

  constructor(maxWorkers = 10) {
    this.maxWorkers = maxWorkers;
  }

  async spawn(options: {
    prompt: string;
    cwd?: string;
    model?: string;
    taskId?: string;
    tools?: string[];
  }): Promise<string> {
    const id = crypto.randomUUID().slice(0, 8);

    const args = [
      "--mode", "rpc",
      "--model", options.model || "claude-haiku-4-5",
      "--no-extensions",  // RPC workers are headless — no cmux visibility needed
      "--no-session",     // ephemeral
    ];

    if (options.tools) {
      args.push("--tools", options.tools.join(","));
    }

    const proc = spawn("pi", args, {
      cwd: options.cwd || process.cwd(),
      stdio: ["pipe", "pipe", "ignore"],
      env: { ...process.env, PI_CMUX_CHILD: "1" },
    });

    const worker: RpcWorker = {
      id,
      proc,
      stdin: proc.stdin!,
      taskId: options.taskId,
      status: "idle",
      model: options.model || "claude-haiku-4-5",
      cwd: options.cwd || process.cwd(),
      spawnedAt: Date.now(),
    };

    // Parse stdout JSON lines
    const rl = readline.createInterface({ input: proc.stdout! });
    rl.on("line", (line) => {
      try {
        const event = JSON.parse(line);
        this.handleEvent(id, event);
      } catch {}
    });

    proc.on("close", (code) => {
      worker.status = code === 0 ? "completed" : "failed";
      this.onWorkerDone(id);
    });

    this.workers.set(id, worker);

    // Send initial prompt
    this.send(id, { type: "prompt", message: options.prompt });

    return id;
  }

  send(workerId: string, command: RpcCommand): void {
    const worker = this.workers.get(workerId);
    if (!worker) throw new Error(`Worker ${workerId} not found`);
    worker.stdin.write(JSON.stringify(command) + "\n");
  }

  private handleEvent(workerId: string, event: any): void {
    const worker = this.workers.get(workerId);
    if (!worker) return;

    switch (event.event || event.type) {
      case "agent_start":
        worker.status = "working";
        break;
      case "agent_end":
        worker.status = "completed";
        // Extract last assistant text
        const messages = event.data?.messages || [];
        const last = messages.findLast((m: any) => m.role === "assistant");
        worker.lastOutput = last?.content?.[0]?.text?.slice(0, 500);
        this.onWorkerDone(workerId);
        break;
      case "tool_execution_start":
        // Update fleet dashboard with tool activity
        break;
    }
  }

  private onWorkerDone(workerId: string): void {
    const worker = this.workers.get(workerId);
    if (!worker) return;

    // Update pi-tasks if bound to a task
    if (worker.taskId) {
      store.update(worker.taskId, {
        status: worker.status === "completed" ? "completed" : "pending",
        metadata: { result: worker.lastOutput },
      });
    }

    // Notify orchestrator via pi.sendMessage
    pi.sendMessage({
      customType: "rpc_worker_complete",
      content: [{
        type: "text",
        text: `RPC worker ${workerId} ${worker.status}: ${worker.lastOutput || "(no output)"}`,
      }],
    });
  }

  kill(workerId: string): void {
    const worker = this.workers.get(workerId);
    if (!worker) return;
    this.send(workerId, { type: "abort" });
    setTimeout(() => worker.proc.kill("SIGTERM"), 2000);
    setTimeout(() => worker.proc.kill("SIGKILL"), 7000);
  }

  killAll(): void {
    for (const id of this.workers.keys()) this.kill(id);
  }
}

Tool Registration

pi.registerTool({
  name: "rpc_spawn",
  label: "RPC Worker",
  description: "Spawn a headless pi worker for quick parallel tasks. " +
    "Cheaper than spawn-pi (no workspace). Results delivered as messages.",
  parameters: Type.Object({
    prompt: Type.String({ description: "Task prompt" }),
    cwd: Type.Optional(Type.String()),
    model: Type.Optional(Type.String({ description: "Default: claude-haiku-4-5" })),
    task_id: Type.Optional(Type.String({ description: "pi-tasks task ID" })),
    tools: Type.Optional(Type.Array(Type.String(), {
      description: "Allowed tools (default: read,bash,edit,write)"
    })),
  }),
});

Cleanup

On orchestrator session_shutdown:

pi.on("session_shutdown", async () => {
  rpcPool.killAll();
  // Visible agents: optionally close their workspaces
  for (const agent of fleet.values()) {
    cmuxSafe("close-workspace", "--workspace", agent.workspaceRef);
  }
});

Cross-Session Event Relay

How agents in different cmux workspaces communicate without shared memory.

The Problem

Each pi process is independent. Pi's EventBus is per-process. When agent A in workspace:2 completes a task, agent B in workspace:1 doesn't know about it.

Three Relay Mechanisms

1. cmux as Message Bus (simplest, recommended)

cmux surfaces can send text to each other. The orchestrator can inject messages into worker sessions:

// Orchestrator tells worker to do something
cmux("send", "--surface", worker.surfaceRef, "Now run the integration tests\n");

// Orchestrator reads worker output
const screen = cmux("read-screen", "--surface", worker.surfaceRef, "--lines", "20");

This is crude but effective. The orchestrator polls workers periodically and can steer them based on what it sees. No new infrastructure required.

Enhancement: structured completion signals. Workers can echo a structured completion marker that the orchestrator's polling loop detects:

// Worker's session_shutdown or agent_end hook writes a marker
cmuxSafe("log", "--source", "worker", "--", JSON.stringify({
  type: "agent_complete",
  agentId,
  taskId,
  status: "completed",
  summary: lastAssistantText.slice(0, 200),
}));

The orchestrator's extension polls cmux sidebar-state for log entries from other workspaces.

2. Shared Task Store (already built)

pi-tasks supports project-scoped stores and PI_TASKS env variable for shared file paths. Multiple pi sessions can coordinate via the same task file:

# All workers use the same task list
export PI_TASKS=sprint-1

The file store uses file locking (O_EXCL + stale PID detection). Any session can read/write tasks. The orchestrator creates tasks, workers pick them up and mark them complete.

This is the pi-tasks way. No custom relay needed — the store IS the shared state. Workers poll the store (pi-tasks already re-reads on every get() and list() for file-backed stores).

Orchestrator                     Worker A                    Worker B
    │                                │                           │
    ├── TaskCreate #1,#2,#3 ───────▶ tasks.json                 │
    │                                │                           │
    │                           read tasks.json ◀────────────────┤
    │                                │                 pick #2   │
    │                                │                 update → tasks.json
    │                                │                           │
    │◀─── read tasks.json ──────────│                           │
    │   (#2 completed!)              │                           │
    │   cascade → spawn #3           │                           │
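The O_EXCL-plus-stale-PID locking described above can be sketched as follows; this mirrors the described behavior rather than pi-tasks' actual source:

```typescript
import * as fs from "node:fs";

// Acquire an exclusive lock file next to the task store.
// The "wx" flag maps to O_CREAT|O_EXCL: creation fails if the file exists.
export function acquireLock(lockPath: string): boolean {
  try {
    const fd = fs.openSync(lockPath, "wx");
    fs.writeSync(fd, String(process.pid));
    fs.closeSync(fd);
    return true;
  } catch {
    // Lock held — but reclaim it if the holding PID is dead (stale lock).
    const pid = Number(fs.readFileSync(lockPath, "utf8"));
    if (!isAlive(pid)) {
      fs.unlinkSync(lockPath);
      return acquireLock(lockPath);
    }
    return false;
  }
}

function isAlive(pid: number): boolean {
  try {
    process.kill(pid, 0); // signal 0: existence check, no signal delivered
    return true;
  } catch {
    return false;
  }
}

export const releaseLock = (lockPath: string): void => fs.unlinkSync(lockPath);
```

Any session that acquires the lock reads the store, mutates it, writes it back, and releases; a crashed holder leaves a stale PID that the next contender reclaims.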

3. Unix Socket / Redis Pub-Sub (highest throughput)

For real-time event relay, use a side-channel:

Unix socket approach:

// Orchestrator creates a Unix socket
const server = net.createServer((conn) => {
  conn.on("data", (data) => {
    const event = JSON.parse(data.toString());
    handleWorkerEvent(event);
  });
});
server.listen(`/tmp/pi-fleet-${sessionId}.sock`);

// Workers connect and send events
const sock = net.connect(`/tmp/pi-fleet-${sessionId}.sock`);
sock.write(JSON.stringify({ type: "complete", taskId: "3" }));

Redis pub-sub approach (already used by dag-dispatch):

// Orchestrator subscribes
subscriber.subscribe("pi-fleet:events");
subscriber.on("message", (channel, message) => {
  const event = JSON.parse(message);
  handleWorkerEvent(event);
});

// Workers publish
publisher.publish("pi-fleet:events", JSON.stringify({
  type: "agent_end",
  agentId,
  taskId,
  summary,
}));

Recommended Approach

Start with mechanism 2 (shared task store). It's already built, battle-tested with file locking, and requires zero new infrastructure. The orchestrator polls the task store (it already does this via store.list()), and workers update their task status via the same store.

Add mechanism 1 (cmux send/read) for steering. When the orchestrator needs to redirect a worker or inject new instructions, it uses cmux send. When it needs to read worker output for decision-making, it uses cmux read-screen.

Graduate to mechanism 3 (Unix socket) only if polling latency matters. For most development workflows, 3-5 second polling is fine. Real-time relay is overkill until you're running 10+ agents and need sub-second coordination.

Event Types

Regardless of relay mechanism, standardize on event types:

interface FleetEvent {
  type: "agent_start" | "agent_end" | "task_complete" | "task_failed" |
        "tool_activity" | "needs_input" | "error";
  agentId: string;
  taskId?: string;
  workspaceRef?: string;
  timestamp: number;
  data?: {
    summary?: string;
    toolName?: string;
    error?: string;
  };
}
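As a sketch of how an orchestrator might consume these events, a small router over `FleetEvent.type` (the function name and return values are illustrative, not part of any existing API; a trimmed `FleetEvent` stands in for the full interface above):

```typescript
// Hypothetical orchestrator-side router over FleetEvent.type.
type FleetEventType =
  | "agent_start" | "agent_end" | "task_complete" | "task_failed"
  | "tool_activity" | "needs_input" | "error";

interface FleetEvent {
  type: FleetEventType;
  agentId: string;
  taskId?: string;
  timestamp: number;
}

// Decide what the orchestrator does with each incoming event.
function routeFleetEvent(event: FleetEvent): "cascade" | "attention" | "log" {
  switch (event.type) {
    case "task_complete":
      return "cascade"; // unblock dependents in the task DAG
    case "task_failed":
    case "needs_input":
    case "error":
      return "attention"; // surface to the user / supervisor
    default:
      return "log"; // routine activity, dashboard only
  }
}
```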

pi-tasks as Orchestration Backbone

How pi-tasks evolves from single-session task tracking to multi-agent coordination engine — and what to fork/extend from the existing codebase.

What pi-tasks Already Has (v1)

| Feature | Status | Notes |
| --- | --- | --- |
| Task CRUD with IDs | ✅ | TaskStore with Map-backed CRUD |
| Dependency DAG | ✅ | Bidirectional blocks/blockedBy with cycle warnings |
| File-backed shared stores | ✅ | PI_TASKS env, file locking, stale PID detection |
| Background process tracking | ✅ | ProcessTracker — spawn, buffer output, wait, stop |
| Subagent spawning via EventBus | ✅ | subagents:rpc:spawn with scoped reply channels |
| Auto-cascade on completion | ✅ | Completed tasks trigger unblocked dependents |
| TUI widget with spinners | ✅ | Star spinner, token counts, elapsed time, blocked-by |
| System-reminder injection | ✅ | Periodic nudges via tool_result event |
| /tasks interactive command | ✅ | View, create, clear, settings panel |
| Session/project/memory scoping | ✅ | Configurable via settings or env |

What's Missing for Multi-Agent Orchestration

1. cmux-Aware Task Execution

pi-tasks spawns subagents via pi.events (EventBus RPC to @tintinweb/pi-subagents). These run as background agents within the same pi process. For the orchestration rig, we need tasks to spawn as separate pi processes in cmux workspaces.

Fork point: Replace or augment the spawnSubagent() function to support cmux-based spawning:

// Current: EventBus RPC (in-process subagent)
const agentId = await spawnSubagent(type, prompt, options);

// New: cmux workspace spawn (out-of-process)
const agentId = await spawnCmuxAgent(prompt, {
  cwd: task.metadata?.cwd,
  model: task.metadata?.model || "claude-sonnet-4",
  workspaceName: task.subject.slice(0, 30),
});

// New: RPC worker spawn (headless out-of-process)
const agentId = await rpcPool.spawn({
  prompt,
  model: "claude-haiku-4-5",
  taskId: task.id,
});

Decision: execution strategy per task.

Add a metadata.execution field to tasks:

interface TaskExecutionConfig {
  strategy: "subagent" | "workspace" | "rpc" | "manual";
  model?: string;
  cwd?: string;
  tools?: string[];
  skills?: string[];
}

TaskExecute reads this config to decide HOW to execute each task.
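As a sketch, that dispatch can be a simple lookup keyed by strategy. `selectSpawner` and the spawner signature are hypothetical names, not existing pi-tasks API:

```typescript
// Hypothetical dispatch: pick a spawner implementation per task strategy.
type Strategy = "subagent" | "workspace" | "rpc" | "manual";

interface TaskExecutionConfig {
  strategy: Strategy;
  model?: string;
  cwd?: string;
}

type Spawner = (prompt: string, cfg: TaskExecutionConfig) => Promise<string>;

// "manual" tasks are left to the user; everything else routes to a spawner.
function selectSpawner(
  cfg: TaskExecutionConfig,
  spawners: Record<Exclude<Strategy, "manual">, Spawner>,
): Spawner | null {
  if (cfg.strategy === "manual") return null;
  return spawners[cfg.strategy];
}
```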

2. Fleet State in Task Store

Currently, agent state lives in the in-memory agentTaskMap and the fleet tracker. For persistence across orchestrator restarts, store fleet state in the task metadata:

store.update(taskId, {
  metadata: {
    agentId: "a3f7b2c1",
    execution: "workspace",
    workspaceRef: "workspace:7",
    surfaceRef: "surface:12",
    spawnedAt: Date.now(),
    lastActivity: "Reading auth.ts",
    lastChecked: Date.now(),
  },
});

On orchestrator restart (session_start), scan the task store for tasks with status: "in_progress" and metadata.workspaceRef. Verify the workspace still exists via cmux tree. If it does, resume monitoring. If not, mark the task as failed.
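That restart scan reduces to a pure decision over the task store and the set of live workspaces. A sketch, where `classifyOnRestart` is an illustrative helper and the real workspace check would come from `cmux tree`:

```typescript
// Decide, per in-progress task, whether to resume monitoring or mark failed.
interface Task {
  id: string;
  status: string;
  metadata?: { workspaceRef?: string };
}

function classifyOnRestart(tasks: Task[], liveWorkspaces: Set<string>) {
  const resume: string[] = [];
  const failed: string[] = [];
  for (const t of tasks) {
    if (t.status !== "in_progress" || !t.metadata?.workspaceRef) continue;
    if (liveWorkspaces.has(t.metadata.workspaceRef)) {
      resume.push(t.id); // workspace alive: keep monitoring
    } else {
      failed.push(t.id); // workspace gone: the agent can't still be running
    }
  }
  return { resume, failed };
}
```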

3. Shared Store Protocol

For multi-agent coordination, all agents need to use the same task store. The orchestrator sets up the store, then passes the store path to workers:

# Orchestrator sets the env
export PI_TASKS=/path/to/shared-tasks.json

# Workers inherit it (or it's passed via spawn-pi)
cmux new-workspace --command "PI_TASKS=/path/to/shared-tasks.json pi ..."

Workers can then use TaskList/TaskGet/TaskUpdate to claim tasks, report progress, and mark completion. The file-locked store handles concurrency.

Worker task claim protocol:

1. Worker calls TaskList → finds pending tasks
2. Worker calls TaskUpdate { taskId, status: "in_progress", owner: agentId }
3. If store.update succeeds and owner is set → task is claimed
4. Worker executes the task
5. Worker calls TaskUpdate { taskId, status: "completed" }

Race condition prevention: the file lock ensures only one writer at a time. First worker to lock and update the owner field wins the claim.

4. Enhanced Widget for Fleet View

The existing pi-tasks widget shows task status. Extend it with fleet context:

// Current: ◼ Update auth middleware
// Enhanced: ◼ Update auth middleware (ws:2 · sonnet · 2m 3s)

// Current: ✳ Running tests… (4m 1s · ↑ 4.1k ↓ 1.2k)
// Enhanced: ✳ Running tests… (ws:3 · sonnet · 4m 1s · ↑ 4.1k ↓ 1.2k)

When a task has metadata.workspaceRef, show the workspace ref and model.
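The enhanced line is just string assembly over the metadata. A sketch, with field names assumed from the examples above:

```typescript
// Format a widget line, appending fleet context when a workspace ref exists.
interface FleetMeta {
  workspaceRef?: string;
  model?: string;
  elapsedMs?: number;
}

function formatTaskLine(subject: string, meta?: FleetMeta): string {
  if (!meta?.workspaceRef) return `◼ ${subject}`;
  const parts = [
    meta.workspaceRef.replace("workspace:", "ws:"),
    meta.model,
    meta.elapsedMs !== undefined
      ? `${Math.round(meta.elapsedMs / 1000)}s`
      : undefined,
  ].filter(Boolean);
  return `◼ ${subject} (${parts.join(" · ")})`;
}
```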

Implementation Plan

Phase 1: Fork pi-tasks, add cmux spawning

  1. Fork @tintinweb/pi-tasks into joelhooks/pi-tools
  2. Add metadata.execution support to TaskExecute
  3. Implement spawnCmuxAgent() using the spawn-pi pattern
  4. Keep EventBus subagent support as fallback

Phase 2: Fleet dashboard integration

  1. Build fleet tracker that reads from task store
  2. Implement ctx.ui.setWidget("fleet", ...) dashboard
  3. Add cmux sidebar status sync for cross-workspace visibility
  4. Integrate read-screen polling for live activity detection

Phase 3: RPC worker pool

  1. Implement RpcWorkerPool with JSON-over-stdin control
  2. Add rpc execution strategy to TaskExecute
  3. Route high-throughput tasks (lint, format, simple fixes) to RPC pool
  4. Aggregate RPC results into task store

Phase 4: Cross-session coordination

  1. Workers use shared task store for claim/complete protocol
  2. Orchestrator monitors store for completion events
  3. Auto-cascade flows through the DAG across processes
  4. Add Unix socket relay for sub-second coordination (if needed)

What to Fork vs. Use Directly

| Component | Action | Reason |
| --- | --- | --- |
| TaskStore | Use directly | File locking, CRUD, deps — all solid |
| ProcessTracker | Use directly | Output buffering, wait, stop — works for RPC pool |
| TaskWidget | Fork/extend | Add workspace refs, model info, fleet context |
| TaskExecute | Fork/extend | Add cmux and RPC execution strategies |
| Index (event wiring) | Fork | Different lifecycle hooks for cmux integration |
| tasks-config.ts | Extend | Add execution strategy defaults |
| Subagent EventBus RPC | Keep as option | Still useful for in-process subagents |

Patterns Stolen from the Wild

What we found digging through dmux, pi-coordination, pi-boomerang, jido_symphony, pi-verbosity-control, executor, and nicobailon's catalog.

1. LLM-Based Pane State Detection (dmux)

dmux's PaneAnalyzer is the pattern we need for fleet monitoring. Instead of fragile regex parsing of terminal output, it captures the last 50 lines of a pane and sends them to a cheap LLM (Gemini Flash / GPT-4o-mini via OpenRouter) to classify the state:

type PaneState = 'option_dialog' | 'open_prompt' | 'in_progress';

Key details:

  • Parallel model fallback — races Gemini Flash, Grok, GPT-4o-mini via Promise.any(). First success wins. Typical latency <1s.
  • Content-hash cache — MD5 of captured content → cached result (5s TTL). Identical screen = skip the API call entirely.
  • Request deduplication — if an analysis is already in-flight for a pane, concurrent requests share the same Promise.
  • Two-stage analysis — Stage 1 determines state (20 tokens). Stage 2 extracts details only if needed: options for dialogs (360 tokens), summary for idle agents (180 tokens).
  • Autopilot — when an option dialog is detected and the first option has no risk (potentialHarm.hasRisk === false), auto-sends the accept key.

What we steal: The fleet dashboard should use this pattern instead of regex-based screen parsing. When a worker goes idle, the analyzer extracts a summary for the dashboard and notification. When it detects a dialog, the orchestrator can auto-accept or escalate to the user.
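The caching and deduplication shell is separable from the LLM call itself. A sketch, with `analyze` standing in for the actual cheap-model request:

```typescript
import { createHash } from "node:crypto";

// dmux-style content-hash cache (TTL) plus in-flight request deduplication.
// `analyze` stands in for the cheap-LLM classification call.
function makeCachedAnalyzer<T>(
  analyze: (content: string) => Promise<T>,
  ttlMs = 5000,
) {
  const cache = new Map<string, { at: number; result: T }>();
  const inflight = new Map<string, Promise<T>>();

  return async (content: string): Promise<T> => {
    const key = createHash("md5").update(content).digest("hex");

    const hit = cache.get(key);
    if (hit && Date.now() - hit.at < ttlMs) return hit.result; // identical screen: skip the API call

    const pending = inflight.get(key);
    if (pending) return pending; // concurrent callers share one request

    const p = analyze(content).then((result) => {
      cache.set(key, { at: Date.now(), result });
      inflight.delete(key);
      return result;
    });
    inflight.set(key, p);
    return p;
  };
}
```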

2. Smart Attention Service (dmux)

dmux's DmuxAttentionService has a much better attention model than our simple mark-read/mark-unread:

  • Armed state — panes only generate notifications after at least one working status is observed. Prevents startup noise.
  • Fingerprinting — each attention event gets a fingerprint (${status}:${title}:${body}). Same fingerprint = don't re-notify.
  • Focus awareness — three surfaces: fully-focused (skip notification), same-window (flash attention), different-window (native notification).
  • Baseline suppression — first idle state after pane creation is stored as baseline, not notified. Only changes from baseline trigger attention.

What we steal: Replace our mark-unread/mark-read toggle with a proper attention service that tracks per-agent armed/baseline/fingerprint state.
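A minimal version of that attention model, as a sketch (class, method, and field names are ours, not dmux's):

```typescript
// Armed/baseline/fingerprint attention tracking, per agent.
class AttentionTracker {
  private armed = new Set<string>();
  private baseline = new Map<string, string>();
  private lastNotified = new Map<string, string>();

  // Returns true when this observation should produce a notification.
  observe(agentId: string, status: string, title: string, body: string): boolean {
    const fp = `${status}:${title}:${body}`;
    if (status === "working") {
      this.armed.add(agentId); // seen real work: notifications now armed
      return false;
    }
    if (!this.armed.has(agentId)) {
      // First non-working state before any work is the baseline, never notified.
      if (!this.baseline.has(agentId)) this.baseline.set(agentId, fp);
      return false;
    }
    if (this.baseline.get(agentId) === fp) return false; // unchanged from baseline
    if (this.lastNotified.get(agentId) === fp) return false; // same fingerprint: don't re-notify
    this.lastNotified.set(agentId, fp);
    return true;
  }
}
```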

3. Full Multi-Agent Coordination (pi-coordination)

nicobailon's pi-coordination is a complete orchestration system already built for pi. Key architectural ideas:

Task Queue Model

interface Task {
  id: string;           // TASK-XX or TASK-XX.Y (subtasks)
  priority: number;     // P0-P3
  status: TaskStatus;   // pending → claimed → complete/failed
  files?: string[];     // file reservations
  dependsOn?: string[];
  claimedBy?: string;   // worker ID
}

Phase Pipeline

[validate] → [coordinator] → [workers] → [integration] → [review] → [fixes] → [complete]

Each phase is a distinct stage. The coordinator spawns workers from the task queue, priority-aware. Workers run in parallel with dependency resolution.

Agent Roles

  • Coordinator (opus) — manages workflow, spawns workers
  • Workers (sonnet) — execute tasks, one per agent
  • Reviewer (opus) — code review after all workers complete
  • Scout — codebase analysis for planning
  • Planner — creates task specs from prose

Worker Self-Review

Workers run a "fresh eyes" review pass before marking complete. Up to 5 cycles. Only proceeds when "No issues found." is returned.

Smart Auto-Continue

When a worker fails, the system loads the previous attempt's context and builds a continuation prompt: "Don't redo X, fix Y at line Z."

Supervisor Loop

Monitors worker inactivity:

  • Nudge at 3 min → "wrap it up"
  • Restart at 5 min → kill + respawn with context
  • Abandon after 2 restarts → mark task failed
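The escalation thresholds above reduce to a pure policy function. A sketch using those numbers (names are illustrative):

```typescript
// Supervisor policy: map worker inactivity and restart count to an action.
type SupervisorAction = "ok" | "nudge" | "restart" | "abandon";

const NUDGE_AFTER_MS = 3 * 60_000;
const RESTART_AFTER_MS = 5 * 60_000;
const MAX_RESTARTS = 2;

function superviseWorker(idleMs: number, restarts: number): SupervisorAction {
  if (idleMs >= RESTART_AFTER_MS) {
    // After two restarts, the next stall abandons the task.
    return restarts >= MAX_RESTARTS ? "abandon" : "restart";
  }
  if (idleMs >= NUDGE_AFTER_MS) return "nudge";
  return "ok";
}
```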

A2A Messaging

Workers communicate via typed messages:

  • agent_chat — send/receive/broadcast/escalate
  • agent_sync — contract synchronization (provide/need interfaces)
  • agent_work — lifecycle (complete, step, add task, deviation)
  • file_reservations — acquire/release/check file locks

SDK vs Subprocess Workers

Two modes: subprocess (separate pi process, default) or SDK (in-process via createAgentSession()). SDK mode enables steering mid-flight but shares process (crash = everything crashes).

What we steal:

  • The task queue + dependency resolution model is more mature than pi-tasks
  • File reservations prevent multi-worker conflicts
  • Supervisor loop pattern for stuck agent recovery
  • The phase pipeline concept for structuring orchestration
  • A2A messaging as an alternative to cmux send/read-screen
  • Worker context persistence for crash recovery

4. Boomerang: Execute-and-Collapse (pi-boomerang)

The /boomerang pattern: run a task autonomously, then collapse the entire exchange into a brief summary using navigateTree. The summary preserves WHAT was done without the step-by-step details.

Key mechanism:

// Before boomerang
pi.on("before_agent_start", async (event, ctx) => {
  // Inject "BOOMERANG MODE ACTIVE" into system prompt
  // Agent works fully autonomous (no clarifying questions)
});

// After boomerang
pi.on("agent_end", async (_event, _ctx) => {
  // Collapse via navigateTree back to the anchor point.
  // commandCtx is the command context captured when /boomerang was invoked.
  await commandCtx.navigateTree(targetId, { summarize: true });
});

// Custom summary via session_before_tree
pi.on("session_before_tree", async (event) => {
  // Generate structured summary from entries
  // Files read, files written, commands run, outcome
  return { summary: { summary: generatedSummary } };
});

Template chaining:

/boomerang /lint-fix -> /test-fix -> /commit -- "auth module"

Each step can specify a different model and skill via frontmatter.

What we steal:

  • Workers should use the boomerang pattern — execute task, collapse context
  • Template chaining for multi-step worker pipelines
  • The session_before_tree hook for custom collapse summaries
  • Model/skill switching per phase

5. Orchestrator GenServer Pattern (jido_symphony)

Elixir OTP patterns translated to our domain:

  • Polling loop with backoff — orchestrator polls issue tracker periodically, dispatches to available slots
  • Process monitoring — Process.monitor(pid) equivalent: track spawned workers, handle crashes via :DOWN messages
  • Token accounting — cumulative delta tracking across sessions with last-reported watermarks
  • Stall detection — if no codex activity for N ms, restart with backoff
  • Rate limit handling — extract and track rate limits from API responses
  • Issue lifecycle — claim → dispatch → monitor → complete/retry with exponential backoff and continuation retries

What we steal:

  • The polling + dispatch + monitor loop is exactly what our fleet manager needs
  • Token delta tracking (not absolute) for accurate cost across retries
  • Stall detection via last activity timestamp
  • Continuation retries (different from failure retries)

6. Provider Request Mutation (pi-verbosity-control)

The before_provider_request hook can mutate the API payload before it's sent. This is useful for:

  • Injecting worker-specific parameters
  • Overriding model behavior for specific agents
  • Adding metadata for observability
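A sketch of the kind of mutation such a hook might apply; the request shape and metadata keys here are illustrative, not pi's actual provider schema:

```typescript
// Tag an outgoing provider request with fleet observability metadata.
interface ProviderRequest {
  model: string;
  metadata?: Record<string, string>;
}

function tagWorkerRequest(
  req: ProviderRequest,
  agentId: string,
  taskId?: string,
): ProviderRequest {
  return {
    ...req,
    metadata: {
      ...req.metadata,
      "fleet-agent": agentId,
      ...(taskId ? { "fleet-task": taskId } : {}),
    },
  };
}
```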

7. Architectural Synthesis: What the cmux Rig Should Look Like

Combining patterns from all repos, the ideal architecture is:

┌─ cmux Control Plane ─────────────────────────────────────────────┐
│                                                                   │
│  ┌─ Orchestrator (pi, opus) ──────────────────────────────────┐  │
│  │                                                             │  │
│  │  pi-coordination-style phase pipeline:                      │  │
│  │  [plan] → [spawn] → [monitor] → [review] → [complete]      │  │
│  │                                                             │  │
│  │  pi-tasks task queue with P0-P3 priorities + deps           │  │
│  │                                                             │  │
│  │  Fleet manager (jido_symphony polling pattern):             │  │
│  │    - spawn workers into cmux workspaces                     │  │
│  │    - monitor via dmux-style PaneAnalyzer                    │  │
│  │    - supervisor: nudge → restart → abandon                  │  │
│  │    - smart attention (armed/fingerprint/focus-aware)         │  │
│  │                                                             │  │
│  │  Dashboard widget showing all agents + task progress         │  │
│  │                                                             │  │
│  └─────────────────────────────────────────────────────────────┘  │
│                                                                   │
│  ┌─ Workers (cmux workspaces, visible) ──────────────────────┐   │
│  │                                                            │   │
│  │  Each worker loads pi-cmux (standalone package):            │   │
│  │    - sidebar status + notifications (visibility)            │   │
│  │    - tool activity reporting                                │   │
│  │    - mark-read/unread attention cycle                       │   │
│  │                                                            │   │
│  │  Boomerang mode for token efficiency:                       │   │
│  │    - execute task → collapse context → report summary       │   │
│  │                                                            │   │
│  │  A2A via shared task store (pi-tasks file locking):         │   │
│  │    - claim tasks, report progress, mark complete            │   │
│  │    - file reservations for conflict prevention              │   │
│  │                                                            │   │
│  │  Self-review before completion (pi-coordination pattern)    │   │
│  │                                                            │   │
│  └────────────────────────────────────────────────────────────┘   │
│                                                                   │
│  ┌─ RPC Pool (headless, high-throughput) ────────────────────┐   │
│  │  pi --mode rpc workers for lint, format, simple fixes      │   │
│  │  JSON-over-stdin control, no workspace overhead             │   │
│  └────────────────────────────────────────────────────────────┘   │
└───────────────────────────────────────────────────────────────────┘

The key insight from pi-coordination: the orchestrator itself is an agent with tools, not a script. It has spawn_workers, check_status, broadcast, escalate_to_user as LLM-callable tools. The LLM decides when to spawn more workers, when to intervene, when to escalate.

The key insight from dmux: status detection should be LLM-based, not regex. Screen content → cheap model → structured state. This handles the infinite variety of terminal output without brittle pattern matching.

The key insight from boomerang: token efficiency is critical for workers. Execute-and-collapse keeps context windows manageable across many tasks.

The key insight from jido_symphony: the polling/dispatch/monitor loop is battle-tested with proper retry backoff, stall detection, and rate limiting. Don't reinvent — translate to TypeScript.
