Skip to content

Instantly share code, notes, and snippets.

@possibilities
Last active March 21, 2026 18:07
Show Gist options
  • Select an option

  • Save possibilities/ef7fb72f2f4803d262eabdfcca6090ad to your computer and use it in GitHub Desktop.

Select an option

Save possibilities/ef7fb72f2f4803d262eabdfcca6090ad to your computer and use it in GitHub Desktop.
Maestroctl Synthesis Engine Architecture

Maestroctl Synthesis Engine Architecture

Overview

maestroctl is an async daemon that maintains per-session state for Claude Code agent sessions. Events from multiple sources flow through a pure reducer per session, producing synthetic effects from state transitions.

It is consume-only — it ingests events from three sources but does not publish outbound events. Effects are logged internally. Adding a ZMQ PUB socket for outbound events is a one-line change when a consumer exists.

Data Flow

                         ┌──────────────────────────────────────────┐
                         │           Event Router                   │
                         │                                          │
watchctl subscription ──>│  SessionFileEvent ──> route by path      │
  (agent-session files)  │                      + session_id        │
                         │                                          │──> reduce(state, event)
watchfiles tasks ──────->│  PlanctlFileEvent ──> route by cwd       │       │
  (.planctl/ dirs)       │                      (fan-out to all     │       v
                         │                       matching sessions) │    derive(old, new)
ZMQ ingest socket ──────>│  HookEvent ────────> route by            │       │
  (claudehooks hooks)    │                      session_id          │       v
                         │                                          │    [effects -> log.info()]
                         └──────────────────────────────────────────┘
                                        │
                              cold_start() on boot
                              (DB + filesystem scan)

State Management

All state is in-memory. The EventRouter._sessions dict holds SessionState dataclasses — pure Python objects, not persisted to disk.

On restart, state is rebuilt from existing sources of truth:

  1. Agent-session JSON files on disk (session lifecycle, pid, cwd)
  2. claudehooks SQLite DB at ~/.local/state/claude/hook-events.db (mode, tool count, transitions)
  3. .planctl/state/tasks/*.state.json files (task columns)

These are the durable stores. The reducer state is a derived, in-memory view of them.

Modules

events.py — Event Dataclasses

Three frozen (immutable) event types:

Event Source Key Fields
SessionFileEvent watchctl subscription path, action, session_data (includes session_id, pid, cwd, status)
PlanctlFileEvent watchfiles watcher cwd, rel_path, action, semantic (e.g., "task foo state modified")
HookEvent ZMQ ingest socket session_id, hook_event, event_type, permission_mode, tool_name, data

Also exports parse_planctl_event() which converts .planctl/ relative paths into semantic descriptions.

state.py — Reducer + Derive

SessionState (frozen dataclass):

session_id, path, cwd, pid, alive
mode (plan|act), mode_transitions, tool_count, last_tool
tasks: dict[task_id -> column]

reduce(state, event) -> state: Pure function with isinstance dispatch per event type. Uses dataclasses.replace() to produce new immutable instances.

Current reducer behavior per event type:

  • SessionFileEvent — Updates cwd, pid, alive from agent-state JSON. Sets alive=False on delete or status=="exited".
  • HookEvent — Maps permission_mode to mode ("plan" or "act"). Increments mode_transitions on mode changes. On tool_use events, increments tool_count and sets last_tool.
  • PlanctlFileEvent — Parses semantic description. On "task X state deleted", removes task. On "task X lock claimed", sets column to in_progress. Other planctl events pass through without state changes. Actual task columns (todo/done/blocked) are populated by cold start reading .state.json files directly.

Each isinstance branch is independent — they don't interact. Adding a new data source means adding a new branch.

derive(old, new) -> list[Effect]: Compares old and new state, emits effects:

  • session_started / session_exited — lifecycle
  • mode_changed — plan/act transitions with count
  • task_moved / task_appeared / task_removed — kanban movements

Effects are currently logged. Adding ZMQ publishing is one line.

router.py — EventRouter

Maintains three indexes:

  • _sessions: dict[session_id, SessionState]
  • _cwd_to_sessions: dict[cwd, set[session_id]]
  • _path_to_session: dict[file_path, session_id]

Routing rules:

  • SessionFileEvent → route by file path (creates sessions on first sight)
  • HookEvent → route by session_id (direct match), fallback to cwd
  • PlanctlFileEvent → route by cwd, fan out to ALL matching sessions

A single event can belong to multiple sessions. PlanctlFileEvents fan out to every session whose cwd matches, since .planctl/ is shared state for a project directory.

Also handles: stale session detection (PID liveness), cwd index maintenance on session cwd changes.

run_orchestrate_claude.py — Async Daemon

Four concurrent tasks via asyncio.gather():

  1. _watchctl_sessions_sub — subscribes to watchctl.file.agent-sessions PUB socket
  2. _planctl_watcher — manages per-cwd watchfiles.awatch() tasks (refcounted, sentinel pattern)
  3. _ingest_serverzmq_serve() REP socket at ~/.local/share/maestroctl/ingest.sock
  4. _heartbeat — 60s PID liveness check + status log

Cold start (before gather): scan agent-session files, query claudehooks DB, read .planctl/ state.

Runs as a macOS LaunchAgent (arthack.maestroctl.orchestrate-claude) with KeepAlive: true. Managed by processctl.

External Integration

claudehooks → maestroctl

The claudehooks hook (hook-events-db.py) pushes every hook event to maestroctl's ingest socket via ZMQ REQ:

  • Checks ~/.local/share/maestroctl/ingest.sock exists (skips silently if not)
  • Sends {"action": "hook-event", ...row_dict}, waits for ACK
  • 500ms timeout, never blocks the hook pipeline
  • When maestroctl is offline: the socket file doesn't exist, so the hook returns immediately with zero overhead. Events are still written to the SQLite DB. On next maestroctl start, cold start rebuilds all state from the DB — nothing is lost.

watchctl → maestroctl

watchctl publishes file change events. maestroctl subscribes to watchctl.file.agent-sessions for session file changes. Config in ~/.config/watchctl/config.yaml.

.planctl/ → maestroctl

Watched in-process via watchfiles.awatch(). Dynamic — watches added/removed as sessions appear/disappear. Sentinel pattern: if .planctl/ doesn't exist yet, watches the cwd for its creation.

How to Extend

Adding a new data source:

  1. Add a new frozen dataclass to events.py
  2. Add the type to the Event union
  3. Add an isinstance branch to reduce() in state.py
  4. Add routing logic to router.py
  5. Add an async task to the gather() in run_orchestrate_claude.py
  6. Add fields to SessionState as needed

Adding new derived effects:

  1. Add comparison logic to derive() in state.py
  2. Effects are Effect(kind, session_id, data) — currently logged, publishable later

Splitting the reducer (when it gets large): The reducer is currently a single function with isinstance checks. When it grows unwieldy, split into per-source functions (_reduce_session_file(), _reduce_hook(), _reduce_planctl()) dispatched from the top-level reduce(). The pure function signature stays the same.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment