maestroctl is an async daemon that maintains per-session state for Claude Code agent sessions. Events from multiple sources flow through a pure reducer per session, producing synthetic effects from state transitions.
It is consume-only — it ingests events from three sources but does not publish outbound events. Effects are logged internally. Adding a ZMQ PUB socket for outbound events is a one-line change when a consumer exists.

                              ┌──────────────────────────────────────────┐
                              │               Event Router               │
                              │                                          │
    watchctl subscription ───>│ SessionFileEvent ──> route by path       │
    (agent-session files)     │                      + session_id        │
                              │                                          │──> reduce(state, event)
    watchfiles tasks ────────>│ PlanctlFileEvent ──> route by cwd        │            │
    (.planctl/ dirs)          │                      (fan-out to all     │            v
                              │                       matching sessions) │    derive(old, new)
    ZMQ ingest socket ───────>│ HookEvent ─────────> route by            │            │
    (claudehooks hooks)       │                      session_id          │            v
                              │                                          │ [effects -> log.info()]
                              └──────────────────────────────────────────┘
                                                   │
                                          cold_start() on boot
                                         (DB + filesystem scan)

All state is in-memory. The EventRouter._sessions dict holds SessionState dataclasses — pure Python objects, not persisted to disk.
On restart, state is rebuilt from existing sources of truth:
- Agent-session JSON files on disk (session lifecycle, pid, cwd)
- claudehooks SQLite DB at `~/.local/state/claude/hook-events.db` (mode, tool count, transitions)
- `.planctl/state/tasks/*.state.json` files (task columns)
These are the durable stores. The reducer state is a derived, in-memory view of them.
Three frozen (immutable) event types:
| Event | Source | Key Fields |
|---|---|---|
| `SessionFileEvent` | watchctl subscription | `path`, `action`, `session_data` (includes `session_id`, `pid`, `cwd`, `status`) |
| `PlanctlFileEvent` | watchfiles watcher | `cwd`, `rel_path`, `action`, `semantic` (e.g., "task foo state modified") |
| `HookEvent` | ZMQ ingest socket | `session_id`, `hook_event`, `event_type`, `permission_mode`, `tool_name`, `data` |
Also exports parse_planctl_event() which converts .planctl/ relative paths into semantic descriptions.
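
The three event types and the path parser can be sketched as below. This is a minimal illustration, not the project's actual `events.py`: the field types, the default values, the `parse_planctl_event(rel_path, action)` signature, and the fallback description for unrecognized paths are all assumptions. Only the field names and the "task foo state modified" wording come from the table above.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Union


@dataclass(frozen=True)
class SessionFileEvent:
    path: str
    action: str  # assumed vocabulary, e.g. "created" / "modified" / "deleted"
    session_data: dict[str, Any] = field(default_factory=dict)


@dataclass(frozen=True)
class PlanctlFileEvent:
    cwd: str
    rel_path: str
    action: str
    semantic: str


@dataclass(frozen=True)
class HookEvent:
    session_id: str
    hook_event: str
    event_type: str
    permission_mode: str | None = None
    tool_name: str | None = None
    data: dict[str, Any] = field(default_factory=dict)


Event = Union[SessionFileEvent, PlanctlFileEvent, HookEvent]


def parse_planctl_event(rel_path: str, action: str) -> str:
    """Turn a .planctl/-relative path into a short semantic description."""
    prefix, suffix = "state/tasks/", ".state.json"
    if rel_path.startswith(prefix) and rel_path.endswith(suffix):
        task_id = rel_path[len(prefix):-len(suffix)]
        return f"task {task_id} state {action}"
    # Assumption: paths we do not recognize get a generic description.
    return f"{rel_path} {action}"
```

Because the dataclasses are frozen, assigning to a field after construction raises `dataclasses.FrozenInstanceError`, which is what keeps events safe to pass to several sessions at once.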
SessionState (frozen dataclass):
- `session_id`, `path`, `cwd`, `pid`, `alive`
- `mode` (plan|act), `mode_transitions`, `tool_count`, `last_tool`
- `tasks`: `dict[task_id -> column]`
reduce(state, event) -> state: Pure function with isinstance dispatch per event type. Uses dataclasses.replace() to produce new immutable instances.
Current reducer behavior per event type:
- `SessionFileEvent`: Updates `cwd`, `pid`, `alive` from agent-state JSON. Sets `alive=False` on delete or `status=="exited"`.
- `HookEvent`: Maps `permission_mode` to `mode` ("plan" or "act"). Increments `mode_transitions` on mode changes. On `tool_use` events, increments `tool_count` and sets `last_tool`.
- `PlanctlFileEvent`: Parses the semantic description. On "task X state deleted", removes the task. On "task X lock claimed", sets the column to `in_progress`. Other planctl events pass through without state changes. Actual task columns (todo/done/blocked) are populated by cold start reading `.state.json` files directly.
Each isinstance branch is independent — they don't interact. Adding a new data source means adding a new branch.
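
The reducer behavior described above might look roughly like the following. It is a sketch under stated assumptions rather than the real `state.py`: the event classes are trimmed to the fields the reducer reads, the `"deleted"` action string and the default `mode="act"` are guesses, and any `permission_mode` other than `"plan"` is treated as act mode.

```python
from __future__ import annotations

from dataclasses import dataclass, field, replace
from typing import Any


@dataclass(frozen=True)
class SessionFileEvent:
    path: str
    action: str
    session_data: dict[str, Any]


@dataclass(frozen=True)
class HookEvent:
    session_id: str
    event_type: str
    permission_mode: str | None = None
    tool_name: str | None = None


@dataclass(frozen=True)
class PlanctlFileEvent:
    cwd: str
    semantic: str


@dataclass(frozen=True)
class SessionState:
    session_id: str
    path: str = ""
    cwd: str = ""
    pid: int | None = None
    alive: bool = True
    mode: str = "act"  # assumed default
    mode_transitions: int = 0
    tool_count: int = 0
    last_tool: str | None = None
    tasks: dict[str, str] = field(default_factory=dict)


def reduce(state: SessionState, event: object) -> SessionState:
    """Pure reducer: returns a new state and never mutates the old one."""
    if isinstance(event, SessionFileEvent):
        data = event.session_data
        gone = event.action == "deleted" or data.get("status") == "exited"
        return replace(
            state,
            cwd=data.get("cwd", state.cwd),
            pid=data.get("pid", state.pid),
            alive=not gone,
        )
    if isinstance(event, HookEvent):
        new = state
        if event.permission_mode is not None:
            # Assumption: only "plan" maps to plan mode; everything else is act.
            mode = "plan" if event.permission_mode == "plan" else "act"
            if mode != new.mode:
                new = replace(new, mode=mode, mode_transitions=new.mode_transitions + 1)
        if event.event_type == "tool_use":
            new = replace(new, tool_count=new.tool_count + 1, last_tool=event.tool_name)
        return new
    if isinstance(event, PlanctlFileEvent):
        words = event.semantic.split()
        if len(words) == 4 and words[0] == "task":
            task_id, tail = words[1], " ".join(words[2:])
            if tail == "state deleted":
                remaining = {k: v for k, v in state.tasks.items() if k != task_id}
                return replace(state, tasks=remaining)
            if tail == "lock claimed":
                return replace(state, tasks={**state.tasks, task_id: "in_progress"})
        return state  # other planctl events pass through unchanged
    return state
```

Note that the `tasks` dict is rebuilt rather than edited in place. A frozen dataclass only prevents rebinding fields, so copying the dict is what actually keeps the old state untouched.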
derive(old, new) -> list[Effect]: Compares old and new state, emits effects:
- `session_started` / `session_exited`: lifecycle
- `mode_changed`: plan/act transitions with count
- `task_moved` / `task_appeared` / `task_removed`: kanban movements
Effects are currently logged. Adding ZMQ publishing is one line.
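
A state diff of this kind could be written as follows. This is a hedged sketch: `SessionState` is trimmed to the fields being compared, passing `old=None` for a session seen for the first time is an assumption, and the keys inside `Effect.data` are illustrative. Only the effect kinds and the `Effect(kind, session_id, data)` shape come from this document.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass(frozen=True)
class Effect:
    kind: str
    session_id: str
    data: dict[str, Any] = field(default_factory=dict)


@dataclass(frozen=True)
class SessionState:  # trimmed to the fields derive() compares
    session_id: str
    alive: bool = True
    mode: str = "act"
    mode_transitions: int = 0
    tasks: dict[str, str] = field(default_factory=dict)


def derive(old: SessionState | None, new: SessionState) -> list[Effect]:
    """Compare two states and describe what changed as a list of effects."""
    sid = new.session_id
    effects: list[Effect] = []

    # Lifecycle. Assumption: old is None the first time a session is seen.
    was_alive = old is not None and old.alive
    if new.alive and not was_alive:
        effects.append(Effect("session_started", sid))
    elif was_alive and not new.alive:
        effects.append(Effect("session_exited", sid))

    # Mode transitions, carrying the running transition count.
    if old is not None and old.mode != new.mode:
        effects.append(Effect(
            "mode_changed", sid,
            {"from": old.mode, "to": new.mode, "count": new.mode_transitions},
        ))

    # Kanban movements.
    old_tasks = old.tasks if old is not None else {}
    for task_id, column in new.tasks.items():
        if task_id not in old_tasks:
            effects.append(Effect("task_appeared", sid, {"task": task_id, "column": column}))
        elif old_tasks[task_id] != column:
            effects.append(Effect(
                "task_moved", sid,
                {"task": task_id, "from": old_tasks[task_id], "to": column},
            ))
    for task_id in old_tasks.keys() - new.tasks.keys():
        effects.append(Effect("task_removed", sid, {"task": task_id}))
    return effects
```

Since `derive()` only reads its two arguments, it can be unit-tested with hand-built states and no daemon running.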
Maintains three indexes:
- `_sessions: dict[session_id, SessionState]`
- `_cwd_to_sessions: dict[cwd, set[session_id]]`
- `_path_to_session: dict[file_path, session_id]`
Routing rules:
- `SessionFileEvent` → route by file path (creates sessions on first sight)
- `HookEvent` → route by `session_id` (direct match), fallback to cwd
- `PlanctlFileEvent` → route by cwd, fan out to ALL matching sessions
A single event can belong to multiple sessions. PlanctlFileEvents fan out to every session whose cwd matches, since .planctl/ is shared state for a project directory.
Also handles: stale session detection (PID liveness), cwd index maintenance on session cwd changes.
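
The index bookkeeping and fan-out rules can be illustrated with a small stand-in. `MiniRouter` and its method names are hypothetical and exist only for this example. It stores a cwd string per session instead of a full `SessionState`, and it assumes the cwd used for the hook fallback is supplied by the caller.

```python
from __future__ import annotations

from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class MiniRouter:
    """Hypothetical, trimmed stand-in for EventRouter: indexes only."""

    sessions: dict[str, str] = field(default_factory=dict)  # session_id -> cwd
    cwd_to_sessions: dict[str, set[str]] = field(
        default_factory=lambda: defaultdict(set)
    )
    path_to_session: dict[str, str] = field(default_factory=dict)

    def upsert_session(self, path: str, session_id: str, cwd: str) -> None:
        """Create or update a session, keeping the cwd index in sync."""
        old_cwd = self.sessions.get(session_id)
        if old_cwd is not None and old_cwd != cwd:
            self.cwd_to_sessions[old_cwd].discard(session_id)
            if not self.cwd_to_sessions[old_cwd]:
                del self.cwd_to_sessions[old_cwd]
        self.sessions[session_id] = cwd
        self.cwd_to_sessions[cwd].add(session_id)
        self.path_to_session[path] = session_id

    def targets_for_hook(self, session_id: str, cwd: str | None = None) -> set[str]:
        """Direct match on session_id, otherwise fall back to the cwd index."""
        if session_id in self.sessions:
            return {session_id}
        return set(self.cwd_to_sessions.get(cwd, ())) if cwd else set()

    def targets_for_planctl(self, cwd: str) -> set[str]:
        """Fan out to every session whose cwd matches."""
        return set(self.cwd_to_sessions.get(cwd, ()))
```

Two sessions started in the same project directory both receive each planctl event. Once one of them changes cwd, `upsert_session` drops it from the old bucket so it stops receiving that project's events.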
Four concurrent tasks via asyncio.gather():
- `_watchctl_sessions_sub`: subscribes to the `watchctl.file.agent-sessions` PUB socket
- `_planctl_watcher`: manages per-cwd `watchfiles.awatch()` tasks (refcounted, sentinel pattern)
- `_ingest_server`: `zmq_serve()` REP socket at `~/.local/share/maestroctl/ingest.sock`
- `_heartbeat`: 60s PID liveness check + status log
Cold start (before gather): scan agent-session files, query claudehooks DB, read .planctl/ state.
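
For the heartbeat's PID liveness check, one common POSIX approach is to send signal 0, which performs the existence and permission checks without delivering anything. Whether maestroctl does it this way is not stated here, so the snippet is a sketch, and `pid_alive` and `stale_sessions` are hypothetical helper names.

```python
import os


def pid_alive(pid: int) -> bool:
    """Return True if a process with this PID currently exists (POSIX only)."""
    if pid <= 0:
        return False  # 0 and negative values address process groups, not one process
    try:
        os.kill(pid, 0)  # signal 0: checks only, nothing is delivered
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # the process exists but belongs to another user
    return True


def stale_sessions(pids: dict[str, int]) -> list[str]:
    """Return the session ids whose recorded PID is no longer running."""
    return [sid for sid, pid in pids.items() if not pid_alive(pid)]
```

PIDs can be reused by the operating system, so a check like this can report a dead session as alive if an unrelated process has since taken the same PID.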
Runs as a macOS LaunchAgent (arthack.maestroctl.orchestrate-claude) with KeepAlive: true. Managed by processctl.
The claudehooks hook (hook-events-db.py) pushes every hook event to maestroctl's ingest socket via ZMQ REQ:
- Checks that `~/.local/share/maestroctl/ingest.sock` exists (skips silently if not)
- Sends `{"action": "hook-event", ...row_dict}` and waits for an ACK
- Uses a 500ms timeout, so an unresponsive daemon cannot stall the hook pipeline indefinitely
- When maestroctl is offline, the socket file doesn't exist, so the hook returns immediately after a single file-existence check. Events are still written to the SQLite DB. On the next maestroctl start, cold start rebuilds state from the DB, so nothing is lost.
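
The hook-side push described above might be sketched with pyzmq as follows. The function name `push_hook_event`, the lazy import, and the way the 500ms budget is applied (as separate send and receive timeouts) are assumptions. The real `hook-events-db.py` may differ.

```python
from __future__ import annotations

from pathlib import Path

INGEST_SOCK = Path.home() / ".local/share/maestroctl/ingest.sock"


def push_hook_event(
    row: dict, sock_path: Path = INGEST_SOCK, timeout_ms: int = 500
) -> bool:
    """Best-effort push. Returns True only if the daemon acknowledged."""
    if not sock_path.exists():
        return False  # daemon offline: skip silently

    import zmq  # imported lazily so the offline path never loads pyzmq

    sock = zmq.Context.instance().socket(zmq.REQ)
    sock.setsockopt(zmq.LINGER, 0)  # do not wait for unsent messages on close
    sock.setsockopt(zmq.SNDTIMEO, timeout_ms)
    sock.setsockopt(zmq.RCVTIMEO, timeout_ms)
    try:
        sock.connect(f"ipc://{sock_path}")
        sock.send_json({"action": "hook-event", **row})
        sock.recv()  # wait for the ACK
        return True
    except zmq.ZMQError:  # includes zmq.Again, raised when a timeout expires
        return False
    finally:
        sock.close()
```

In this sketch the send and the receive are each bounded by `timeout_ms`, so the worst case is roughly twice that value. A stricter overall budget would need a poller or a deadline shared across both calls.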
watchctl publishes file change events. maestroctl subscribes to watchctl.file.agent-sessions for session file changes. Config in ~/.config/watchctl/config.yaml.
.planctl/ directories are watched in-process via watchfiles.awatch(). Watches are dynamic: they are added and removed as sessions appear and disappear. Sentinel pattern: if .planctl/ doesn't exist yet, the watcher watches the cwd for its creation.
Adding a new data source:
- Add a new frozen dataclass to `events.py`
- Add the type to the `Event` union
- Add an `isinstance` branch to `reduce()` in `state.py`
- Add routing logic to `router.py`
- Add an async task to the `gather()` in `run_orchestrate_claude.py`
- Add fields to `SessionState` as needed
Adding new derived effects:
- Add comparison logic to `derive()` in `state.py`
- Effects are `Effect(kind, session_id, data)`: currently logged, publishable later
Splitting the reducer (when it gets large):
The reducer is currently a single function with isinstance checks. When it grows unwieldy, split into per-source functions (_reduce_session_file(), _reduce_hook(), _reduce_planctl()) dispatched from the top-level reduce(). The pure function signature stays the same.
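
A split of that shape could look like the sketch below. The event and state classes are deliberately reduced to one or two fields each so the dispatch structure is the only thing on display. The helper bodies are placeholders, not the real reducer logic.

```python
from __future__ import annotations

from dataclasses import dataclass, replace


@dataclass(frozen=True)
class SessionFileEvent:
    status: str


@dataclass(frozen=True)
class HookEvent:
    tool_name: str


@dataclass(frozen=True)
class PlanctlFileEvent:
    semantic: str


@dataclass(frozen=True)
class SessionState:
    alive: bool = True
    tool_count: int = 0
    last_tool: str | None = None


def _reduce_session_file(state: SessionState, event: SessionFileEvent) -> SessionState:
    return replace(state, alive=event.status != "exited")


def _reduce_hook(state: SessionState, event: HookEvent) -> SessionState:
    return replace(state, tool_count=state.tool_count + 1, last_tool=event.tool_name)


def _reduce_planctl(state: SessionState, event: PlanctlFileEvent) -> SessionState:
    return state  # placeholder: the task-column logic would live here


def reduce(state: SessionState, event: object) -> SessionState:
    """Top-level dispatcher; same pure signature as the single-function version."""
    if isinstance(event, SessionFileEvent):
        return _reduce_session_file(state, event)
    if isinstance(event, HookEvent):
        return _reduce_hook(state, event)
    if isinstance(event, PlanctlFileEvent):
        return _reduce_planctl(state, event)
    return state  # unknown event types leave the state unchanged
```

Callers keep calling `reduce(state, event)`, so the split changes no call sites, and each helper can be tested against its own event type.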