Created
March 18, 2026 01:14
-
-
Save elyase/97459151e6c7f7b45ff436073fe70c1f to your computer and use it in GitHub Desktop.
Multi-Agent Workflow extension for pi: compose tasks in sequence, in parallel, or in loops, choosing Claude, Gemini, Codex, or ChatGPT Pro for each task.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * Agent execution — all models via CLI, single code path. | |
| * | |
| * Provider table + one runProcess(). Mirrors Smithers' agent abstraction | |
| * but uses CLI tools (claude, gemini, codex, ask_gpt54_pro.py) instead of AI SDK. | |
| */ | |
| import type { z } from "zod"; | |
| import type { AgentConfig, TaskNode, WorkflowContext } from "./types"; | |
| import { spawn } from "node:child_process"; | |
| import { homedir } from "node:os"; | |
| import { join } from "node:path"; | |
| import { zodToJsonSchema } from "zod-to-json-schema"; | |
// ── Provider Table ──
/** Canonical identifiers of the supported agent CLIs. */
export type ModelId =
  | "claude"
  | "gemini"
  | "codex"
  | "chatgpt-pro";
/** How one provider's CLI is invoked. */
interface ProviderConfig {
  /** Display name used in progress messages. */
  label: string;
  /** Passed to spawn's `timeout` option for the CLI process, in milliseconds. */
  timeoutMs: number;
  /** When true the full prompt is written to the process's stdin. */
  stdin: boolean;
  /** Builds the executable name and argv for a given prompt. */
  command: (prompt: string) => [cmd: string, args: string[]];
}
/** Helper script used for ChatGPT Pro, executed through `uv run`. */
const SCRIPT_PATH = join(homedir(), ".agents", "skills", "deliberation", "scripts", "ask_gpt54_pro.py");
/**
 * Invocation table for every supported model. Claude receives the prompt on stdin
 * (its command ignores the argument); the others get it as a positional argv entry.
 */
const PROVIDERS: Record<ModelId, ProviderConfig> = {
  claude: {
    label: "Claude Opus 4.6",
    timeoutMs: 5 * 60_000,
    stdin: true,
    command: () => [
      "claude",
      ["--print", "--output-format", "text", "--dangerously-skip-permissions"],
    ],
  },
  gemini: {
    label: "Gemini 3.1 Pro",
    timeoutMs: 2 * 60_000,
    stdin: false,
    command: (prompt) => [
      "gemini",
      ["-m", "gemini-3.1-pro-preview", prompt, "-o", "text"],
    ],
  },
  codex: {
    label: "Codex GPT-5.4",
    timeoutMs: 10 * 60_000,
    stdin: false,
    command: (prompt) => [
      "codex",
      [
        "exec",
        "--skip-git-repo-check",
        "-m", "gpt-5.4",
        "-c", 'model_reasoning_effort="high"',
        "--full-auto",
        prompt,
      ],
    ],
  },
  "chatgpt-pro": {
    label: "ChatGPT Pro GPT-5.4",
    timeoutMs: 60 * 60_000,
    stdin: false,
    command: (prompt) => ["uv", ["run", SCRIPT_PATH, prompt]],
  },
};
/** Accepted spellings per model, keyed by lower-case alias (resolveModel lower-cases and trims its input). */
const ALIASES: Record<string, ModelId> = {
  // Claude
  claude: "claude",
  "claude-opus": "claude",
  "claude-opus-4.6": "claude",
  // Gemini
  gemini: "gemini",
  "gemini-3.1-pro": "gemini",
  // Codex
  codex: "codex",
  "codex-gpt-5.4": "codex",
  "gpt-5.4": "codex",
  // ChatGPT Pro
  "chatgpt-pro": "chatgpt-pro",
  "gpt-5.4-pro": "chatgpt-pro",
  chatgpt: "chatgpt-pro",
};
/**
 * Map a model name or alias to its canonical ModelId.
 * Matching ignores case and surrounding whitespace.
 *
 * @throws Error listing the canonical ids when `model` is not a known alias.
 */
export function resolveModel(model: string): ModelId {
  const key = model.toLowerCase().trim();
  // Own-property check: a bare ALIASES[key] would also "find" inherited members such as
  // "constructor" or "__proto__" and hand back a non-ModelId value.
  const id = Object.prototype.hasOwnProperty.call(ALIASES, key) ? ALIASES[key] : undefined;
  if (!id) throw new Error(`Unknown model "${model}". Available: ${Object.keys(PROVIDERS).join(", ")}`);
  return id;
}
/** Display label for a model name or alias; throws exactly like resolveModel for unknown names. */
export function modelLabel(model: string): string {
  const { label } = PROVIDERS[resolveModel(model)];
  return label;
}
// ── Task Execution ──
/**
 * Execute one task node and return its schema-validated output.
 *
 * - Static tasks (`task.value`) are computed directly and parsed with `task.output`.
 * - Agent tasks render `task.prompt(ctx)`. They then try the primary agent, followed by each
 *   `fallbackAgents` entry in order. Every agent gets `retries + 1` attempts (`retries`
 *   defaults to 2). On a retry the previous error is appended to the prompt so the model
 *   can correct a malformed or schema-violating reply.
 *
 * @param cwd        Working directory for the spawned CLI.
 * @param signal     Cancels the task; an abort is rethrown immediately and never retried.
 * @param onProgress Receives human-readable status lines.
 * @throws Error("Aborted") once `signal` is aborted.
 * @throws Error when every agent has exhausted its attempts (message carries the last error).
 */
export async function executeTask(
  task: TaskNode,
  ctx: WorkflowContext,
  cwd: string,
  signal?: AbortSignal,
  onProgress?: (msg: string) => void,
): Promise<any> {
  if (task.value) return task.output.parse(task.value(ctx));
  if (!task.agent || !task.prompt) throw new Error(`Task "${task.id}": needs agent+prompt or value`);
  const prompt = task.prompt(ctx);
  const agents = [task.agent, ...(task.fallbackAgents ?? [])];
  const maxRetries = task.retries ?? 2;
  let lastError = "";
  for (const [agentIndex, agent] of agents.entries()) {
    const provider = PROVIDERS[resolveModel(agent.model)];
    const hasFallback = agentIndex < agents.length - 1;
    for (let attempt = 0; attempt <= maxRetries; attempt++) {
      if (signal?.aborted) throw new Error("Aborted");
      try {
        onProgress?.(`"${task.id}" → ${provider.label} (attempt ${attempt + 1}/${maxRetries + 1})`);
        // Build prompt — append last validation error on retries (Smithers pattern)
        const retryContext = lastError ? `\n\n── PREVIOUS ATTEMPT FAILED ──\n${lastError}\nFix the issue and try again.` : "";
        const fullPrompt = buildPrompt(prompt + retryContext, task.output, agent);
        const [cmd, args] = provider.command(fullPrompt);
        const output = await runProcess(cmd, args, provider.stdin ? fullPrompt : undefined, cwd, provider.timeoutMs, signal);
        return task.output.parse(JSON.parse(extractJson(output)));
      } catch (err: any) {
        // Cancellation is not a model failure: stop now instead of burning through
        // the remaining retries and fallback agents.
        if (signal?.aborted) throw new Error("Aborted");
        // Non-Error throws have no .message; keep lastError a string either way.
        lastError = String(err?.message ?? err);
        if (attempt === maxRetries) {
          onProgress?.(`${agent.model} failed for "${task.id}"${hasFallback ? ", trying fallback..." : ""}`);
          break;
        }
        onProgress?.(`"${task.id}" attempt ${attempt + 1} failed: ${lastError.slice(0, 100)}. Retrying...`);
      }
    }
  }
  throw new Error(`Task "${task.id}" failed: ${lastError}`);
}
// ── Prompt Builder ──
/**
 * Compose the text sent to an agent CLI. The sections, separated by blank lines, are:
 * - the agent's instructions (if any),
 * - the task prompt,
 * - an OUTPUT FORMAT banner,
 * - the JSON Schema generated from `schema` (named "output"), followed by
 *   JSON-only reply instructions.
 *
 * Empty sections are omitted.
 */
function buildPrompt(prompt: string, schema: z.ZodType, agent: AgentConfig): string {
  const schemaJson = JSON.stringify(zodToJsonSchema(schema, "output"), null, 2);
  const outputFormat = [
    "─── OUTPUT FORMAT ───",
    "Respond with ONLY a JSON object matching this schema:",
    schemaJson,
    "Output ONLY valid JSON. No markdown fences, no preamble, no trailing text.",
  ];
  const sections = [agent.instructions, prompt, ...outputFormat];
  return sections.filter((section): section is string => Boolean(section)).join("\n\n");
}
// ── Process Runner (unified) ──
/**
 * Spawn `cmd args` in `cwd`, optionally feed `stdin`, and resolve with the process's stdout.
 *
 * Resolves when the process exits 0. It also resolves when the process exits non-zero but
 * printed something, since some CLIs exit non-zero after emitting a usable answer.
 *
 * Rejects in three cases:
 * - spawn errors;
 * - abort (the child is sent SIGTERM);
 * - failure with empty stdout. The message includes the exit code, the signal, and up to
 *   1000 chars of stderr.
 *
 * `timeoutMs` is enforced through spawn's `timeout` option.
 */
function runProcess(
  cmd: string, args: string[], stdin: string | undefined,
  cwd: string, timeoutMs: number, signal?: AbortSignal,
): Promise<string> {
  return new Promise((resolve, reject) => {
    // An already-aborted signal never fires "abort" again; don't start the process at all.
    if (signal?.aborted) { reject(new Error("Aborted")); return; }
    let stdout = "", stderr = "";
    const proc = spawn(cmd, args, { cwd, timeout: timeoutMs, env: process.env, stdio: ["pipe", "pipe", "pipe"] });
    // Streaming UTF-8 decode: concatenating raw Buffer chunks would corrupt multi-byte
    // characters split across chunk boundaries.
    proc.stdout.setEncoding("utf8");
    proc.stderr.setEncoding("utf8");
    proc.stdout.on("data", (d: string) => { stdout += d; });
    proc.stderr.on("data", (d: string) => { stderr += d; });
    // EPIPE (child exited or failed to spawn before reading stdin) must not crash the host.
    proc.stdin.on("error", () => {});
    // Always close stdin: CLIs that read piped (non-TTY) stdin would otherwise wait for EOF until the timeout.
    if (stdin) proc.stdin.write(stdin);
    proc.stdin.end();
    const onAbort = () => { proc.kill("SIGTERM"); reject(new Error("Aborted")); };
    signal?.addEventListener("abort", onAbort, { once: true });
    proc.on("close", (code, sig) => {
      signal?.removeEventListener("abort", onAbort);
      if (stdout.trim() || code === 0) resolve(stdout);
      else reject(new Error(`${cmd} exit ${code}${sig ? ` (${sig})` : ""}: ${stderr.slice(0, 1000)}`));
    });
    proc.on("error", (e) => { signal?.removeEventListener("abort", onAbort); reject(e); });
  });
}
// ── JSON Extraction ──
/**
 * Pull the JSON payload out of raw CLI output.
 *
 * ANSI SGR colour codes are stripped first. Three candidates are then tried in order, and
 * the first that parses as JSON is returned:
 * 1. the whole trimmed text;
 * 2. the body of the first ``` / ```json fence;
 * 3. the span from the first "{" to the last "}".
 *
 * Trying the whole text first matters: a valid JSON reply whose string values contain ```
 * fences (e.g. a Markdown report) must not be cut at the inner fence. If nothing parses,
 * the best-effort candidate (fence, else brace span, else trimmed text) is returned, so
 * the caller's JSON.parse error describes it.
 */
function extractJson(text: string): string {
  const clean = text.replace(/\x1b\[[0-9;]*m/g, "");
  const trimmed = clean.trim();
  const fenced = clean.match(/```(?:json)?\s*\n?([\s\S]*?)\n?```/)?.[1].trim();
  const start = clean.indexOf("{"), end = clean.lastIndexOf("}");
  const braced = start !== -1 && end > start ? clean.slice(start, end + 1) : undefined;
  for (const candidate of [trimmed, fenced, braced]) {
    if (candidate === undefined) continue;
    try { JSON.parse(candidate); return candidate; } catch { /* try the next candidate */ }
  }
  return fenced ?? braced ?? trimmed;
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * Code Review Loop — Claude reviews, Codex fixes, loops until approved. | |
| * build(ctx) re-renders after each step — just like Smithers' JSX re-render. | |
| */ | |
| import { defineWorkflow, task, sequence, loop } from "../extensions/smithers/types"; | |
| import { z } from "zod"; | |
/** Reviewer verdict: approval flag, prose feedback, issue list, overall severity. */
const reviewSchema = z.object({
  approved: z.boolean(),
  feedback: z.string(),
  issues: z.array(z.string()),
  severity: z.enum(["low", "medium", "high"]),
});
/** What the fixer changed. */
const fixSchema = z.object({
  filesChanged: z.array(z.string()),
  changesSummary: z.string(),
});
/** Final static summary of the loop. */
const summarySchema = z.object({
  report: z.string(),
  totalIterations: z.number(),
  approved: z.boolean(),
});
/**
 * Review/fix loop. Claude reviews the directory and Codex fixes the reported issues.
 * The pair repeats until a review approves, for at most 3 iterations. A static summary
 * task then records the outcome. Per the file header, build(ctx) runs again after each
 * step, so the values read from ctx below reflect the outputs recorded so far.
 */
export default defineWorkflow({
  name: "code-review-loop",
  description: "Claude reviews → Codex fixes → loop until LGTM",
  input: z.object({ directory: z.string(), focus: z.string() }),
  build: (ctx) => {
    // Snapshots taken at this render; falsy before the task has produced output (hence the
    // `?`/`?.` guards below). NOTE(review): assumes outputMaybe returns the latest
    // iteration's output — confirm against the runtime.
    const prevReview = ctx.outputMaybe("review");
    const prevFix = ctx.outputMaybe("fix");
    return sequence([
      // Stop once a review reports approved === true, or after 3 iterations.
      // "return-last" presumably keeps the last iteration's outputs when the cap is hit — confirm in runtime.
      loop(
        { until: () => ctx.outputMaybe("review")?.approved === true, maxIterations: 3, onMaxReached: "return-last" },
        sequence([
          task("review", {
            agent: { model: "claude", instructions: "Senior code reviewer. Read code with your tools. Set approved=true if clean, else list issues." },
            output: reviewSchema,
            // Prompt mentions the previous review's issue count and the last fix summary when they exist.
            prompt: () => [
              `Review code in: ${ctx.input.directory}. Focus: ${ctx.input.focus}`,
              prevReview ? `Previous review: ${prevReview.issues.length} issues (${prevReview.severity}).` : "Initial review.",
              prevFix ? `Fixes applied: ${prevFix.changesSummary}` : "",
              "Use tools to read files. Check for bugs, error handling, type safety.",
            ].filter(Boolean).join("\n"),
          }),
          // Conditional — no BranchNode needed, just JS!
          // The fix step is present until the latest review is approved.
          ...(ctx.outputMaybe("review")?.approved ? [] : [
            task("fix", {
              agent: { model: "codex", instructions: "Senior engineer. Fix review issues. Minimal changes. Run tests." },
              output: fixSchema,
              // Numbered list of the latest review's issues.
              prompt: () => {
                const r = ctx.output("review");
                return `Fix these issues:\n${r.issues.map((i: string, n: number) => `${n + 1}. ${i}`).join("\n")}\n\nDirectory: ${ctx.input.directory}`;
              },
            }),
          ]),
        ]),
      ),
      // Static (no agent) task: summarises the final review and the iteration count.
      task("summary", {
        output: summarySchema,
        value: () => ({
          report: prevReview?.approved ? "LGTM!" : `Completed after ${ctx.iterations("review")} iterations. Last: ${prevReview?.feedback ?? "none"}`,
          totalIterations: ctx.iterations("review"),
          approved: prevReview?.approved ?? false,
        }),
      }),
    ]);
  },
});
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * pi-smithers — Durable multi-agent workflow orchestration for pi. | |
| * | |
| * Registers `workflow` tool + `/workflow` command. | |
| * Workflow definitions live in .pi/workflows/*.ts (project) or ~/.pi/workflows/*.ts (global). | |
| */ | |
import { existsSync, readdirSync, readFileSync, writeFileSync, mkdirSync, renameSync } from "node:fs";
import { join } from "node:path";
import { homedir } from "node:os";
import type { ExtensionAPI, Theme } from "@mariozechner/pi-coding-agent";
import { Type } from "@sinclair/typebox";
import { StringEnum } from "@mariozechner/pi-ai";
import { Text } from "@mariozechner/pi-tui";
import { runWorkflow, type RunResult, type ProgressEvent } from "./runtime";
import type { WorkflowDefinition, RunState } from "./types";
import { WorkflowPanel, createOverlay, applyEvent, type OverlayState } from "./ui";
// ── Workflow Loading ──
/**
 * Directories searched for workflow definitions, in priority order:
 * project (`<cwd>/.pi/workflows`) first, then global (`~/.pi/workflows`).
 */
const workflowDirs = (cwd: string): string[] => {
  const projectDir = join(cwd, ".pi", "workflows");
  const globalDir = join(homedir(), ".pi", "workflows");
  return [projectDir, globalDir];
};
/**
 * Import the workflow module named `name`. The project dir is searched before the
 * global dir, and `.ts` is tried before `.js`.
 *
 * Returns the module's default export (or the module itself), or null when no file matches.
 *
 * `name` arrives via tool parameters, so names that could escape the workflow directories
 * are rejected up front: those containing a path separator, or starting with "." (which
 * covers ".."). Dotfiles are never listed by listWorkflows, so no listed workflow is affected.
 */
async function loadWorkflow(name: string, cwd: string): Promise<WorkflowDefinition | null> {
  if (!name || /[\\/]/.test(name) || name.startsWith(".")) return null;
  for (const dir of workflowDirs(cwd)) {
    for (const ext of [".ts", ".js"]) {
      const p = join(dir, name + ext);
      if (existsSync(p)) { const m = await import(p); return m.default ?? m; }
    }
  }
  return null;
}
/**
 * Enumerate workflow files (`*.ts` / `*.js`, dotfiles excluded) in every workflow dir that exists.
 * The first dir is reported as "project" scope and the rest as "global"; project entries come first.
 */
function listWorkflows(cwd: string) {
  const found: Array<{ name: string; path: string; scope: string }> = [];
  workflowDirs(cwd).forEach((dir, index) => {
    if (!existsSync(dir)) return;
    const scope = index === 0 ? "project" : "global";
    for (const file of readdirSync(dir)) {
      if (file.startsWith(".") || !/\.(ts|js)$/.test(file)) continue;
      found.push({ name: file.replace(/\.(ts|js)$/, ""), path: join(dir, file), scope });
    }
  });
  return found;
}
// ── Persistence ──
/** Path of the per-project JSON file holding recent run states, keyed by runId. */
const runsPath = (cwd: string) => join(cwd, ".pi", "workflow-runs.json");
/**
 * Insert or replace run `s` in the runs file, keeping at most 20 runs.
 *
 * Eviction:
 * - drops the runs with the oldest `startedAt`, never the run being saved;
 * - does not use object key order as an age signal, because JSON re-parsing moves
 *   integer-like ids to the front and a re-saved run keeps its original slot.
 *
 * Writing: the file is written to a temp file and renamed into place, so a crash
 * mid-write cannot truncate the whole history.
 *
 * A missing, unreadable, or non-object existing file is treated as empty (best effort).
 */
function saveRun(cwd: string, s: RunState) {
  const maxRuns = 20;
  const p = runsPath(cwd);
  mkdirSync(join(p, ".."), { recursive: true });
  let runs: Record<string, RunState> = {};
  try {
    const parsed = JSON.parse(readFileSync(p, "utf-8"));
    if (parsed && typeof parsed === "object") runs = parsed;
  } catch {}
  runs[s.runId] = s;
  const excess = Object.keys(runs).length - maxRuns;
  if (excess > 0) {
    const age = (k: string) => runs[k]?.startedAt ?? 0;
    const evict = Object.keys(runs)
      .filter((k) => k !== s.runId)
      .sort((a, b) => age(a) - age(b))
      .slice(0, excess);
    for (const k of evict) delete runs[k];
  }
  const tmp = `${p}.${process.pid}.tmp`;
  writeFileSync(tmp, JSON.stringify(runs, null, 2));
  renameSync(tmp, p);
}
/** Look up one persisted run by id; null when the runs file is missing or unreadable, or has no such id. */
function loadRun(cwd: string, id: string): RunState | null {
  try {
    const runs: Record<string, RunState> = JSON.parse(readFileSync(runsPath(cwd), "utf-8"));
    const found = runs[id];
    return found ?? null;
  } catch {
    return null;
  }
}
/** All persisted runs, newest first by `startedAt`; [] when the runs file is missing or unreadable. */
function listRuns(cwd: string): RunState[] {
  try {
    const byId = JSON.parse(readFileSync(runsPath(cwd), "utf-8")) as Record<string, RunState>;
    const all = Object.values(byId);
    all.sort((a, b) => b.startedAt - a.startedAt);
    return all;
  } catch {
    return [];
  }
}
// ── Extension ──
// Module-level flag: at most one workflow overlay is shown at a time. Runs started while
// an overlay is open proceed without one.
let overlayActive = false;
/**
 * pi extension entry point. Registers:
 * - the `workflow` tool (actions: list, runs, status, run, resume);
 * - the `/workflow` command, which picks a workflow and asks the agent to run it.
 */
export default function (pi: ExtensionAPI) {
  pi.registerTool({
    name: "workflow",
    label: "Workflow",
    description:
      "Run durable multi-agent workflows defined in .pi/workflows/*.ts. " +
      "Supports sequence, parallel, loop. Each task can use a different model (Claude, Codex, Gemini, ChatGPT Pro). " +
      "Outputs are Zod-validated and persisted per-step for crash recovery.",
    promptSnippet: "Run multi-agent workflows (sequence, parallel, loop) with different models per step",
    promptGuidelines: [
      "Use `workflow(action: 'list')` to discover workflows.",
      "Use `workflow(action: 'run', name: '...', input: {...})` to execute.",
      "Use `workflow(action: 'resume', runId: '...')` to resume a failed run.",
      "Workflows use build(ctx) — the tree re-renders after each task, so JS if/else works for branching.",
    ],
    parameters: Type.Object({
      action: StringEnum(["run", "list", "status", "resume", "runs"] as const),
      name: Type.Optional(Type.String({ description: "Workflow name" })),
      input: Type.Optional(Type.Record(Type.String(), Type.Any())),
      runId: Type.Optional(Type.String({ description: "Run ID" })),
    }),
    async execute(_id, params, signal, onUpdate, ctx) {
      switch (params.action) {
        case "list": {
          const wfs = listWorkflows(ctx.cwd);
          return { content: [{ type: "text", text: wfs.length ? wfs.map((w) => `• ${w.name} (${w.scope})`).join("\n") : "No workflows found. Create .pi/workflows/<name>.ts" }], details: { workflows: wfs } };
        }
        case "runs": {
          const runs = listRuns(ctx.cwd);
          return { content: [{ type: "text", text: runs.length ? runs.map((r) => `${r.status === "completed" ? "✓" : "✗"} ${r.runId} ${r.workflowName} (${r.status})`).join("\n") : "No runs." }], details: { runs } };
        }
        case "status": {
          if (!params.runId) return { content: [{ type: "text", text: "runId required" }], isError: true };
          const s = loadRun(ctx.cwd, params.runId);
          return s ? { content: [{ type: "text", text: JSON.stringify(s, null, 2) }], details: { state: s } } : { content: [{ type: "text", text: "Not found" }], isError: true };
        }
        case "run":
        case "resume": {
          let resume: RunState | undefined;
          let wfName: string;
          if (params.action === "resume") {
            if (!params.runId) return { content: [{ type: "text", text: "runId required" }], isError: true };
            resume = loadRun(ctx.cwd, params.runId) ?? undefined;
            if (!resume) return { content: [{ type: "text", text: `Run ${params.runId} not found` }], isError: true };
            wfName = resume.workflowName;
          } else {
            if (!params.name) return { content: [{ type: "text", text: "name required" }], isError: true };
            wfName = params.name;
          }
          // A workflow file that fails to import (syntax error, missing dependency) is reported
          // as a tool error like every other failure here, instead of escaping execute().
          let wf: WorkflowDefinition | null;
          try {
            wf = await loadWorkflow(wfName, ctx.cwd);
          } catch (err: any) {
            return { content: [{ type: "text", text: `Failed to load workflow "${wfName}": ${err?.message ?? err}` }], isError: true };
          }
          if (!wf) return { content: [{ type: "text", text: `Workflow "${wfName}" not found` }], isError: true };
          // Overlay
          const showOverlay = ctx.hasUI && !overlayActive;
          let tui: any = null, closeFn: (() => void) | null = null, overlayP: Promise<void> | null = null;
          let overlay: OverlayState | null = null;
          // NOTE(review): this id only labels the overlay; it is not passed to runWorkflow, so for a
          // fresh run it may differ from result.state.runId — confirm against runtime.ts.
          const runId = resume?.runId ?? Date.now().toString(36).slice(-4);
          if (showOverlay) {
            overlayActive = true;
            overlay = createOverlay(wf.name, runId);
            overlayP = ctx.ui.custom<void>((t, theme, _kb, done) => {
              tui = t; closeFn = () => done();
              return new WorkflowPanel(overlay!, theme);
            }, { overlay: true, overlayOptions: { nonCapturing: true, anchor: "right-center", width: "45%", minWidth: 36, maxHeight: "80%", margin: { right: 1, top: 1, bottom: 1 }, visible: (w: number) => w >= 100 } });
          }
          // Close the overlay this call opened (if any) and always release the single-overlay slot,
          // even if closing or awaiting the overlay promise throws.
          const closeOverlay = async () => {
            if (!showOverlay) return;
            try {
              closeFn?.();
              if (overlayP) await overlayP;
            } finally {
              overlayActive = false;
            }
          };
          const onProgress = (e: ProgressEvent) => {
            if (overlay) { applyEvent(overlay, e); tui?.requestRender(); }
            if (e.type === "task_start" || e.type === "task_end" || e.type === "info") {
              const msg = e.type === "info" ? e.message : e.type === "task_start" ? `Starting "${e.taskId}" (${e.agent ?? "static"})...` : `"${e.taskId}" → ${e.result.status} (${e.result.durationMs}ms)`;
              onUpdate?.({ content: [{ type: "text", text: msg }], details: { event: e } });
            }
          };
          let result: RunResult;
          try {
            result = await runWorkflow({
              workflow: wf,
              input: params.input ?? resume?.input ?? {},
              cwd: ctx.cwd,
              signal: signal ?? undefined,
              resumeState: resume,
              onProgress,
              onApproval: ctx.hasUI ? (id, label) => ctx.ui.confirm("Approval Required", `Task "${label ?? id}" needs approval. Proceed?`) : undefined,
              onCommit: (s) => saveRun(ctx.cwd, s), // Per-step persistence!
            });
          } catch (err: any) {
            await closeOverlay();
            return { content: [{ type: "text", text: `Error: ${err.message}` }], isError: true };
          }
          try {
            pi.appendEntry("workflow-run", { state: result.state });
            // Keep the finished panel on screen briefly before closing it.
            if (showOverlay) await new Promise((r) => setTimeout(r, 1000));
          } finally {
            // Runs even if appendEntry throws, so overlayActive can never stay stuck at true.
            await closeOverlay();
          }
          return {
            content: [{ type: "text", text: formatSummary(result) }],
            details: { runId: result.state.runId, status: result.state.status, results: result.state.results, outputs: result.outputs, durationMs: (result.state.completedAt ?? Date.now()) - result.state.startedAt },
          };
        }
        default: return { content: [{ type: "text", text: `Unknown action` }], isError: true };
      }
    },
    renderCall(args: any, theme: Theme) {
      let t = theme.fg("toolTitle", theme.bold("workflow ")) + theme.fg("accent", args.action ?? "?");
      if (args.name) t += theme.fg("dim", ` ${args.name}`);
      if (args.runId) t += theme.fg("dim", ` run:${args.runId}`);
      return new Text(t, 0, 0);
    },
    renderResult(result: any, { expanded, isPartial }: any, theme: Theme) {
      if (isPartial) return new Text(theme.fg("warning", "⟳ ") + theme.fg("dim", result.content?.[0]?.text ?? "..."), 0, 0);
      const d = result.details as any;
      // Results without a runId (list/runs/status/errors) are shown as plain text.
      if (!d?.runId) return new Text(result.content?.[0]?.text ?? "", 0, 0);
      const icon = d.status === "completed" ? theme.fg("success", "✓") : theme.fg("error", "✗");
      const dur = d.durationMs ? theme.fg("dim", ` ${Math.round(d.durationMs / 1000)}s`) : "";
      const header = `${icon} ${theme.fg("accent", "workflow")} ${theme.fg("text", d.runId)}${dur}${theme.fg("dim", ` (${d.results?.length ?? 0} tasks)`)}`;
      return new Text(expanded ? header + "\n" + (result.content?.[0]?.text ?? "") : header, 0, 0);
    },
  });
  pi.registerCommand("workflow", {
    description: "List and run workflows",
    handler: async (args, ctx) => {
      const wfs = listWorkflows(ctx.cwd);
      if (!wfs.length) { ctx.ui.notify("No workflows in .pi/workflows/", "warning"); return; }
      if (args?.trim()) { pi.sendUserMessage(`Run the "${args.trim()}" workflow.`, { deliverAs: "followUp" }); return; }
      const sel = await ctx.ui.select("Run Workflow", wfs.map((w) => ({ value: w.name, label: w.name, description: w.scope })));
      if (sel) pi.sendUserMessage(`Run the "${sel}" workflow. Ask me for required input.`, { deliverAs: "followUp" });
    },
  });
}
// ── Format ──
/**
 * Render a human-readable run report. It contains:
 * - a header line: status icon, workflow name, status, wall-clock seconds ("?" if not
 *   completed), and run id;
 * - one line per task result;
 * - each output as pretty JSON truncated to 200 chars;
 * - the run-level error, if any.
 */
function formatSummary({ state: s, outputs }: RunResult): string {
  const dur = s.completedAt ? `${Math.round((s.completedAt - s.startedAt) / 1000)}s` : "?";
  const lines = [`${s.status === "completed" ? "✓" : "✗"} "${s.workflowName}" ${s.status} (${dur}) run: ${s.runId}`, ""];
  for (const r of s.results) {
    const i = r.status === "success" ? "✓" : r.status === "skipped" ? "○" : "✗";
    lines.push(` ${i} ${r.taskId}${r.agent ? ` (${r.agent})` : ""} — ${r.durationMs}ms${r.error ? ` — ${r.error}` : ""}`);
  }
  const entries = Object.entries(outputs);
  if (entries.length) {
    lines.push("", "Outputs:");
    for (const [k, v] of entries) {
      // JSON.stringify returns undefined for undefined/functions/symbols; fall back to String().
      const j = JSON.stringify(v, null, 2) ?? String(v);
      lines.push(` ${k}: ${j.length > 200 ? j.slice(0, 200) + "…" : j}`);
    }
  }
  if (s.error) lines.push("", `Error: ${s.error}`);
  return lines.join("\n");
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * Multi-Agent Review — 1:1 reproduction of https://smithers.sh/examples/multi-agent-review | |
| * | |
| * Two reviewers run in parallel, then an aggregator produces a final verdict. | |
| * Uses build(ctx) — the tree re-renders after each task, just like Smithers' JSX. | |
| */ | |
| import { defineWorkflow, task, sequence, parallel } from "../extensions/smithers/types"; | |
| import { z } from "zod"; | |
/** Output of each individual reviewer. */
const reviewSchema = z.object({
  approved: z.boolean(),
  feedback: z.string(),
});
/** Output of the aggregator. */
const verdictSchema = z.object({
  approved: z.boolean(),
  summary: z.string(),
});
/**
 * Two reviewers (Claude for security, Gemini for quality) run in parallel over the same
 * diff. A Claude aggregator then merges their verdicts.
 */
export default defineWorkflow({
  name: "multi-agent-review",
  description: "Two reviewers in parallel → aggregator verdict",
  input: z.object({ diff: z.string().describe("PR diff to review") }),
  build: (ctx) => sequence([
    // Both reviewers may run at once (maxConcurrency: 2).
    // NOTE(review): the diff is embedded in a ``` fence; a diff that itself contains ``` would end the fence early.
    parallel([
      task("security-review", {
        agent: { model: "claude", instructions: "You are a security-focused code reviewer. Look for vulnerabilities, injection risks, and auth issues." },
        output: reviewSchema,
        prompt: () => `Review this PR diff for security issues:\n\n\`\`\`diff\n${ctx.input.diff}\n\`\`\``,
      }),
      task("quality-review", {
        agent: { model: "gemini", instructions: "You are a code quality reviewer. Evaluate readability, error handling, and best practices." },
        output: reviewSchema,
        prompt: () => `Review this PR diff for code quality:\n\n\`\`\`diff\n${ctx.input.diff}\n\`\`\``,
      }),
    ], { maxConcurrency: 2 }),
    // Runs after the parallel block; reads both reviewer outputs via ctx.output.
    task("aggregate", {
      agent: { model: "claude", instructions: "Synthesize two code reviews into a single verdict. Approve only if both approve." },
      output: verdictSchema,
      prompt: () => {
        const sec = ctx.output("security-review");
        const qual = ctx.output("quality-review");
        return `Combine these reviews:\n\nSecurity: ${sec.approved ? "APPROVED" : "REJECTED"} - ${sec.feedback}\n\nQuality: ${qual.approved ? "APPROVED" : "REJECTED"} - ${qual.feedback}`;
      },
    }),
  ]),
});
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| { | |
| "name": "pi-smithers", | |
| "private": true, | |
| "dependencies": { | |
| "zod": "^3.24.0", | |
| "zod-to-json-schema": "^3.24.0" | |
| }, | |
| "pi": { | |
| "extensions": ["./index.ts"] | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * Parallel Analysis → Branch → Action | |
| * Three models analyze in parallel, then branch based on severity. | |
| * Branching via JS if/else in build(ctx) — no BranchNode needed. | |
| */ | |
| import { defineWorkflow, task, sequence, parallel } from "../extensions/smithers/types"; | |
| import { z } from "zod"; | |
/** Output of each parallel analyzer: a category, located issues, a 0–100 score, and a summary. */
const analysisSchema = z.object({
  category: z.string(),
  issues: z.array(
    z.object({
      file: z.string(),
      description: z.string(),
      severity: z.enum(["info", "warning", "error"]),
    }),
  ),
  score: z.number().min(0).max(100),
  summary: z.string(),
});
/** Combined severity computed statically from the three analyses. */
const triageSchema = z.object({
  overallSeverity: z.enum(["low", "medium", "high"]),
  totalIssues: z.number(),
  criticalIssues: z.number(),
});
/** Output of the auto-fix branch. */
const fixSchema = z.object({
  filesFixed: z.array(z.string()),
  summary: z.string(),
});
/** Output of the report branch. */
const reportSchema = z.object({
  markdown: z.string(),
  issueCount: z.number(),
});
/**
 * Security (Claude), performance (Gemini) and quality (Codex) analyses run in parallel.
 * A static triage step combines them, and the tree then branches on the triage result.
 */
export default defineWorkflow({
  name: "parallel-analysis",
  description: "Parallel security/perf/quality → branch on severity",
  input: z.object({ directory: z.string() }),
  build: (ctx) => {
    // Falsy until the triage task has produced output, so no branch task is emitted before then.
    const triage = ctx.outputMaybe("triage");
    return sequence([
      parallel([
        task("security", { agent: { model: "claude", instructions: "Security auditor. Use tools to read code. Find vulnerabilities." }, output: analysisSchema, prompt: () => `Security audit of ${ctx.input.directory}.` }),
        task("performance", { agent: { model: "gemini", instructions: "Performance engineer. Find N+1 queries, memory leaks, slow algorithms." }, output: analysisSchema, prompt: () => `Performance analysis of ${ctx.input.directory}.` }),
        task("quality", { agent: { model: "codex", instructions: "Code quality reviewer. Use tools. Find dead code, missing types, smells." }, output: analysisSchema, prompt: () => `Quality review of ${ctx.input.directory}.` }),
      ]),
      // Static triage:
      // - "high" if more than 5 error-severity issues or a mean score below 50;
      // - otherwise "medium" if any error-severity issue or a mean score below 75;
      // - otherwise "low".
      task("triage", {
        output: triageSchema,
        value: () => {
          const [sec, perf, qual] = ["security", "performance", "quality"].map((id) => ctx.output(id));
          const all = [...sec.issues, ...perf.issues, ...qual.issues];
          const critical = all.filter((i: any) => i.severity === "error").length;
          const avg = (sec.score + perf.score + qual.score) / 3;
          return { overallSeverity: critical > 5 || avg < 50 ? "high" : critical > 0 || avg < 75 ? "medium" : "low", totalIssues: all.length, criticalIssues: critical };
        },
      }),
      // JS branching — tree shape changes based on triage result
      // low → Codex auto-fixes the quality findings; medium/high → Claude writes a report; no triage yet → nothing.
      ...(triage?.overallSeverity === "low"
        ? [task("auto-fix", { agent: { model: "codex", instructions: "Fix minor issues. Minimal changes." }, output: fixSchema, prompt: () => `Fix: ${ctx.output("quality").issues.map((i: any) => `${i.file}: ${i.description}`).join("\n")}` })]
        : triage
          ? [task("report", { agent: { model: "claude", instructions: "Write detailed Markdown analysis report." }, output: reportSchema, prompt: () => {
            const [sec, perf, qual] = ["security", "performance", "quality"].map((id) => ctx.output(id));
            return `Report: Security(${sec.score}), Perf(${perf.score}), Quality(${qual.score})\n${[...sec.issues, ...perf.issues, ...qual.issues].map((i: any) => `[${i.severity}] ${i.file}: ${i.description}`).join("\n")}`;
          } })]
          : []),
    ]);
  },
});
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * Research → Write → Edit pipeline. Three models, each playing to their strength. | |
| */ | |
| import { defineWorkflow, task, sequence } from "../extensions/smithers/types"; | |
| import { z } from "zod"; | |
/** Research stage output. */
const researchSchema = z.object({
  summary: z.string(),
  keyPoints: z.array(z.string()),
  sources: z.array(z.string()),
  outline: z.array(z.string()),
});
/** Draft article. */
const articleSchema = z.object({
  title: z.string(),
  content: z.string(),
  wordCount: z.number(),
});
/** Edited article plus a description of the edits. */
const editedSchema = z.object({
  content: z.string(),
  wordCount: z.number(),
  changes: z.string(),
});
/** Linear three-step pipeline: Gemini researches, Claude writes, ChatGPT Pro edits. */
export default defineWorkflow({
  name: "research-write",
  description: "Gemini researches → Claude writes → ChatGPT Pro edits",
  input: z.object({ topic: z.string(), targetLength: z.number().optional(), tone: z.string().optional() }),
  build: (ctx) => sequence([
    task("research", {
      agent: { model: "gemini", instructions: "Research assistant. Comprehensive, factual, with sources." },
      output: researchSchema,
      prompt: () => `Research: ${ctx.input.topic}\n\nProvide summary, 5-8 key points, sources, and suggested outline.`,
    }),
    // Tone defaults to "technical" and length to ~1000 words when not given.
    // The research sources are not forwarded to the writer; only summary, key points and outline are.
    task("write", {
      agent: { model: "claude", instructions: "Technical writer. Clear, engaging, well-structured." },
      output: articleSchema,
      prompt: () => {
        const r = ctx.output("research");
        return `Write a ${ctx.input.tone ?? "technical"} article (~${ctx.input.targetLength ?? 1000} words).\n\nResearch:\n${r.summary}\n\nKey points:\n${r.keyPoints.join("\n")}\n\nOutline:\n${r.outline.join("\n")}`;
      },
    }),
    task("edit", {
      agent: { model: "chatgpt-pro", instructions: "Precise editor. Tighten prose, fix grammar, keep author's voice." },
      output: editedSchema,
      prompt: () => { const a = ctx.output("write"); return `Edit for clarity:\n\n# ${a.title}\n\n${a.content}`; },
    }),
  ]),
});
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * RPI — Research, Plan, Implement, Review | |
| * | |
| * Context engineering workflow. build(ctx) re-renders after each step. | |
| * Human-in-the-loop at research, design, and outline stages. | |
| * Code review loop after implementation checks against project conventions. | |
| */ | |
| import { defineWorkflow, task, sequence, loop } from "../extensions/smithers/types"; | |
| import { z } from "zod"; | |
/** Research questions derived from the ticket. */
const questionsSchema = z.object({
  questions: z.array(
    z.object({
      question: z.string(),
      why: z.string(),
      searchHints: z.array(z.string()),
    }),
  ),
  scope: z.string(),
});
/** Codebase research answers. */
const researchSchema = z.object({
  answers: z.array(
    z.object({
      question: z.string(),
      answer: z.string(),
      relevantFiles: z.array(z.string()),
    }),
  ),
  architectureSummary: z.string(),
  patterns: z.array(z.string()),
  concerns: z.array(z.string()),
});
/** Design options and open questions for the human. */
const designSchema = z.object({
  options: z.array(
    z.object({
      name: z.string(),
      description: z.string(),
      pros: z.array(z.string()),
      cons: z.array(z.string()),
      recommendation: z.boolean(),
    }),
  ),
  questionsForHuman: z.array(
    z.object({
      question: z.string(),
      context: z.string(),
    }),
  ),
  architecturalDecisions: z.array(z.string()),
  risks: z.array(z.string()),
});
/** Phase-level outline. */
const outlineSchema = z.object({
  phases: z.array(
    z.object({
      name: z.string(),
      goal: z.string(),
      verification: z.string(),
      dependencies: z.array(z.string()),
    }),
  ),
  estimatedComplexity: z.enum(["small", "medium", "large", "xl"]),
});
/** Detailed step-by-step plan. */
const planSchema = z.object({
  plan: z.string(),
  totalSteps: z.number(),
  phases: z.array(
    z.object({
      name: z.string(),
      steps: z.array(z.string()),
    }),
  ),
});
/** Implementation report. */
const implSchema = z.object({
  filesChanged: z.array(z.string()),
  testsRun: z.string(),
  summary: z.string(),
});
/** Code review with per-issue severity. */
const reviewSchema = z.object({
  approved: z.boolean(),
  issues: z.array(
    z.object({
      file: z.string(),
      category: z.string(),
      severity: z.enum(["must-fix", "should-fix", "nit"]),
      description: z.string(),
      suggestion: z.string().optional(),
    }),
  ),
  summary: z.string(),
  mustFixCount: z.number(),
});
/** Fix report after review. */
const fixSchema = z.object({
  filesChanged: z.array(z.string()),
  summary: z.string(),
  issuesFixed: z.number(),
});
/** Output of the human-approval checkpoints. */
const ok = z.object({
  reviewed: z.boolean(),
});
export default defineWorkflow({
  name: "rpi",
  description: "Research → Plan → Implement → Review",
  input: z.object({ ticket: z.string(), directory: z.string().optional() }),
  /**
   * Every prompt / skipIf / until below closes over `ctx` and reads it lazily
   * when the task runs, so it sees the outputs committed so far. The tree shape
   * itself does not depend on task outputs.
   */
  build: (ctx) => {
    const dir = ctx.input.directory ?? ".";
    return sequence([
      // 1. Research questions (Gemini — fast, analytical)
      task("questions", {
        agent: { model: "gemini", instructions: "Convert feature ticket into objective research questions about the EXISTING codebase. No implementation questions." },
        output: questionsSchema,
        prompt: () => `Analyze ticket and generate 5-15 research questions:\n\n${ctx.input.ticket}`,
      }),
      // 2. Research (Claude — has tools, reads code). TICKET HIDDEN = context isolation
      task("research", {
        agent: { model: "claude", instructions: "Research codebase. Answer by READING ACTUAL CODE. Don't guess." },
        output: researchSchema,
        prompt: () => {
          const { questions } = ctx.output<z.infer<typeof questionsSchema>>("questions");
          const numbered = questions
            .map((q, i) => `${i + 1}. ${q.question} (look in: ${q.searchHints.join(", ")})`)
            .join("\n");
          return `Research codebase at: ${dir}\n\n${numbered}`;
        },
      }),
      // 3. ⏸ Human reviews research
      task("research-review", { needsApproval: true, label: "Review Research", output: ok, value: () => ({ reviewed: true }) }),
      // 4. Design discussion (Claude — research + ticket together for first time)
      task("design", {
        agent: { model: "claude", instructions: "Architect. Propose 2-3 options with pros/cons. Ask questions where human input needed." },
        output: designSchema,
        prompt: () => {
          const r = ctx.output<z.infer<typeof researchSchema>>("research");
          return `TICKET:\n${ctx.input.ticket}\n\nRESEARCH:\n${r.architectureSummary}\nPatterns: ${r.patterns.join(", ")}\nConcerns: ${r.concerns.join(", ")}`;
        },
      }),
      // 5. ⏸ Human answers design questions
      task("design-review", { needsApproval: true, label: "Review Design — answer questions, pick options", output: ok, value: () => ({ reviewed: true }) }),
      // 6. Outline (Gemini — vertical phases)
      task("outline", {
        agent: { model: "gemini", instructions: "Create VERTICAL implementation phases (not horizontal layers). Each phase must have a concrete verification step." },
        output: outlineSchema,
        prompt: () => {
          const d = ctx.output<z.infer<typeof designSchema>>("design");
          return `TICKET: ${ctx.input.ticket}\nDECISIONS: ${d.architecturalDecisions.join("; ")}\nRISKS: ${d.risks.join("; ")}`;
        },
      }),
      // 7. ⏸ Human reviews outline
      task("outline-review", { needsApproval: true, label: "Review Outline", output: ok, value: () => ({ reviewed: true }) }),
      // 8. Detailed plan (Claude)
      task("plan", {
        agent: { model: "claude", instructions: "Expand outline into granular plan. Per-file, per-function. Reference actual paths from research." },
        output: planSchema,
        prompt: () => {
          const r = ctx.output<z.infer<typeof researchSchema>>("research");
          const d = ctx.output<z.infer<typeof designSchema>>("design");
          const o = ctx.output<z.infer<typeof outlineSchema>>("outline");
          const phases = o.phases.map((p) => `${p.name}: ${p.goal} (verify: ${p.verification})`).join("\n");
          return `TICKET: ${ctx.input.ticket}\nARCH: ${r.architectureSummary}\nDECISIONS: ${d.architecturalDecisions.join("; ")}\nPHASES:\n${phases}`;
        },
      }),
      // 9. Implement (Codex — agentic, has tools)
      task("implement", {
        agent: { model: "codex", instructions: "Implement the plan step by step. Run verification after each phase. Fix failures before continuing." },
        output: implSchema,
        retries: 1,
        prompt: () => `Implement in ${dir}:\n\n${ctx.output<z.infer<typeof planSchema>>("plan").plan}`,
      }),
      // 10-11. Code review loop (Claude reviews → Codex fixes)
      loop(
        {
          until: () => ctx.outputMaybe<z.infer<typeof reviewSchema>>("code-review")?.approved === true,
          maxIterations: 2,
          onMaxReached: "return-last",
        },
        sequence([
          task("code-review", {
            agent: { model: "claude", instructions: [
              "Review changed files against project conventions:",
              "1. Read docs/effect-cookbook.md — Effect.gen, Layer, Match, Schema-first",
              "2. AI slop: unnecessary comments, defensive try/catch, any casts",
              "3. Clean code: SRP, DRY, naming, YAGNI",
              "4. Reference repos: check references/ for correct library usage",
              "5. Architecture: correct dependency direction, contracts package rules",
              "Set approved=true only if zero must-fix issues.",
            ].join("\n") },
            output: reviewSchema,
            prompt: () => {
              const impl = ctx.output<z.infer<typeof implSchema>>("implement");
              const fixes = ctx.allOutputs<z.infer<typeof fixSchema>>("review-fix");
              const prev = fixes.at(-1);
              // Fix rounds may touch files the implementer never listed; re-review all of them.
              const files = [...new Set([...impl.filesChanged, ...fixes.flatMap((f) => f.filesChanged)])];
              return `Review files: ${files.join(", ")}${prev ? `\nPrevious fixes: ${prev.summary}` : ""}\n\nRead each file + docs/effect-cookbook.md. Check diff: git diff main`;
            },
          }),
          // Codex fixes; skipIf turns this into a recorded skip once the review approves.
          task("review-fix", {
            agent: { model: "codex", instructions: "Fix must-fix and should-fix issues. Run tests after." },
            output: fixSchema,
            skipIf: () => ctx.outputMaybe<z.infer<typeof reviewSchema>>("code-review")?.approved === true,
            prompt: () => {
              const review = ctx.output<z.infer<typeof reviewSchema>>("code-review");
              const actionable = review.issues.filter((issue) => issue.severity !== "nit");
              // A rejection without itemised must/should-fix issues would otherwise yield an empty "Fix:" prompt.
              if (actionable.length === 0) return `Fix the problems described in this review:\n${review.summary}`;
              const lines = actionable.map(
                (issue) => `${issue.file} [${issue.category}] ${issue.description}${issue.suggestion ? ` → ${issue.suggestion}` : ""}`,
              );
              return `Fix:\n${lines.join("\n")}`;
            },
          }),
        ]),
      ),
    ]);
  },
});
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
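/**
 * Workflow execution engine.
 *
 * Execution model (a simplified take on Smithers):
 * 1. Call build(ctx) once, at the start of the run, to get the node tree.
 *    On resume, ctx already sees the outputs recorded in the resumed state.
 * 2. Walk the tree depth-first: sequence children in order, parallel children
 *    through a bounded pool, loop children until `until(ctx)` holds or
 *    `maxIterations` is reached.
 * 3. Execute each task and persist its result immediately (commit → onCommit).
 *
 * The tree is NOT re-rendered after each task, so it cannot change shape based
 * on outputs produced mid-run. Output-dependent behaviour belongs in the lazily
 * evaluated callbacks — prompt, skipIf, loop until — which read the live context.
 */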
| import type { WorkflowNode, WorkflowContext, WorkflowDefinition, TaskResult, RunState } from "./types"; | |
| import { executeTask } from "./agents"; | |
| import { randomUUID } from "node:crypto"; | |
| // ── Public API ── | |
/** Inputs for starting (or resuming) a run with runWorkflow. */
export type RunOptions = {
  /** The workflow to execute. */
  workflow: WorkflowDefinition;
  /** Run input; validated against `workflow.input` when that schema is set. */
  input: any;
  /** Working directory passed through to executeTask for every task. */
  cwd: string;
  /** Checked before each node is entered and passed through to executeTask. */
  signal?: AbortSignal;
  /** Previously committed state to continue from instead of starting fresh. */
  resumeState?: RunState;
  /** Receives every progress event emitted during the run. */
  onProgress?: (event: ProgressEvent) => void;
  /**
   * Gate for `needsApproval` tasks. Resolving `false` records the task as
   * skipped ("denied"); when omitted, gated tasks run without asking.
   */
  onApproval?: (taskId: string, label?: string) => Promise<boolean>;
  /** Called after every task — persist state here for crash recovery. */
  onCommit?: (state: RunState) => void;
};

/** Progress notifications, discriminated by `type`. */
export type ProgressEvent =
  // A task is about to execute.
  | { type: "task_start"; taskId: string; iteration: number; agent?: string }
  // A task finished, successfully or not; `result` is what was committed.
  | { type: "task_end"; taskId: string; iteration: number; result: TaskResult }
  // A task was not executed (skipIf or denied approval).
  | { type: "task_skip"; taskId: string; reason: string }
  // A loop is starting iteration `iteration` (0-based) of at most `max`.
  | { type: "loop_iter"; loopId: string; iteration: number; max: number }
  // A loop finished, either because `until` held or the limit was reached.
  | { type: "loop_done"; loopId: string; reason: "condition_met" | "max_reached" }
  // The whole run finished.
  | { type: "done"; status: "completed" | "failed"; error?: string }
  // Free-form log line.
  | { type: "info"; message: string };

/** Final run state plus the latest successful output of every task id. */
export type RunResult = {
  state: RunState;
  outputs: Record<string, any>;
};
/**
 * Execute a workflow and return its final state plus the latest successful
 * output of every task.
 *
 * `workflow.build(ctx)` is evaluated once, up front. The resulting tree is then
 * walked depth-first: sequences in order, parallels through a bounded pool, and
 * loops until `until(ctx)` holds. Prompts, `skipIf` and `until` read the live
 * context, so they see every output committed earlier in the run.
 *
 * Every task result is committed as soon as it exists and handed to `onCommit`,
 * so a crashed run can be continued by passing the last committed state back as
 * `resumeState`. Results are keyed `taskId::n`, where n counts how many times
 * this run has reached that task id. The walk is deterministic, so a resumed run
 * reaches the same keys in the same order. Keys that already hold a success or a
 * skip are not executed again; failed keys are retried.
 *
 * Rejects only when `input` fails `workflow.input` validation. Every later
 * failure is recorded in `state.status` / `state.error` and reported through a
 * final `done` progress event instead.
 */
export async function runWorkflow(opts: RunOptions): Promise<RunResult> {
  const { workflow, input, cwd, signal, onProgress, onApproval, onCommit } = opts;
  if (workflow.input) workflow.input.parse(input);

  type TaskNode = Extract<WorkflowNode, { type: "task" }>;
  type LoopNode = Extract<WorkflowNode, { type: "loop" }>;

  // State keyed by "taskId::iteration" like Smithers
  const state: RunState = opts.resumeState ?? {
    runId: randomUUID().slice(0, 8),
    workflowName: workflow.name,
    input,
    status: "running",
    results: [],
    loopIterations: {},
    startedAt: Date.now(),
  };
  state.status = "running";
  // A resumed run starts clean. Otherwise the previous attempt's failure markers
  // would survive into a state that later completes successfully.
  delete state.error;
  delete state.completedAt;

  const stateKey = (id: string, iter: number) => `${id}::${iter}`;
  const completed = new Map<string, TaskResult>();
  for (const r of state.results) completed.set(stateKey(r.taskId, r.iteration), r);

  // How many times this run has reached each task id. Counting stored results
  // would never revisit an existing key on resume; this counter restarts at 0
  // each run, so replayed tasks land on their previous keys.
  const visits = new Map<string, number>();

  const messageOf = (err: unknown): string => (err instanceof Error ? err.message : String(err));

  function commit(result: TaskResult) {
    completed.set(stateKey(result.taskId, result.iteration), result);
    state.results = [...completed.values()];
    onCommit?.(state);
  }

  function buildContext(): WorkflowContext {
    return {
      runId: state.runId,
      input: state.input,
      output: (taskId) => {
        const r = state.results.findLast((r) => r.taskId === taskId && r.status === "success");
        if (!r) throw new Error(`No output for "${taskId}"`);
        return r.output;
      },
      outputMaybe: (taskId) =>
        state.results.findLast((r) => r.taskId === taskId && r.status === "success")?.output,
      allOutputs: (taskId) =>
        state.results.filter((r) => r.taskId === taskId && r.status === "success").map((r) => r.output),
      iterations: (taskId) =>
        state.results.filter((r) => r.taskId === taskId && r.status === "success").length,
    };
  }

  // ── Execute a single task ──
  async function runTask(node: TaskNode): Promise<void> {
    // Claim the iteration slot synchronously so concurrent visits get distinct keys.
    const iter = visits.get(node.id) ?? 0;
    visits.set(node.id, iter + 1);
    const key = stateKey(node.id, iter);

    // Already done in a previous attempt (resume): success and skip are final, failures are retried.
    const existing = completed.get(key);
    if (existing && (existing.status === "success" || existing.status === "skipped")) {
      onProgress?.({ type: "info", message: `Skipping "${node.id}" (resumed)` });
      return;
    }

    const ctx = buildContext();
    if (node.skipIf?.(ctx)) {
      commit({ taskId: node.id, iteration: iter, output: null, status: "skipped", durationMs: 0, timestamp: Date.now() });
      onProgress?.({ type: "task_skip", taskId: node.id, reason: "skipIf" });
      return;
    }
    if (node.needsApproval && onApproval) {
      const ok = await onApproval(node.id, node.label);
      if (!ok) {
        commit({ taskId: node.id, iteration: iter, output: null, status: "skipped", error: "denied", durationMs: 0, timestamp: Date.now() });
        onProgress?.({ type: "task_skip", taskId: node.id, reason: "denied" });
        return;
      }
    }

    onProgress?.({ type: "task_start", taskId: node.id, iteration: iter, agent: node.agent?.model });
    const t0 = Date.now();
    try {
      const output = await executeTask(node, ctx, cwd, signal, (m) => onProgress?.({ type: "info", message: m }));
      const result: TaskResult = { taskId: node.id, iteration: iter, output, status: "success", agent: node.agent?.model, durationMs: Date.now() - t0, timestamp: Date.now() };
      commit(result);
      onProgress?.({ type: "task_end", taskId: node.id, iteration: iter, result });
    } catch (err: unknown) {
      const result: TaskResult = { taskId: node.id, iteration: iter, output: null, status: "failed", error: messageOf(err), agent: node.agent?.model, durationMs: Date.now() - t0, timestamp: Date.now() };
      commit(result);
      onProgress?.({ type: "task_end", taskId: node.id, iteration: iter, result });
      if (!node.continueOnFail) throw err;
    }
  }

  // ── Execute parallel children through a bounded pool ──
  async function runParallel(children: WorkflowNode[], maxConcurrency?: number): Promise<void> {
    // At least one slot, so maxConcurrency: 0 still makes progress.
    const limit = Math.max(1, maxConcurrency ?? children.length);
    const inFlight = new Set<Promise<void>>();
    let failure: { error: unknown } | undefined;
    for (const child of children) {
      // Stop launching new branches after the first failure.
      if (failure) break;
      const p: Promise<void> = executeNode(child)
        .catch((error: unknown) => {
          failure ??= { error };
        })
        .finally(() => inFlight.delete(p));
      inFlight.add(p);
      if (inFlight.size >= limit) await Promise.race(inFlight);
    }
    // Let branches that are already running settle, so their results are committed
    // before the run is reported as finished.
    await Promise.all(inFlight);
    if (failure) throw failure.error;
  }

  // ── Execute a loop ──
  async function runLoop(node: LoopNode): Promise<void> {
    const loopId = node.child.type === "sequence"
      ? node.child.children.filter((c): c is TaskNode => c.type === "task").map((c) => c.id).join("+") || "loop"
      : "loop";
    for (let i = 0; i < node.maxIterations; i++) {
      if (signal?.aborted) throw new Error("Aborted");
      // The condition is checked before every iteration except the first.
      if (i > 0 && node.until(buildContext())) {
        onProgress?.({ type: "loop_done", loopId, reason: "condition_met" });
        return;
      }
      onProgress?.({ type: "loop_iter", loopId, iteration: i, max: node.maxIterations });
      state.loopIterations[loopId] = i;
      // build() is not re-run: the same child subtree executes each iteration, and
      // per-iteration behaviour comes from callbacks reading the live context.
      await executeNode(node.child);
    }
    if (node.until(buildContext())) {
      onProgress?.({ type: "loop_done", loopId, reason: "condition_met" });
      return;
    }
    if (node.onMaxReached === "fail") {
      throw new Error(`Loop "${loopId}" hit max iterations (${node.maxIterations})`);
    }
    onProgress?.({ type: "loop_done", loopId, reason: "max_reached" });
  }

  // ── Execute a node (recursive) ──
  async function executeNode(node: WorkflowNode): Promise<void> {
    if (signal?.aborted) throw new Error("Aborted");
    switch (node.type) {
      case "task":
        return runTask(node);
      case "sequence":
        for (const child of node.children) await executeNode(child);
        return;
      case "parallel":
        return runParallel(node.children, node.maxConcurrency);
      case "loop":
        return runLoop(node);
    }
  }

  // ── Main: build once → execute ──
  try {
    const tree = workflow.build(buildContext());
    await executeNode(tree);
    state.status = "completed";
    state.completedAt = Date.now();
    onCommit?.(state);
    onProgress?.({ type: "done", status: "completed" });
  } catch (err: unknown) {
    const message = messageOf(err);
    state.status = "failed";
    state.error = message;
    state.completedAt = Date.now();
    onCommit?.(state);
    onProgress?.({ type: "done", status: "failed", error: message });
  }

  const outputs: Record<string, any> = {};
  for (const r of state.results) if (r.status === "success") outputs[r.taskId] = r.output;
  return { state, outputs };
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * Workflow types and DSL helpers. | |
| * Mirrors Smithers' component model: Task, Sequence, Parallel, Loop. | |
| * Branch is gone — use JS if/else inside build(ctx) instead. | |
| */ | |
| import type { z } from "zod"; | |
| // ── Agent ── | |
/** Which CLI-backed model runs a task, plus optional instructions for it. */
export type AgentConfig = {
  /**
   * Model id: "claude" | "gemini" | "codex" | "chatgpt-pro" (and aliases).
   * All use CLI auth — no API keys needed.
   */
  model: string;
  /** Extra instructions for the agent; presumably combined with the task prompt by executeTask — confirm in agents.ts. */
  instructions?: string;
};
| // ── Context (passed to build() and prompt()) ── | |
/**
 * Read-only view of a run, handed to build() and to the task callbacks
 * (prompt, value, skipIf) and loop `until`.
 */
export type WorkflowContext<TInput = any> = {
  /** Identifier of the current run. */
  runId: string;
  /** The workflow input for this run. */
  input: TInput;
  /** Most recent successful output of `taskId`; throws when there is none yet. */
  output: <T = any>(taskId: string) => T;
  /** Most recent successful output of `taskId`, or `undefined` when there is none yet. */
  outputMaybe: <T = any>(taskId: string) => T | undefined;
  /** Every successful output of `taskId` recorded so far (across loop iterations). */
  allOutputs: <T = any>(taskId: string) => T[];
  /** How many times `taskId` has completed successfully. */
  iterations: (taskId: string) => number;
};
| // ── Nodes ── | |
/** A single unit of work: an agent call, a static value, or a human approval gate. */
export type TaskNode = {
  type: "task";
  /** Key under which results are stored and read back via ctx.output(id). */
  id: string;
  /** Agent that runs the prompt. */
  agent?: AgentConfig;
  /** Schema describing the task's output (presumably enforced by executeTask — confirm in agents.ts). */
  output: z.ZodType;
  /** Builds the prompt for the agent from the live context. */
  prompt?: (ctx: WorkflowContext) => string;
  /** Static value — no LLM call. */
  value?: (ctx: WorkflowContext) => any;
  /** When it returns true, the runtime records the task as "skipped" without running it. */
  skipIf?: (ctx: WorkflowContext) => boolean;
  /** Retry budget; presumably consumed by executeTask — confirm in agents.ts. */
  retries?: number;
  /** Alternative agents; presumably tried by executeTask when the primary fails — confirm. */
  fallbackAgents?: AgentConfig[];
  /** Record the failure and continue instead of aborting the run. */
  continueOnFail?: boolean;
  /** Ask RunOptions.onApproval first; a denial records the task as skipped. */
  needsApproval?: boolean;
  /** Label passed to onApproval alongside the task id. */
  label?: string;
};
/** Runs children one after another; a child that throws stops the sequence. */
export type SequenceNode = {
  type: "sequence";
  children: WorkflowNode[];
};

/** Runs children concurrently, at most `maxConcurrency` at a time (default: all of them). */
export type ParallelNode = {
  type: "parallel";
  maxConcurrency?: number;
  children: WorkflowNode[];
};

/**
 * Repeats `child` up to `maxIterations` times. `until` is evaluated before
 * every iteration after the first, and once more after the last one.
 */
export type LoopNode = {
  type: "loop";
  until: (ctx: WorkflowContext) => boolean;
  maxIterations: number;
  /** "fail" throws when the limit is hit without `until` holding; otherwise the loop just ends. */
  onMaxReached?: "fail" | "return-last";
  child: WorkflowNode;
};

/** Any node that can appear in a workflow tree. */
export type WorkflowNode =
  | TaskNode
  | SequenceNode
  | ParallelNode
  | LoopNode;
| // ── Workflow Definition ── | |
/** A named, runnable workflow. */
export type WorkflowDefinition<TInput = any> = {
  /** Workflow name; recorded as RunState.workflowName. */
  name: string;
  description?: string;
  /** Optional schema; the runtime validates the run input against it before starting. */
  input?: z.ZodType<TInput>;
  /**
   * Build the execution graph.
   *
   * NOTE: the runtime in this extension evaluates build() once per run (on
   * resume, with the resumed outputs visible). It does NOT re-render after
   * every task. Branch on `ctx.input` here with plain JS. Branch on task
   * outputs with `skipIf`, loop `until` and prompt callbacks, which read the
   * live context lazily.
   */
  build: (ctx: WorkflowContext<TInput>) => WorkflowNode;
};
| // ── Run State (persisted per step) ── | |
/** Outcome of one execution of one task, persisted in RunState.results. */
export type TaskResult = {
  taskId: string;
  /** Which pass over this task the result belongs to (state key `${taskId}::${iteration}`). */
  iteration: number;
  /** Task output; the runtime stores null for failed and skipped tasks. */
  output: any;
  status: "success" | "failed" | "skipped";
  /** Failure message, or "denied" when an approval was rejected. */
  error?: string;
  /** Model id that ran the task, when it had an agent. */
  agent?: string;
  durationMs: number;
  /** Epoch milliseconds at which the result was recorded. */
  timestamp: number;
};

/** Everything needed to report on, or resume, a run. */
export type RunState = {
  runId: string;
  workflowName: string;
  input: any;
  status: "running" | "completed" | "failed" | "paused";
  results: TaskResult[];
  /** Last iteration index entered, per loop id. */
  loopIterations: Record<string, number>;
  startedAt: number;
  completedAt?: number;
  error?: string;
};
| // ── DSL Helpers ── | |
/**
 * A child slot accepted by sequence()/parallel(). Falsy entries are dropped,
 * so `cond && task(...)` can be written inline.
 */
export type MaybeNode = WorkflowNode | false | null | undefined;

/** Type guard that removes the falsy placeholders permitted by MaybeNode. */
const isNode = (node: MaybeNode): node is WorkflowNode => Boolean(node);

/** Declare a task node; `id` is the key later read back via ctx.output(id). */
export const task = (id: string, config: Omit<TaskNode, "type" | "id">): TaskNode =>
  ({ type: "task", id, ...config });

/** Run children in order. Falsy children (e.g. from `cond && task(...)`) are ignored. */
export const sequence = (children: readonly MaybeNode[]): SequenceNode =>
  ({ type: "sequence", children: children.filter(isNode) });

/** Run children concurrently, optionally capped by `maxConcurrency`. Falsy children are ignored. */
export const parallel = (children: readonly MaybeNode[], opts?: { maxConcurrency?: number }): ParallelNode =>
  ({ type: "parallel", children: children.filter(isNode), ...opts });

/** Repeat `child` until `config.until(ctx)` holds or `maxIterations` is reached. */
export const loop = (
  config: { until: (ctx: WorkflowContext) => boolean; maxIterations: number; onMaxReached?: "fail" | "return-last" },
  child: WorkflowNode,
): LoopNode =>
  ({ type: "loop", ...config, child });

/** Identity helper that gives a workflow definition contextual typing for `TInput`. */
export const defineWorkflow = <TInput = any>(config: WorkflowDefinition<TInput>): WorkflowDefinition<TInput> => config;
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * TUI overlay for workflow progress. Minimal — just what's needed. | |
| */ | |
| import type { Theme } from "@mariozechner/pi-coding-agent"; | |
| import type { Component } from "@mariozechner/pi-tui"; | |
| import { truncateToWidth, visibleWidth, wrapTextWithAnsi } from "@mariozechner/pi-tui"; | |
| import type { ProgressEvent } from "./runtime"; | |
/** Compact duration label: "42s" below one minute, otherwise "3m07s". */
const fmtDur = (ms: number): string => {
  const totalSeconds = Math.floor(ms / 1000);
  if (totalSeconds < 60) return `${totalSeconds}s`;
  const minutes = Math.floor(totalSeconds / 60);
  const seconds = String(totalSeconds % 60).padStart(2, "0");
  return `${minutes}m${seconds}s`;
};
/** Mutable view-model behind WorkflowPanel; applyEvent folds progress events into it. */
export type OverlayState = {
  name: string;
  runId: string;
  status: "running" | "completed" | "failed";
  /** Epoch milliseconds; the panel shows the time elapsed since then. */
  startTime: number;
  /** Task currently shown as in flight, if any. */
  current?: { task: string; agent?: string; iter?: number };
  /** Finished or skipped tasks, in the order they were reported. */
  done: Array<{ task: string; status: string; agent?: string; ms: number }>;
  /** Free-form log lines (loop progress, info messages). */
  logs: string[];
  error?: string;
};

/** Initial overlay state for a run that starts now. */
export function createOverlay(name: string, runId: string): OverlayState {
  const startTime = Date.now();
  return { name, runId, status: "running", startTime, done: [], logs: [] };
}
/**
 * Fold one progress event into the overlay state (mutates `s`).
 *
 * `current` is only cleared by the `task_end` of the task it is showing. When
 * parallel branches interleave, one branch finishing therefore does not blank
 * out the banner of another that is still running. A final `done` also clears
 * it, since nothing is in flight once the run is over.
 */
export function applyEvent(s: OverlayState, e: ProgressEvent) {
  switch (e.type) {
    case "task_start":
      s.current = { task: e.taskId, agent: e.agent, iter: e.iteration };
      break;
    case "task_end":
      if (s.current?.task === e.taskId) s.current = undefined;
      s.done.push({ task: e.taskId, status: e.result.status, agent: e.result.agent, ms: e.result.durationMs });
      break;
    case "task_skip":
      s.done.push({ task: e.taskId, status: "skipped", ms: 0 });
      break;
    case "loop_iter":
      s.logs.push(`Loop ${e.iteration + 1}/${e.max}`);
      break;
    case "loop_done":
      s.logs.push(`Loop: ${e.reason}`);
      break;
    case "done":
      s.status = e.status;
      s.current = undefined;
      if (e.error) s.error = e.error;
      break;
    case "info":
      s.logs.push(e.message);
      break;
  }
}
/**
 * Boxed TUI panel for an OverlayState. It shows a header with the elapsed time,
 * the run id, the task in flight, the last 8 finished tasks, the last 3 log
 * lines and any error.
 *
 * Every emitted row is exactly `width` columns wide (border glyphs included),
 * so the right-hand border stays aligned.
 */
export class WorkflowPanel implements Component {
  constructor(private s: OverlayState, private th: Theme) {}

  render(width: number): string[] {
    const { s, th } = this;
    // Content columns between "│ " and " │".
    const w = width - 4;
    if (w < 10) return [];
    // Columns between the two corner glyphs of a border row.
    const inner = w + 2;
    const rule = (left: string, right: string) => th.fg("border", left + "─".repeat(inner) + right);
    // Truncate first, so over-long content (run ids, task names, errors) cannot push the border out.
    const pad = (content: string) => {
      const body = truncateToWidth(content, w);
      const fill = Math.max(0, w - visibleWidth(body));
      return th.fg("border", "│") + " " + body + " ".repeat(fill) + " " + th.fg("border", "│");
    };
    const lines: string[] = [];

    // Header: centre the title across the full `inner` span. Measure it with
    // visibleWidth rather than .length, because names and icons may be wide characters.
    const title = truncateToWidth(` ⚙ ${s.name} ${fmtDur(Date.now() - s.startTime)} `, inner);
    const rem = Math.max(0, inner - visibleWidth(title));
    const leftRule = Math.floor(rem / 2);
    lines.push(th.fg("border", "╭" + "─".repeat(leftRule)) + th.fg("accent", title) + th.fg("border", "─".repeat(rem - leftRule) + "╮"));

    const icon = s.status === "running" ? th.fg("warning", "●") : s.status === "completed" ? th.fg("success", "✓") : th.fg("error", "✗");
    lines.push(pad(`${icon} ${th.fg("muted", `run: ${s.runId}`)}`));
    if (s.current) {
      let cur = th.fg("warning", "→ ") + th.fg("accent", s.current.task);
      if (s.current.agent) cur += th.fg("dim", ` (${s.current.agent})`);
      lines.push(pad(cur));
    }

    lines.push(rule("├", "┤"));
    for (const t of s.done.slice(-8)) {
      const mark = t.status === "success" ? th.fg("success", "✓") : t.status === "skipped" ? th.fg("dim", "○") : th.fg("error", "✗");
      lines.push(pad(`${mark} ${th.fg("text", t.task)}${t.agent ? th.fg("dim", ` (${t.agent})`) : ""} ${th.fg("dim", fmtDur(t.ms))}`));
    }
    if (!s.done.length) lines.push(pad(th.fg("dim", "Waiting...")));

    const logs = s.logs.slice(-3);
    if (logs.length) {
      lines.push(rule("├", "┤"));
      for (const l of logs) for (const wl of wrapTextWithAnsi(th.fg("dim", l), w).slice(0, 2)) lines.push(pad(wl));
    }

    if (s.error) {
      lines.push(rule("├", "┤"));
      // Collapse whitespace runs so a multi-line CLI error cannot split the row.
      lines.push(pad(th.fg("error", `Error: ${s.error.replace(/\s+/g, " ")}`)));
    }

    lines.push(rule("╰", "╯"));
    return lines;
  }

  invalidate() {}
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment