@danialhasan
Last active May 8, 2025 18:21
AI Service v2 – Pipeline Framework

A pragmatic recipe for turning raw data into structured knowledge, reasoning over it, and then deciding who (machine or human) pulls the trigger.


1 Quick Map

| Tier | Focus | Mnemonic |
|------|-------|----------|
| 1 | Structure the world | raw → records |
| 2 | Reason about it | records → recommendations |
| 3 | Act (Job Svc or Agent) | recs → reality |

By default, Tier 3 runs through a Job Service (human-in-the-loop). You can toggle actualization rights at runtime to let an agent execute directly via trusted tools (e.g., email API, SMS). See §7.


2 LLM Roster

| Layer | Model | Typical use |
|-------|-------|-------------|
| Tier 1 | gpt-4.1-mini | parsing / classification / extraction |
| Tier 2 | openai-o3-mini | next-step generation & prioritisation |

Both are exposed as ctx.env.llm.


3 Ground Rules

| Principle | TL;DR |
|-----------|-------|
| Task modularity | 1 task = 1 transform. |
| Atomic tools | Small, stateless helpers (OCR, embed, classify). |
| Schema-typed | All I/O through repos, never loose JSON or raw SQL. |
| Eval hooks | QA every LLM hop. |
| Contextual | Scoped PipelineContext (repos, tools, logs, config). |
| Reasoning = Pipeline | Agents are pipelines whose tasks think. |

4 Building Blocks

4.1 Task signature

export type TaskFn<In, Out> = (
  input: In,
  ctx: PipelineContext
) => Promise<Out>;
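A concrete task is just a function of this shape. As an illustration (the task name, its input/output shapes, and the trimmed-down context are ours, not part of the framework), here is a sketch of a Tier 1 text-normalization task:

```typescript
// Minimal slice of the context this sketch needs (see §4.3 for the full interface).
interface MinimalContext {
  reportProgress: (cur: number, tot: number) => void;
}

type TaskFn<In, Out> = (input: In, ctx: MinimalContext) => Promise<Out>;

// Hypothetical Tier 1 task: one input shape in, one output shape out, nothing else.
const normalizeTextTask: TaskFn<{ rawText: string }, { text: string }> = async (
  input,
  ctx
) => {
  ctx.reportProgress(0, 1);
  // Collapse runs of whitespace and trim the edges.
  const text = input.rawText.trim().replace(/\s+/g, ' ');
  ctx.reportProgress(1, 1);
  return { text };
};
```

Because the task is a plain async function, it can be unit-tested with a stubbed context and no pipeline machinery at all.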

4.2 Tool example

export function createDocClassifier(openai: OpenAI) {
  return async (text: string): Promise<string> => {
    const r = await openai.chat.completions.create({
      model: 'gpt-4.1-mini',
      messages: [{ role: 'system', content: `Classify:\n\n${text}` }],
    });
    // Guard against an empty completion before parsing the label out.
    const content = r.choices[0]?.message?.content;
    if (!content) throw new Error('classifier returned empty completion');
    return parse(content);
  };
}

4.3 PipelineContext (abridged)

interface PipelineContext {
  env: {
    repos: Record<string, any>;
    tools: Record<string, BaseTool>;
    logs: Logger;
    config: Record<string, any>;
    llm: OpenAI;
  };
  state: {
    sourceId?: string;
    sourceType?: SourceType;
  };
  // built-ins
  reportStatus: (s: JobStatus, details?: string) => void;
  reportProgress: (cur: number, tot: number) => void;
  reportError: (err: Error) => void;
}

4.4 Eval Hook signature

export type EvalHook<Out, Ctx> = (
  output: Out,
  ctx: Ctx
) => Promise<void>;
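A hook either resolves (output passes QA) or throws (output fails). As a sketch, here is a hypothetical classification hook that rejects labels outside a known set (the label list, hook name, and trimmed context are illustrative assumptions):

```typescript
type EvalHook<Out, Ctx> = (output: Out, ctx: Ctx) => Promise<void>;

// Minimal slice of the context this sketch needs.
interface HookContext {
  env: { logs: { warn: (msg: string, meta?: object) => void } };
}

// Hypothetical allow-list; in practice this would come from config.
const ALLOWED_LABELS = ['invoice', 'contract', 'receipt'];

const classificationEvalHook: EvalHook<{ label: string }, HookContext> = async (
  output,
  ctx
) => {
  if (!ALLOWED_LABELS.includes(output.label)) {
    ctx.env.logs.warn('unexpected classification label', { label: output.label });
    throw new Error(`classification failed eval: ${output.label}`);
  }
};
```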

5 Composing Pipelines

const createPipeline = <
  Initial,
  Final
>(
  tasks: TaskFn<any, any>[],
  evalHooks: (EvalHook<any, PipelineContext> | undefined)[]
): TaskFn<Initial, Final> => { /* … */ };

Use undefined explicitly where no hook is attached.
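The elided body can be as simple as a sequential fold: run each task, then its eval hook (if one is attached), threading the context through and reporting failures. A minimal sketch under those assumptions (the error handling shown is our choice, not the framework's):

```typescript
// Minimal slice of the context this sketch needs.
interface Ctx {
  reportError: (err: Error) => void;
}

type TaskFn<In, Out> = (input: In, ctx: Ctx) => Promise<Out>;
type EvalHook<Out, C> = (output: Out, ctx: C) => Promise<void>;

const createPipeline = <Initial, Final>(
  tasks: TaskFn<any, any>[],
  evalHooks: (EvalHook<any, Ctx> | undefined)[]
): TaskFn<Initial, Final> => {
  return async (input, ctx) => {
    let current: any = input;
    for (let i = 0; i < tasks.length; i++) {
      try {
        current = await tasks[i](current, ctx);
        // QA this hop if a hook is attached; undefined slots are skipped.
        await evalHooks[i]?.(current, ctx);
      } catch (err) {
        ctx.reportError(err as Error);
        throw err;
      }
    }
    return current as Final;
  };
};
```

Because the result is itself a `TaskFn`, pipelines compose: a whole pipeline can appear as a single task inside a larger one.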


6 End-to-End Examples

6.1 Doc Ingestion (Tier 1)

const docIngestion = createPipeline<
  { rawDocument: Buffer },
  { docId: string }
>(
  [
    createSourceTask,
    parseDocTask,
    classifyDocTask,
    storeDocTask,
  ],
  [
    undefined,          // after createSourceTask
    undefined,          // after parseDocTask
    classificationEvalHook,
    undefined,          // after storeDocTask
  ]
);

6.2 Recommendation Pipeline (Tier 2)

const recPipeline = createPipeline<
  { graphId: string },
  { recIds: string[] }
>(
  [
    loadContextGraphTask,
    runLLMReasoningTask,
    parseRecommendationTask,
    saveRecommendationTask,
  ],
  [
    undefined,
    undefined,
    validateRecStructureHook,
    undefined,
  ]
);

7 Actualization Modes

| Mode | Who executes | How to enable |
|------|--------------|---------------|
| Human-in-loop | Job Service queues recs; user approves in UI | Default |
| Agent-exec | Pipeline auto-calls trusted tools (email API, SMS, etc.) | Pass { allowActualization: true } in pipeline config or job payload |

Runtime toggle example

if (ctx.env.config.allowActualization) {
  await sendEmailTool(/* … */);
} else {
  await ctx.env.repos.jobs.create({
    type: 'send_email',
    payload: { /* … */ },
    executionStrategy: 'manual',
  });
}

You can progressively flip the switch per-pipeline, per-user, or per-tenant as confidence grows.
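One way to implement that progressive rollout is a small resolver with explicit precedence. This sketch is entirely hypothetical (the helper name and the payload-beats-config-beats-tenant ordering are our assumptions, not the framework's):

```typescript
// Hypothetical precedence: job payload overrides pipeline config,
// which overrides the tenant default. Absent everything, stay
// human-in-the-loop (false).
function resolveActualization(opts: {
  jobPayload?: { allowActualization?: boolean };
  pipelineConfig?: { allowActualization?: boolean };
  tenantDefault?: boolean;
}): boolean {
  return (
    opts.jobPayload?.allowActualization ??
    opts.pipelineConfig?.allowActualization ??
    opts.tenantDefault ??
    false
  );
}
```

Defaulting to `false` means a misconfigured or missing flag always falls back to the Job Service path.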


8 Logging Pattern

try {
  /* risky op */
} catch (err) {
  ctx.env.logs.error('vectorisation failed', { error: err });
  throw err;
}

Always include the real error object—observability matters.
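Rather than repeating that try/catch in every task, the pattern can be centralized in a wrapper. A hypothetical `withErrorLogging` decorator (name and shape are ours, not the framework's) that logs and rethrows:

```typescript
// Minimal slice of the context this sketch needs.
interface LogCtx {
  env: { logs: { error: (msg: string, meta?: object) => void } };
}

type TaskFn<In, Out> = (input: In, ctx: LogCtx) => Promise<Out>;

// Hypothetical wrapper: every task gets the same log-and-rethrow behaviour.
function withErrorLogging<In, Out>(
  name: string,
  task: TaskFn<In, Out>
): TaskFn<In, Out> {
  return async (input, ctx) => {
    try {
      return await task(input, ctx);
    } catch (err) {
      // Pass the real error object through, per the rule above.
      ctx.env.logs.error(`${name} failed`, { error: err });
      throw err;
    }
  };
}
```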


9 Do / Don’t Checklist

✅ Do

  • Create a source record first.
  • Keep tasks atomic & typed.
  • Chain eval hooks on every LLM output.
  • Preserve provenance via sourceId.
  • Route actions through Job Service unless allowActualization is true.

❌ Don’t

  • Write god-tasks.
  • Embed execution logic inside reasoning steps.
  • Skip eval hooks and “hope”.
  • Leak raw SQL into a task.
  • Lose source tracking downstream.

10 One-Sentence Mental Model

Pipeline = ETL for chaos → context. Recommendation Pipeline = ETL where “T” is “Think”. Actualization = choose whether the brain calls the hands itself, or asks a human first.
