Skip to content

Instantly share code, notes, and snippets.

@ConProgramming
Created March 27, 2026 18:05
Show Gist options
  • Select an option

  • Save ConProgramming/6f9b46847f9c0cecef668b61b6cb5ffe to your computer and use it in GitHub Desktop.

Select an option

Save ConProgramming/6f9b46847f9c0cecef668b61b6cb5ffe to your computer and use it in GitHub Desktop.
AI SDK + Trigger.dev Realtime interop — custom ChatTransport, SSE streaming, stream piping
import { streams } from "@trigger.dev/sdk";
import type { UIMessageChunk } from "ai";
export const aiStream = streams.define<UIMessageChunk>({ id: "ai" });
export const runProgressStream = streams.define<string>({ id: "run_progress" });
import { schemaTask, logger, tags } from "@trigger.dev/sdk";
import { streamText, convertToModelMessages, validateUIMessages, stepCountIs } from "ai";
import type { ToolSet, UIMessage, UIMessageChunk } from "ai";
import { aiStream } from "./streams";
import { createRunProgressTransport } from "./run-progress-transport";
// ...
export const assistantChat = schemaTask({
id: "assistant-chat",
retry: { maxAttempts: 1 },
schema: assistantChatTriggerPayloadSchema,
init: async () => {
// ... initialize DB clients, AI provider, etc.
return { SB, AI, PDB };
},
run: async (payload, { ctx, init }) => {
const { SB, AI, PDB } = init!;
// ... destructure payload, load/create conversation, store user message, load history
const runProgressTransport = createRunProgressTransport(transport, ctx.run.id, SB);
// ... build tools, system prompt, validate/convert messages
const result = streamText({
model: AI.languageModel("thinking"),
system: finalSystemPrompt,
messages: modelMessages,
tools: toolsWithCodeMode,
stopWhen: stepCountIs(stepLimit),
experimental_telemetry: { isEnabled: true, recordInputs: true, recordOutputs: true },
});
const stream = result.toUIMessageStream({
originalMessages: allMessages,
onFinish: async ({ responseMessage }) => {
// ... save response to DB
},
});
// For new conversations, prepend a data chunk; title is sent via run_progress (no AI stream delay).
const isNewConversation = currentConversationId == null || currentConversationId === "";
const titlePromise = isNewConversation
? (async () => {
// ... generate title and send via runProgressTransport
await runProgressTransport.send("chat-title", { chatId: String(questionId), title });
})()
: Promise.resolve();
const streamToPipe = isNewConversation
? (async function* (): AsyncGenerator<UIMessageChunk> {
yield {
type: "data-conversation-created",
data: { id: String(questionId) },
} as UIMessageChunk;
for await (const chunk of stream) {
yield chunk;
}
})()
: stream;
const { waitUntilComplete } = aiStream.pipe(streamToPipe);
try {
await logger.trace("ai-stream", async () => {
await waitUntilComplete();
});
} finally {
await runProgressTransport.unsubscribe();
}
await titlePromise;
// ...
},
});
import { getTransport } from "$lib/ai/realtime/transport-utils";
import { triggerAssistantTask, triggerChatResponseJson } from "$lib/ai/trigger-chat-bridge";
// ...
export const POST: RequestHandler = async (event) => {
// ... auth, rate limiting, request validation
const transport = getTransport(org.meta);
const payload = {
userId: user.id,
// ... app-specific fields
message,
messages,
transport,
};
const result = await triggerAssistantTask({
taskId: "assistant-chat",
payload,
transport,
});
return triggerChatResponseJson(result);
};
<script lang="ts">
// ...
import { Chat } from "@ai-sdk/svelte";
import {
DefaultChatTransport,
lastAssistantMessageIsCompleteWithToolCalls,
type ChatTransport,
type UIMessage,
} from "ai";
// ...
import { TriggerDevChatTransport } from "$lib/ai/realtime/trigger-chat-transport";
import { createTriggerRealtimeTransport } from "$lib/ai/realtime/trigger-transport";
// ...
// Read transport mode from org feature flag
const assistantTransport = $derived(
(flags?.legacy?.meta as { assistant_transport?: string })
?.assistant_transport ?? "legacy",
);
const useTriggerAgent = $derived(assistantTransport === "trigger");
const chatV2ApiUrl = $derived(
`/projects-v3/${page.params.id}/api/assistant/chat-v2`,
);
// Create the Trigger Realtime client stream transport
const triggerTransport = $derived(
useTriggerAgent ? createTriggerRealtimeTransport() : null,
);
// ...
// Wrap in TriggerDevChatTransport (posts to chat-v2, subscribes to Trigger Realtime)
const triggerTransportOrNull = $derived(
useTriggerAgent && triggerTransport
? new TriggerDevChatTransport({
apiUrl: chatV2ApiUrl,
clientTransport: triggerTransport,
body: getAskModeBody,
})
: null,
);
// ... additional TriggerDevChatTransport instances for outline/content modes
// with runProgressCallbacks for onOutlineGenerationProgress / onContentGenerationProgress
// Delegating transport: switches between Trigger and legacy at runtime
const getTransport = () => triggerTransportOrNull ?? defaultTransport;
const delegatingTransport: ChatTransport<UIMessage> = {
sendMessages: (opts) => getTransport().sendMessages(opts),
reconnectToStream: (opts) => getTransport().reconnectToStream(opts),
};
// ...
// Chat instance uses the delegating transport
instance = new Chat({
transport: delegatingTransport,
sendAutomaticallyWhen: (context) => {
return lastAssistantMessageIsCompleteWithToolCalls(context);
},
// ...
});
// ...
</script>
<!-- ... component template -->
/**
* Cancel an assistant run (Trigger.dev run).
*/
import { error, json } from "@sveltejs/kit";
import { requireRateLimit } from "$lib/rate-limit/middleware";
import { runs } from "@trigger.dev/sdk";
import type { RequestHandler } from "./$types";
export const POST: RequestHandler = async (event) => {
const {
request,
locals: { getUser },
} = event;
const user = await getUser();
if (!user) return error(401, "Unauthorized");
await requireRateLimit(event, "MUTATION", `mutation:user:${user.id}`);
const body = await request.json();
const runId = body?.runId;
if (typeof runId !== "string" || runId.length === 0) return error(400, "Missing runId");
let run;
try {
// Verify the authenticated user owns this run
run = await runs.retrieve(runId);
} catch {
// Run may be stale, already deleted, or invalid
return json({ ok: true });
}
const payload = run.payload as Record<string, unknown> | undefined;
if (payload?.userId !== user.id) return error(403, "Forbidden");
try {
await runs.cancel(runId);
} catch {
// Run may have completed or been cancelled before we could cancel it
}
return json({ ok: true });
};
/** Accumulates outline deltas into full-state payloads for the Svelte store. */
export class OutlineProgressAccumulator {
// ... internal state
/** Apply a delta payload, return the full-state payload to forward. */
apply(data: Record<string, unknown>) {
// ... accumulate deltas into full state, return merged payload
}
// ...
}
/** Accumulates content section deltas into full-state payloads for the Svelte store. */
export class ContentProgressAccumulator {
// ... internal state
apply(data: Record<string, unknown>) {
// ... accumulate section deltas into full state, return merged payload
}
// ...
}
/**
* Trigger.dev Realtime Stream definitions for AI agent
* Requires @trigger.dev/sdk 4.1.0+
*/
import { streams, type InferStreamType } from "@trigger.dev/sdk";
import type { UIMessageChunk } from "ai";
export const aiStream = streams.define<UIMessageChunk>({ id: "ai" });
/** Auxiliary stream for run progress (outline, content, export, etc.). Chunk: JSON string of { event, data } for readable dashboard. */
export const runProgressStream = streams.define<string>({ id: "run_progress" });
export type AIStreamPart = InferStreamType<typeof aiStream>;
export type RunProgressPart = InferStreamType<typeof runProgressStream>;
import { getTransport } from "$lib/ai/realtime/transport-utils";
import { triggerAssistantTask, triggerChatResponseJson } from "$lib/ai/trigger-chat-bridge";
// ...
export const POST: RequestHandler = async (event) => {
// ... auth, rate limiting, request validation
const projectId = parseInt(params.id);
// ... validate project ID
const transport = getTransport(org.meta);
const payload = {
userId: user.id,
projectId,
// ... app-specific fields (documentContext, capabilities, etc.)
message,
messages,
transport,
};
const result = await triggerAssistantTask({
taskId: "project-assistant-chat",
payload,
transport,
});
// ... optional activity logging
return triggerChatResponseJson(result);
};
/**
* Run progress transport: stream auxiliary events (outline, content, export, etc.) to the client
* via the Trigger Realtime run_progress stream.
*/
import { runProgressStream } from "../streams";
export interface RunProgressTransport {
send(event: string, data: unknown): Promise<void>;
unsubscribe(): Promise<void>;
}
/** Append to Trigger run_progress stream. */
class TriggerStreamRunProgressTransport implements RunProgressTransport {
async send(event: string, data: unknown): Promise<void> {
await runProgressStream.append(JSON.stringify({ event, data }));
}
async unsubscribe(): Promise<void> {}
}
export function createRunProgressTransport(): RunProgressTransport {
return new TriggerStreamRunProgressTransport();
}
/** Valid transport values for assistant chat endpoints. */
export type TransportValue = "trigger";
/** Read the transport mode from org metadata, defaulting to "trigger". */
export function getTransport(meta: Record<string, unknown> | null | undefined): TransportValue {
return "trigger";
}
/**
* Shared Trigger.dev bridge for assistant chat-v2 routes (validate body → trigger → JSON response).
*/
import { tasks } from "@trigger.dev/sdk";
import { json } from "@sveltejs/kit";
import { TRIGGER_DEV_API_URL } from "$lib/env/private.server";
import type { TransportValue } from "$lib/ai/realtime/transport-utils";
export type TriggerChatBridgeResult = {
runId: string;
publicAccessToken: string;
transport: TransportValue;
triggerApiUrl: string;
};
export const triggerChatResponseJson = (result: TriggerChatBridgeResult) =>
json({
runId: result.runId,
publicAccessToken: result.publicAccessToken,
transport: result.transport,
triggerApiUrl: TRIGGER_DEV_API_URL,
});
export const triggerAssistantTask = async ({
taskId,
payload,
transport,
}: {
taskId: "assistant-chat" | "project-assistant-chat";
payload: Record<string, unknown>;
transport: TransportValue;
}): Promise<TriggerChatBridgeResult> => {
const handle = await tasks.trigger(taskId, payload);
return {
runId: handle.id,
publicAccessToken: handle.publicAccessToken,
transport,
triggerApiUrl: TRIGGER_DEV_API_URL,
};
};
/**
* ChatTransport implementation for Trigger.dev: POST to chat-v2, then consume
* the run's AI stream via ClientStreamTransport.
* Returns a ReadableStream<UIMessageChunk> so the Chat class handles message building.
* Browser tools are executed by the Chat class via onToolCall (stateless chat pattern).
*/
import type { ChatTransport } from "ai";
import type { UIMessage, UIMessageChunk } from "ai";
import type { ClientStreamTransport } from "./types";
import { OutlineProgressAccumulator, ContentProgressAccumulator } from "./progress-accumulators";
const TAG = "[TriggerTransport]";
interface RunState {
runId: string;
publicAccessToken: string;
triggerApiUrl: string;
lastEventId?: string;
}
/** Optional callbacks for run progress (outline, content generation, chat title). */
export interface TriggerRunProgressCallbacks {
onOutlineGenerationProgress?: (data: unknown) => void;
onContentGenerationProgress?: (data: unknown) => void;
onChatTitle?: (data: { chatId: string; title: string }) => void;
}
/** A value or a (possibly async) function that returns a value — mirrors the SDK's Resolvable pattern. */
type Resolvable<T> = T | (() => T | Promise<T>);
async function resolve<T>(value: Resolvable<T> | undefined): Promise<T | undefined> {
if (typeof value === "function") {
return await (value as () => T | Promise<T>)();
}
return value;
}
export interface TriggerDevChatTransportOptions {
/** Chat-v2 endpoint URL (e.g. /assistant-v3/api/chat-v2). */
apiUrl: string;
/** Client transport to subscribe to the run (Trigger Realtime or Supabase Broadcast). */
clientTransport: ClientStreamTransport;
/** Optional: run progress callbacks for outline/content generation. */
runProgressCallbacks?: TriggerRunProgressCallbacks;
/**
* Extra body properties merged into every POST to chat-v2.
* Can be a static object or a (possibly async) function evaluated at call time.
* Matches the `body` option on DefaultChatTransport / HttpChatTransport.
*/
body?: Resolvable<object>;
}
export class TriggerDevChatTransport<
UI_MESSAGE extends UIMessage = UIMessage,
> implements ChatTransport<UI_MESSAGE> {
private readonly apiUrl: string;
private readonly clientTransport: ClientStreamTransport;
private readonly runProgressCallbacks: TriggerRunProgressCallbacks | undefined;
private readonly body: Resolvable<object> | undefined;
private readonly runStateByChatId = new Map<string, RunState>();
constructor(options: TriggerDevChatTransportOptions) {
this.apiUrl = options.apiUrl;
this.clientTransport = options.clientTransport;
this.runProgressCallbacks = options.runProgressCallbacks;
this.body = options.body;
}
async sendMessages(
options: Parameters<ChatTransport<UI_MESSAGE>["sendMessages"]>[0],
): Promise<ReadableStream<UIMessageChunk>> {
const { messages, abortSignal, body: perRequestBody, chatId } = options;
const lastUserMessage = [...messages].filter((m) => m.role === "user").at(-1);
if (!lastUserMessage) {
throw new Error("No user message to send");
}
// Continuation: last message is assistant with tool results — send all messages so chat-v2 can save them
const lastMsg = messages[messages.length - 1];
const isContinuation =
lastMsg?.role === "assistant" &&
lastMsg.parts?.some(
(p: { type?: string; output?: unknown }) =>
p.type?.startsWith?.("tool-") && (p as { output?: unknown }).output !== undefined,
);
const resolvedBody = (await resolve(this.body)) ?? {};
const requestBody = {
...resolvedBody,
...perRequestBody,
...(isContinuation ? { messages } : { message: lastUserMessage }),
};
console.log(TAG, `POST ${this.apiUrl}`, { bodyKeys: Object.keys(requestBody), isContinuation });
const res = await fetch(this.apiUrl, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(requestBody),
signal: abortSignal ?? undefined,
});
if (!res.ok) {
const errorText = await res.text().catch(() => "");
let detail = errorText;
try {
const json = JSON.parse(errorText) as { message?: string; issues?: unknown[] };
if (json.issues?.length) {
console.error(TAG, `POST ${this.apiUrl} failed: ${res.status}`, json);
detail = json.message + " " + JSON.stringify(json.issues);
} else {
console.error(TAG, `POST ${this.apiUrl} failed: ${res.status}`, errorText);
}
} catch {
console.error(TAG, `POST ${this.apiUrl} failed: ${res.status}`, errorText);
}
throw new Error(`Chat request failed: ${res.status} - ${detail}`);
}
const { runId, publicAccessToken, transport, triggerApiUrl } = (await res.json()) as {
runId: string;
publicAccessToken: string;
transport?: string;
triggerApiUrl?: string;
};
console.log(TAG, `Run started: ${runId}`, { transport });
if (!triggerApiUrl) {
throw new Error("Trigger API URL is required");
}
const runState: RunState = { runId, publicAccessToken, triggerApiUrl };
if (chatId != null) {
this.runStateByChatId.set(chatId, runState);
}
const { readable, writable } = new TransformStream<UIMessageChunk, UIMessageChunk>();
const writer = writable.getWriter();
let unsubscribe: (() => void) | null = null;
let completed = false;
const cleanup = () => {
unsubscribe?.();
unsubscribe = null;
};
/** Close the writer and abort the SSE subscription. Safe to call multiple times. */
const finishStream = () => {
if (completed) return;
completed = true;
if (chatId != null) this.runStateByChatId.delete(chatId);
console.log(TAG, `[TIMING] Stream finished ts=${Date.now()} (run ${runId})`);
cleanup();
writer.close().catch(() => {});
};
if (abortSignal?.aborted) {
writer.abort(abortSignal.reason);
return readable;
}
abortSignal?.addEventListener(
"abort",
() => {
if (completed) return;
console.log(TAG, `Aborted (run ${runId})`);
cleanup();
writer.abort(abortSignal.reason).catch(() => {});
fetch("/api/assistant/cancel", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ runId }),
}).catch(() => {});
},
{ once: true },
);
const outlineAcc = new OutlineProgressAccumulator();
const contentAcc = new ContentProgressAccumulator();
const cbs = this.runProgressCallbacks;
unsubscribe = this.clientTransport.subscribe(
runId,
publicAccessToken,
{
onAIChunk: (chunk) => {
if (abortSignal?.aborted || completed) return;
writer.write(chunk).catch(() => {});
// Detect the AI SDK's "finish" chunk — the last meaningful chunk in
// the UIMessageStream. Close immediately instead of waiting for
// Trigger's 60s SSE idle timeout to close the HTTP connection.
// Worth noting this timeout exists to handle reconnects
if ((chunk as { type?: string }).type === "finish") {
finishStream();
}
},
onRunProgress: (event: string, data: unknown) => {
if (event === "chat-title") {
cbs?.onChatTitle?.(data as { chatId: string; title: string });
return;
}
const d = data as Record<string, unknown>;
if (event === "outline" && cbs?.onOutlineGenerationProgress) {
cbs.onOutlineGenerationProgress(outlineAcc.apply(d));
} else if (event === "content" && cbs?.onContentGenerationProgress) {
cbs.onContentGenerationProgress(contentAcc.apply(d));
}
},
onComplete: () => {
// Fallback: SSE connection closed by Trigger (normally 60s idle timeout).
// Usually a no-op because finishStream() already ran on the "finish" chunk.
finishStream();
},
onError: (err) => {
const isAbort = err?.name === "AbortError" || err?.message?.includes("aborted");
if (isAbort || completed) {
// Expected: SSE fetch was aborted after we called finishStream().
cleanup();
return;
}
console.error(TAG, `Stream error (run ${runId}):`, err);
cleanup();
writer.abort(err).catch(() => {});
},
},
{
baseUrl: triggerApiUrl,
onLastEventId: (id) => {
runState.lastEventId = id;
},
},
);
return readable;
}
async reconnectToStream(
options: Parameters<ChatTransport<UI_MESSAGE>["reconnectToStream"]>[0],
): Promise<ReadableStream<UIMessageChunk> | null> {
const chatId = (options as { chatId?: string }).chatId;
if (chatId == null) return null;
const runState = this.runStateByChatId.get(chatId);
if (
!runState ||
runState.runId == null ||
runState.publicAccessToken == null ||
runState.triggerApiUrl == null
) {
return null;
}
const { readable, writable } = new TransformStream<UIMessageChunk, UIMessageChunk>();
const writer = writable.getWriter();
let unsubscribe: (() => void) | null = null;
let completed = false;
const finishStream = () => {
if (completed) return;
completed = true;
this.runStateByChatId.delete(chatId);
unsubscribe?.();
unsubscribe = null;
writer.close().catch(() => {});
};
unsubscribe = this.clientTransport.subscribe(
runState.runId,
runState.publicAccessToken,
{
onAIChunk: (chunk) => {
if (completed) return;
writer.write(chunk).catch(() => {});
if ((chunk as { type?: string }).type === "finish") {
finishStream();
}
},
onRunProgress: () => {},
onComplete: finishStream,
onError: (err) => {
if (completed) return;
unsubscribe?.();
unsubscribe = null;
writer.abort(err instanceof Error ? err : new Error(String(err))).catch(() => {});
},
},
{
baseUrl: runState.triggerApiUrl,
lastEventId: runState.lastEventId,
onLastEventId: (id) => {
runState.lastEventId = id;
},
},
);
return readable;
}
}
import { z } from "zod";
import { assistantChatUiMessageSchema } from "../schemas/chat-primitives";
// ...
/**
* Payload for `assistant-chat` Trigger task — keep aligned with chat-v2 `tasks.trigger` body.
*/
export const assistantChatTriggerPayloadSchema = z.object({
userId: z.string(),
// ... app-specific user/org fields
message: assistantChatUiMessageSchema.optional(),
messages: z.array(assistantChatUiMessageSchema).optional(),
// ... app-specific fields (capabilities, conversation context, etc.)
transport: z.enum(["trigger"]),
});
export type AssistantChatTriggerPayload = z.infer<typeof assistantChatTriggerPayloadSchema>;
/**
* Payload for `project-assistant-chat` Trigger task — same pattern with additional project fields.
*/
export const projectAssistantChatTriggerPayloadSchema = z.object({
userId: z.string(),
// ... app-specific user/org fields
projectId: z.number(),
message: assistantChatUiMessageSchema.optional(),
messages: z.array(assistantChatUiMessageSchema).optional(),
// ... app-specific fields (documentContext, capabilities, playbook, etc.)
transport: z.enum(["trigger"]),
});
export type ProjectAssistantChatTriggerPayload = z.infer<
typeof projectAssistantChatTriggerPayloadSchema
>;
/**
* Client transport: subscribe to Trigger.dev Realtime streams (ai, run_progress) using publicAccessToken.
* Uses SSE endpoint: GET /realtime/v1/streams/{runId}/{streamKey} with Authorization: Bearer {token}
* baseUrl is supplied by the server in the chat-v2 response (not public env).
*
* Decoding matches @trigger.dev/core SSEStreamSubscription: supports v1 (one JSON per event)
* and v2/S2 batch format (event: batch, data: {records: [{body: "{\"data\":...}"}]}).
*/
import type { UIMessageChunk } from "ai";
import { EventSourceParserStream } from "eventsource-parser/stream";
import type {
ClientStreamTransport,
ClientStreamTransportHandlers,
ClientStreamTransportSubscribeOptions,
} from "./types";
function safeParseJSON<T>(data: string): T | null {
try {
return JSON.parse(data) as T;
} catch {
return null;
}
}
const MAX_RETRIES = 5;
const RETRY_DELAY_MS = 1000;
interface ParseSSEStreamOptions {
lastEventId?: string;
onLastEventId?: (id: string) => void;
}
function isNetworkError(err: unknown): boolean {
if (!(err instanceof TypeError)) return false;
const msg = (err.message ?? "").toLowerCase();
return msg.includes("fetch") || msg.includes("network") || msg.includes("failed to fetch");
}
function parseSSEStream<T>(
baseUrl: string,
runId: string,
streamKey: string,
token: string,
onChunk: (chunk: T) => void,
signal: AbortSignal,
streamOptions?: ParseSSEStreamOptions,
): Promise<void> {
const url = `${baseUrl}/realtime/v1/streams/${runId}/${streamKey}`;
let lastEventId = streamOptions?.lastEventId;
const onLastEventId = streamOptions?.onLastEventId;
let retryCount = 0;
const connectStream = async (): Promise<void> => {
if (signal.aborted) return;
const headers: Record<string, string> = {
Accept: "text/event-stream",
Authorization: `Bearer ${token}`,
};
if (lastEventId) headers["Last-Event-ID"] = lastEventId;
const response = await fetch(url, { headers, signal });
if (!response.ok) {
const err = new Error(`Stream ${streamKey}: ${response.status}`);
if (response.status >= 400 && response.status < 500) throw err;
throw err;
}
if (!response.body) throw new Error("No response body");
retryCount = 0;
// This is ripped from trigger.devs internal read stream code.
// TODO: Ideally once we have a monorepo we can simply just import the stream and use .read()
const streamVersion = response.headers.get("X-Stream-Version") ?? "v1";
const seenIds = new Set<string>();
const stream = response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(new EventSourceParserStream());
const reader = stream.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
const event = value as { event?: string; data?: string; id?: string };
const data = event.data;
if (data === undefined || data === null) continue;
if (streamVersion === "v1") {
if (event.id != null) {
lastEventId = event.id;
onLastEventId?.(event.id);
}
const parsed = safeParseJSON<T>(data);
if (parsed !== null) onChunk(parsed);
continue;
}
if (event.event === "batch") {
const envelope = safeParseJSON<{
records?: Array<{ body: string; seq_num: number; timestamp: number }>;
}>(data);
if (!envelope?.records) continue;
for (const record of envelope.records) {
if (record.seq_num != null) {
lastEventId = String(record.seq_num);
onLastEventId?.(lastEventId);
}
const parsedBody = safeParseJSON<{ data: T; id: string }>(record.body);
if (parsedBody === null) continue;
if (parsedBody.id != null && seenIds.has(parsedBody.id)) continue;
if (parsedBody.id != null) seenIds.add(parsedBody.id);
if (parsedBody.data !== undefined) onChunk(parsedBody.data);
}
}
}
} finally {
reader.releaseLock();
}
};
const runWithRetry = async (): Promise<void> => {
while (true) {
try {
await connectStream();
return;
} catch (err) {
if (signal.aborted) return;
if (!isNetworkError(err)) throw err;
if (retryCount >= MAX_RETRIES) throw err;
retryCount++;
const delay = RETRY_DELAY_MS * Math.pow(2, retryCount - 1);
await new Promise<void>((resolve) => setTimeout(resolve, delay));
if (signal.aborted) return;
}
}
};
return runWithRetry();
}
export class TriggerRealtimeClientTransport implements ClientStreamTransport {
subscribe(
runId: string,
token: string,
handlers: ClientStreamTransportHandlers,
options?: ClientStreamTransportSubscribeOptions,
): () => void {
const baseUrl = options?.baseUrl;
if (!baseUrl) {
handlers.onError(
new Error("Trigger API base URL required (provide triggerApiUrl from chat-v2 response)"),
);
return () => {};
}
const controller = new AbortController();
const signal = controller.signal;
const aiStreamOptions: ParseSSEStreamOptions | undefined =
options?.lastEventId !== undefined || options?.onLastEventId
? { lastEventId: options.lastEventId, onLastEventId: options.onLastEventId }
: undefined;
parseSSEStream<UIMessageChunk>(
baseUrl,
runId,
"ai",
token,
(chunk) => handlers.onAIChunk(chunk),
signal,
aiStreamOptions,
)
.then(() => handlers.onComplete())
.catch((err) => handlers.onError(err instanceof Error ? err : new Error(String(err))));
if (handlers.onRunProgress) {
parseSSEStream<{ event: string; data: unknown } | string>(
baseUrl,
runId,
"run_progress",
token,
(chunk) => {
const parsed =
typeof chunk === "string"
? safeParseJSON<{ event: string; data: unknown }>(chunk)
: chunk;
if (parsed?.event != null && typeof parsed.event === "string") {
handlers.onRunProgress?.(parsed.event, parsed.data);
}
},
signal,
).catch((err) => {
if (err?.name === "AbortError") return;
// eslint-disable-next-line no-console
console.warn(
`[TriggerTransport] run_progress stream error (non-fatal, run ${runId}):`,
err,
);
});
}
return () => controller.abort();
}
}
export function createTriggerRealtimeTransport(): ClientStreamTransport {
return new TriggerRealtimeClientTransport();
}
import type { UIMessageChunk } from "ai";
export interface ClientStreamTransportHandlers {
onAIChunk: (chunk: UIMessageChunk) => void;
/** Optional: run auxiliary progress (outline, content, export, etc.). Event name + payload. */
onRunProgress?: (event: string, data: unknown) => void;
onComplete: () => void;
onError: (error: Error) => void;
}
/** Options passed when subscribing (e.g. Trigger API base URL from server). */
export interface ClientStreamTransportSubscribeOptions {
/** Base URL for Trigger.dev Realtime API (required when using TriggerRealtimeClientTransport). */
baseUrl?: string;
/** Last event ID for SSE resumption (reconnect). Sent as Last-Event-ID header. */
lastEventId?: string;
/** Called with each event id (v1: event.id, v2: record.seq_num) so caller can persist for reconnect. */
onLastEventId?: (id: string) => void;
}
/** Client-side transport: subscribe to a run's AI stream (e.g. Trigger Realtime or Supabase Broadcast). */
export interface ClientStreamTransport {
subscribe(
runId: string,
token: string,
handlers: ClientStreamTransportHandlers,
options?: ClientStreamTransportSubscribeOptions,
): () => void;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment