A pragmatic recipe for turning raw data into structured knowledge, reasoning over it, and then deciding who (machine or human) pulls the trigger.
| Tier | Focus | Mnemonic |
|---|---|---|
| 1 | Structure the world | raw → records |
| 2 | Reason about it | records → recommendations |
| 3 | Act (Job Service or Agent) | recs → reality |
By default, Tier 3 runs through a Job Service (human-in-the-loop). You can toggle actualization rights at runtime to let an agent execute directly via trusted tools (e.g., email API, SMS). See §7.
| Layer | Model | Typical use |
|---|---|---|
| Tier 1 | gpt-4.1-mini | parsing / classification / extraction |
| Tier 2 | openai-o3-mini | next-step generation & prioritisation |

Both are exposed as `ctx.env.llm`.
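As a hedged sketch of how tier-appropriate model selection might be centralised (the `pickModel` helper and `Tier` type are illustrative assumptions, not part of the documented API):

```typescript
// Hypothetical helper: maps a pipeline tier to its default model.
// Tier 1 = cheap structuring work, Tier 2 = heavier reasoning.
type Tier = 1 | 2;

const TIER_MODELS: Record<Tier, string> = {
  1: 'gpt-4.1-mini',   // parsing / classification / extraction
  2: 'openai-o3-mini', // next-step generation & prioritisation
};

export function pickModel(tier: Tier): string {
  return TIER_MODELS[tier];
}
```

Keeping the mapping in one place means swapping a tier's model is a one-line change rather than a hunt through every task.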
| Principle | TL;DR |
|---|---|
| Task modularity | 1 task = 1 transform. |
| Atomic tools | Small, stateless helpers (OCR, embed, classify). |
| Schema-typed | All IO through repos, never loose JSON / raw SQL. |
| Eval hooks | QA every LLM hop. |
| Contextual | Scoped PipelineContext (repos, tools, logs, config). |
| Reasoning = Pipeline | Agents are pipelines whose tasks think. |
```ts
export type TaskFn<In, Out> = (
  input: In,
  ctx: PipelineContext
) => Promise<Out>;
```

```ts
import OpenAI from 'openai';

export function createDocClassifier(openai: OpenAI) {
  return async (text: string): Promise<string> => {
    const r = await openai.chat.completions.create({
      model: 'gpt-4.1-mini',
      messages: [{ role: 'system', content: `Classify:\n\n${text}` }],
    });
    // parse: project helper that normalises the label; content may be null
    return parse(r.choices[0].message.content ?? '');
  };
}
```

```ts
interface PipelineContext {
  env: {
    repos: Record<string, any>;
    tools: Record<string, BaseTool>;
    logs: Logger;
    config: Record<string, any>;
    llm: OpenAI;
  };
  state: {
    sourceId?: string;
    sourceType?: SourceType;
  };
  // built-ins
  reportStatus: (s: JobStatus, details?: string) => void;
  reportProgress: (cur: number, tot: number) => void;
  reportError: (err: Error) => void;
}
```

```ts
export type EvalHook<Out, Ctx> = (
  output: Out,
  ctx: Ctx
) => Promise<void>;
```

```ts
const createPipeline = <
  Initial,
  Final
>(
  tasks: TaskFn<any, any>[],
  evalHooks: (EvalHook<any, PipelineContext> | undefined)[]
): TaskFn<Initial, Final> => { /* … */ };
```

Use `undefined` explicitly where no hook is attached.
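The elided body can be sketched as a minimal sequential runner. This is an illustrative implementation under simplified types, not the project's actual one: each task's output feeds the next, and the matching eval hook, when defined, runs on that output.

```typescript
// Minimal illustrative types; the real PipelineContext is richer.
type Ctx = { env: { logs: { info: (msg: string) => void } } };
type TaskFn<In, Out> = (input: In, ctx: Ctx) => Promise<Out>;
type EvalHook<Out> = (output: Out, ctx: Ctx) => Promise<void>;

// Sketch: run tasks in order, invoking each task's eval hook (if any).
const createPipeline = <Initial, Final>(
  tasks: TaskFn<any, any>[],
  evalHooks: (EvalHook<any> | undefined)[]
): TaskFn<Initial, Final> =>
  async (input, ctx) => {
    let acc: any = input;
    for (let i = 0; i < tasks.length; i++) {
      acc = await tasks[i](acc, ctx);
      const hook = evalHooks[i];
      if (hook) await hook(acc, ctx); // QA every LLM hop
    }
    return acc as Final;
  };
```

Keeping hooks positional (one slot per task, `undefined` where absent) makes a missing QA step visible in the pipeline definition itself.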
```ts
const docIngestion = createPipeline<
  { rawDocument: Buffer },
  { docId: string }
>(
  [
    createSourceTask,
    parseDocTask,
    classifyDocTask,
    storeDocTask,
  ],
  [
    undefined, // after createSourceTask
    undefined, // after parseDocTask
    classificationEvalHook,
    undefined, // after storeDocTask
  ]
);
```

```ts
const recPipeline = createPipeline<
  { graphId: string },
  { recIds: string[] }
>(
  [
    loadContextGraphTask,
    runLLMReasoningTask,
    parseRecommendationTask,
    saveRecommendationTask,
  ],
  [
    undefined,
    undefined,
    validateRecStructureHook,
    undefined,
  ]
);
```

| Mode | Who Executes | How to Enable |
|---|---|---|
| Human-in-loop | Job Service queues recs; user approves in UI | default |
| Agent-exec | Pipeline auto-calls trusted tools (email API, SMS, etc.) | Pass `{ allowActualization: true }` in pipeline config or job payload |
Runtime toggle example:

```ts
if (ctx.env.config.allowActualization) {
  await sendEmailTool(/* … */);
} else {
  await ctx.env.repos.jobs.create({
    type: 'send_email',
    payload: { /* … */ },
    executionStrategy: 'manual',
  });
}
```

You can progressively flip the switch per-pipeline, per-user, or per-tenant as confidence grows.
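One hedged way to stage that rollout (the policy shape and `resolveActualization` helper are assumptions, not documented API): resolve the flag from the most specific scope first, falling back to the pipeline default.

```typescript
// Illustrative precedence: per-user overrides per-tenant,
// which overrides the pipeline-level default.
interface ActualizationPolicy {
  perUser?: Record<string, boolean>;
  perTenant?: Record<string, boolean>;
  pipelineDefault: boolean;
}

export function resolveActualization(
  policy: ActualizationPolicy,
  tenantId: string,
  userId: string
): boolean {
  return (
    policy.perUser?.[userId] ??
    policy.perTenant?.[tenantId] ??
    policy.pipelineDefault
  );
}
```

The resolved boolean is what you would pass as `allowActualization` into the pipeline config.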
```ts
try {
  /* risky op */
} catch (err) {
  ctx.env.logs.error('vectorisation failed', { error: err });
  throw err;
}
```

Always include the real error object; observability matters.
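That pattern can be factored into a small wrapper so every task logs and re-throws consistently. `withErrorReporting` and the trimmed-down context type are assumed helpers for illustration, not part of the documented API.

```typescript
// Minimal illustrative context; the real PipelineContext carries more.
type Ctx = {
  env: { logs: { error: (msg: string, meta?: object) => void } };
  reportError: (err: Error) => void;
};

// Wrap a task so failures are logged with the real error object,
// surfaced via the built-in reportError hook, then re-thrown.
export function withErrorReporting<In, Out>(
  label: string,
  task: (input: In, ctx: Ctx) => Promise<Out>
): (input: In, ctx: Ctx) => Promise<Out> {
  return async (input, ctx) => {
    try {
      return await task(input, ctx);
    } catch (err) {
      ctx.env.logs.error(`${label} failed`, { error: err });
      ctx.reportError(err as Error);
      throw err;
    }
  };
}
```

Re-throwing keeps the pipeline's failure semantics intact while guaranteeing nothing dies silently.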
Do:
- Create a source record first.
- Keep tasks atomic & typed.
- Chain eval hooks on every LLM output.
- Preserve provenance via `sourceId`.
- Route actions through Job Service unless `allowActualization` is true.

Don't:
- Write god-tasks.
- Embed execution logic inside reasoning steps.
- Skip eval hooks and "hope".
- Leak raw SQL into a task.
- Lose source tracking downstream.
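The "chain eval hooks on every LLM output" rule can be sketched as a hypothetical `classificationEvalHook` (the label set, and failing hard on unknown labels, are illustrative assumptions):

```typescript
// Illustrative eval hook: reject LLM classifications outside a known label set.
const KNOWN_LABELS = new Set(['invoice', 'contract', 'report', 'other']);

type Ctx = { reportError: (err: Error) => void };

export const classificationEvalHook = async (
  label: string,
  ctx: Ctx
): Promise<void> => {
  if (!KNOWN_LABELS.has(label)) {
    const err = new Error(`Unknown classification label: ${label}`);
    ctx.reportError(err);
    throw err; // fail the pipeline rather than store a bad label
  }
};
```

Hooks that throw stop the pipeline at the bad hop, which is exactly where you want to catch a hallucinated label.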
Pipeline = ETL for chaos → context. Recommendation Pipeline = ETL where “T” is “Think”. Actualization = choose whether the brain calls the hands itself, or asks a human first.