Created
March 27, 2026 18:05
-
-
Save ConProgramming/6f9b46847f9c0cecef668b61b6cb5ffe to your computer and use it in GitHub Desktop.
AI SDK + Trigger.dev Realtime interop — custom ChatTransport, SSE streaming, stream piping
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import { streams } from "@trigger.dev/sdk"; | |
| import type { UIMessageChunk } from "ai"; | |
/** Realtime stream with id "ai"; its chunks are AI SDK `UIMessageChunk` values. */
export const aiStream = streams.define<UIMessageChunk>({ id: "ai" });

/** Realtime stream with id "run_progress"; its chunks are plain strings. */
export const runProgressStream = streams.define<string>({ id: "run_progress" });
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import { schemaTask, logger, tags } from "@trigger.dev/sdk"; | |
| import { streamText, convertToModelMessages, validateUIMessages, stepCountIs } from "ai"; | |
| import type { ToolSet, UIMessage, UIMessageChunk } from "ai"; | |
| import { aiStream } from "./streams"; | |
| import { createRunProgressTransport } from "./run-progress-transport"; | |
| // ... | |
/**
 * Trigger.dev task "assistant-chat": runs `streamText`, converts the result to a
 * UI message stream and pipes it into `aiStream`. For new conversations a
 * `data-conversation-created` chunk is emitted first and a chat title is sent
 * separately over the run-progress transport.
 *
 * The task is configured with a single attempt (`maxAttempts: 1`).
 */
export const assistantChat = schemaTask({
  id: "assistant-chat",
  retry: { maxAttempts: 1 },
  schema: assistantChatTriggerPayloadSchema,
  init: async () => {
    // ... initialize DB clients, AI provider, etc.
    return { SB, AI, PDB };
  },
  run: async (payload, { ctx, init }) => {
    const { SB, AI, PDB } = init!;
    // ... destructure payload, load/create conversation, store user message, load history
    const runProgressTransport = createRunProgressTransport(transport, ctx.run.id, SB);
    // ... build tools, system prompt, validate/convert messages
    const result = streamText({
      model: AI.languageModel("thinking"),
      system: finalSystemPrompt,
      messages: modelMessages,
      tools: toolsWithCodeMode,
      stopWhen: stepCountIs(stepLimit),
      experimental_telemetry: { isEnabled: true, recordInputs: true, recordOutputs: true },
    });
    const stream = result.toUIMessageStream({
      originalMessages: allMessages,
      onFinish: async ({ responseMessage }) => {
        // ... save response to DB
      },
    });

    // A conversation without an id yet is new: the AI stream gets a leading
    // "data-conversation-created" chunk, and the title travels over run_progress
    // so it does not wait on the AI stream.
    const isNewConversation = currentConversationId == null || currentConversationId === "";

    // The title job starts immediately and runs concurrently with the AI stream.
    // Its failure is logged instead of propagated: otherwise a rejection would be
    // unhandled whenever the AI stream throws first, and a title error would fail
    // a run whose answer was already streamed.
    const titlePromise: Promise<void> = isNewConversation
      ? (async () => {
          // ... generate title and send via runProgressTransport
          await runProgressTransport.send("chat-title", { chatId: String(questionId), title });
        })().catch((err: unknown) => {
          logger.warn("chat-title generation failed", {
            error: err instanceof Error ? err.message : String(err),
          });
        })
      : Promise.resolve();

    const streamToPipe = isNewConversation
      ? (async function* (): AsyncGenerator<UIMessageChunk> {
          yield {
            type: "data-conversation-created",
            data: { id: String(questionId) },
          } as UIMessageChunk;
          for await (const chunk of stream) {
            yield chunk;
          }
        })()
      : stream;

    const { waitUntilComplete } = aiStream.pipe(streamToPipe);
    try {
      await logger.trace("ai-stream", async () => {
        await waitUntilComplete();
      });
    } finally {
      // Let the title send settle before releasing the transport it uses.
      // titlePromise never rejects (see above), so this cannot mask a stream error.
      await titlePromise;
      await runProgressTransport.unsubscribe();
    }
    // ...
  },
});
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import { getTransport } from "$lib/ai/realtime/transport-utils"; | |
| import { triggerAssistantTask, triggerChatResponseJson } from "$lib/ai/trigger-chat-bridge"; | |
| // ... | |
| export const POST: RequestHandler = async (event) => { | |
| // ... auth, rate limiting, request validation | |
| const transport = getTransport(org.meta); | |
| const payload = { | |
| userId: user.id, | |
| // ... app-specific fields | |
| message, | |
| messages, | |
| transport, | |
| }; | |
| const result = await triggerAssistantTask({ | |
| taskId: "assistant-chat", | |
| payload, | |
| transport, | |
| }); | |
| return triggerChatResponseJson(result); | |
| }; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| <script lang="ts"> | |
| // ... | |
| import { Chat } from "@ai-sdk/svelte"; | |
| import { | |
| DefaultChatTransport, | |
| lastAssistantMessageIsCompleteWithToolCalls, | |
| type ChatTransport, | |
| type UIMessage, | |
| } from "ai"; | |
| // ... | |
| import { TriggerDevChatTransport } from "$lib/ai/realtime/trigger-chat-transport"; | |
| import { createTriggerRealtimeTransport } from "$lib/ai/realtime/trigger-transport"; | |
| // ... | |
| // Read transport mode from org feature flag | |
| const assistantTransport = $derived( | |
| (flags?.legacy?.meta as { assistant_transport?: string }) | |
| ?.assistant_transport ?? "legacy", | |
| ); | |
| const useTriggerAgent = $derived(assistantTransport === "trigger"); | |
| const chatV2ApiUrl = $derived( | |
| `/projects-v3/${page.params.id}/api/assistant/chat-v2`, | |
| ); | |
| // Create the Trigger Realtime client stream transport | |
| const triggerTransport = $derived( | |
| useTriggerAgent ? createTriggerRealtimeTransport() : null, | |
| ); | |
| // ... | |
| // Wrap in TriggerDevChatTransport (posts to chat-v2, subscribes to Trigger Realtime) | |
| const triggerTransportOrNull = $derived( | |
| useTriggerAgent && triggerTransport | |
| ? new TriggerDevChatTransport({ | |
| apiUrl: chatV2ApiUrl, | |
| clientTransport: triggerTransport, | |
| body: getAskModeBody, | |
| }) | |
| : null, | |
| ); | |
| // ... additional TriggerDevChatTransport instances for outline/content modes | |
| // with runProgressCallbacks for onOutlineGenerationProgress / onContentGenerationProgress | |
| // Delegating transport: switches between Trigger and legacy at runtime | |
| const getTransport = () => triggerTransportOrNull ?? defaultTransport; | |
| const delegatingTransport: ChatTransport<UIMessage> = { | |
| sendMessages: (opts) => getTransport().sendMessages(opts), | |
| reconnectToStream: (opts) => getTransport().reconnectToStream(opts), | |
| }; | |
| // ... | |
| // Chat instance uses the delegating transport | |
| instance = new Chat({ | |
| transport: delegatingTransport, | |
| sendAutomaticallyWhen: (context) => { | |
| return lastAssistantMessageIsCompleteWithToolCalls(context); | |
| }, | |
| // ... | |
| }); | |
| // ... | |
| </script> | |
| <!-- ... component template --> |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * Cancel an assistant run (Trigger.dev run). | |
| */ | |
| import { error, json } from "@sveltejs/kit"; | |
| import { requireRateLimit } from "$lib/rate-limit/middleware"; | |
| import { runs } from "@trigger.dev/sdk"; | |
| import type { RequestHandler } from "./$types"; | |
/**
 * POST handler that cancels a Trigger.dev run on behalf of the signed-in user.
 *
 * Expects a JSON body `{ runId: string }`.
 * - 401 when there is no authenticated user.
 * - 400 when the body is not valid JSON or `runId` is missing/empty.
 * - 403 when the run's payload `userId` does not match the caller.
 * - `{ ok: true }` otherwise, including when the run cannot be retrieved or the
 *   cancel call itself fails (both are treated as "nothing left to cancel").
 */
export const POST: RequestHandler = async (event) => {
  const {
    request,
    locals: { getUser },
  } = event;

  const user = await getUser();
  if (!user) return error(401, "Unauthorized");

  await requireRateLimit(event, "MUTATION", `mutation:user:${user.id}`);

  // A malformed body is a client error, not a server failure.
  let body: unknown;
  try {
    body = await request.json();
  } catch {
    return error(400, "Invalid JSON body");
  }

  const runId =
    typeof body === "object" && body !== null ? (body as { runId?: unknown }).runId : undefined;
  if (typeof runId !== "string" || runId.length === 0) return error(400, "Missing runId");

  let run;
  try {
    // Fetch the run so its payload can be checked for ownership below.
    run = await runs.retrieve(runId);
  } catch {
    // Run may be stale, already deleted, or invalid
    return json({ ok: true });
  }

  // Only the user recorded in the run payload may cancel it.
  const payload = run.payload as Record<string, unknown> | undefined;
  if (payload?.userId !== user.id) return error(403, "Forbidden");

  try {
    await runs.cancel(runId);
  } catch {
    // Run may have completed or been cancelled before we could cancel it
  }
  return json({ ok: true });
};
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** Accumulates outline deltas into full-state payloads for the Svelte store. */ | |
| export class OutlineProgressAccumulator { | |
| // ... internal state | |
| /** Apply a delta payload, return the full-state payload to forward. */ | |
| apply(data: Record<string, unknown>) { | |
| // ... accumulate deltas into full state, return merged payload | |
| } | |
| // ... | |
| } | |
| /** Accumulates content section deltas into full-state payloads for the Svelte store. */ | |
| export class ContentProgressAccumulator { | |
| // ... internal state | |
| apply(data: Record<string, unknown>) { | |
| // ... accumulate section deltas into full state, return merged payload | |
| } | |
| // ... | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * Trigger.dev Realtime Stream definitions for AI agent | |
| * Requires @trigger.dev/sdk 4.1.0+ | |
| */ | |
| import { streams, type InferStreamType } from "@trigger.dev/sdk"; | |
| import type { UIMessageChunk } from "ai"; | |
/** Realtime stream with id "ai"; its chunks are AI SDK `UIMessageChunk` values. */
export const aiStream = streams.define<UIMessageChunk>({ id: "ai" });

/**
 * Auxiliary stream for run progress (outline, content, export, etc.).
 * Each chunk is a JSON string of `{ event, data }`, kept as a string so it stays
 * readable in the dashboard.
 */
export const runProgressStream = streams.define<string>({ id: "run_progress" });

/** Chunk type inferred from `aiStream`. */
export type AIStreamPart = InferStreamType<typeof aiStream>;

/** Chunk type inferred from `runProgressStream`. */
export type RunProgressPart = InferStreamType<typeof runProgressStream>;
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import { getTransport } from "$lib/ai/realtime/transport-utils"; | |
| import { triggerAssistantTask, triggerChatResponseJson } from "$lib/ai/trigger-chat-bridge"; | |
| // ... | |
| export const POST: RequestHandler = async (event) => { | |
| // ... auth, rate limiting, request validation | |
| const projectId = parseInt(params.id); | |
| // ... validate project ID | |
| const transport = getTransport(org.meta); | |
| const payload = { | |
| userId: user.id, | |
| projectId, | |
| // ... app-specific fields (documentContext, capabilities, etc.) | |
| message, | |
| messages, | |
| transport, | |
| }; | |
| const result = await triggerAssistantTask({ | |
| taskId: "project-assistant-chat", | |
| payload, | |
| transport, | |
| }); | |
| // ... optional activity logging | |
| return triggerChatResponseJson(result); | |
| }; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * Run progress transport: stream auxiliary events (outline, content, export, etc.) to the client | |
| * via the Trigger Realtime run_progress stream. | |
| */ | |
| import { runProgressStream } from "../streams"; | |
/** Channel used by a run to push auxiliary progress events to the client. */
export interface RunProgressTransport {
  /** Publish one named event together with its payload. */
  send(event: string, data: unknown): Promise<void>;
  /** Release whatever the transport holds; call once the run is done sending. */
  unsubscribe(): Promise<void>;
}
/** RunProgressTransport that appends each event to the Trigger `run_progress` stream. */
class TriggerStreamRunProgressTransport implements RunProgressTransport {
  /** Serialize `{ event, data }` to JSON and append it as a single stream chunk. */
  async send(event: string, data: unknown): Promise<void> {
    const serialized = JSON.stringify({ event, data });
    await runProgressStream.append(serialized);
  }

  /** No-op: this transport holds nothing that needs releasing. */
  async unsubscribe(): Promise<void> {
    return;
  }
}
/** Build the run-progress transport (always the Trigger stream implementation). */
export function createRunProgressTransport(): RunProgressTransport {
  const progressTransport: RunProgressTransport = new TriggerStreamRunProgressTransport();
  return progressTransport;
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Valid transport values for assistant chat endpoints. */
export type TransportValue = "trigger";

/**
 * Resolve the transport mode for an organisation.
 *
 * "trigger" is the only supported value, so the result is constant and `meta`
 * is currently not consulted.
 */
export function getTransport(meta: Record<string, unknown> | null | undefined): TransportValue {
  const onlySupported: TransportValue = "trigger";
  return onlySupported;
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * Shared Trigger.dev bridge for assistant chat-v2 routes (validate body → trigger → JSON response). | |
| */ | |
| import { tasks } from "@trigger.dev/sdk"; | |
| import { json } from "@sveltejs/kit"; | |
| import { TRIGGER_DEV_API_URL } from "$lib/env/private.server"; | |
| import type { TransportValue } from "$lib/ai/realtime/transport-utils"; | |
/** What a chat-v2 route hands back to the browser after triggering a run. */
export type TriggerChatBridgeResult = {
  /** Id of the triggered run. */
  runId: string;
  /** Public access token taken from the trigger handle. */
  publicAccessToken: string;
  /** Transport mode the run was started with. */
  transport: TransportValue;
  /** Base URL of the Trigger.dev API the client should connect to. */
  triggerApiUrl: string;
};
/**
 * Serialize a bridge result as the JSON response of a chat-v2 route.
 *
 * The API URL is taken from `result` itself rather than re-read from the
 * environment, so the response always reflects the result it was given.
 */
export const triggerChatResponseJson = (result: TriggerChatBridgeResult) =>
  json({
    runId: result.runId,
    publicAccessToken: result.publicAccessToken,
    transport: result.transport,
    triggerApiUrl: result.triggerApiUrl,
  });
/**
 * Trigger one of the assistant chat tasks and collect what the client needs to
 * follow the run.
 *
 * @param args.taskId    Which assistant task to trigger.
 * @param args.payload   Payload forwarded unchanged to `tasks.trigger`.
 * @param args.transport Transport mode, echoed back in the result.
 * @returns Run id and public access token from the trigger handle, plus the
 *          transport and the configured Trigger.dev API URL.
 */
export const triggerAssistantTask = async (args: {
  taskId: "assistant-chat" | "project-assistant-chat";
  payload: Record<string, unknown>;
  transport: TransportValue;
}): Promise<TriggerChatBridgeResult> => {
  const { taskId, payload, transport } = args;
  const { id: runId, publicAccessToken } = await tasks.trigger(taskId, payload);
  const bridgeResult: TriggerChatBridgeResult = {
    runId,
    publicAccessToken,
    transport,
    triggerApiUrl: TRIGGER_DEV_API_URL,
  };
  return bridgeResult;
};
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * ChatTransport implementation for Trigger.dev: POST to chat-v2, then consume | |
| * the run's AI stream via ClientStreamTransport. | |
| * Returns a ReadableStream<UIMessageChunk> so the Chat class handles message building. | |
| * Browser tools are executed by the Chat class via onToolCall (stateless chat pattern). | |
| */ | |
| import type { ChatTransport } from "ai"; | |
| import type { UIMessage, UIMessageChunk } from "ai"; | |
| import type { ClientStreamTransport } from "./types"; | |
| import { OutlineProgressAccumulator, ContentProgressAccumulator } from "./progress-accumulators"; | |
| const TAG = "[TriggerTransport]"; | |
/** Connection details remembered per chat so an in-flight run can be resumed. */
interface RunState {
  /** Id of the run returned by chat-v2. */
  runId: string;
  /** Token used to subscribe to the run's streams. */
  publicAccessToken: string;
  /** Base URL passed to the client transport. */
  triggerApiUrl: string;
  /** Most recent event id reported by the client transport, if any. */
  lastEventId?: string;
}
/** Optional listeners for run-progress events (outline, content generation, chat title). */
export interface TriggerRunProgressCallbacks {
  /** Receives the accumulated outline state after each "outline" event. */
  onOutlineGenerationProgress?: (data: unknown) => void;
  /** Receives the accumulated content state after each "content" event. */
  onContentGenerationProgress?: (data: unknown) => void;
  /** Receives the payload of a "chat-title" event. */
  onChatTitle?: (data: { chatId: string; title: string }) => void;
}
/** Either a plain value or a (possibly async) zero-argument producer of one — mirrors the SDK's Resolvable pattern. */
type Resolvable<T> = T | (() => T | Promise<T>);

/**
 * Turn a `Resolvable` into its value.
 *
 * Functions are invoked and their result awaited; anything else (including
 * `undefined`) is passed through unchanged. Because functions are always
 * invoked, a `T` that is itself a function cannot be passed as a plain value.
 */
async function resolve<T>(value: Resolvable<T> | undefined): Promise<T | undefined> {
  const isProducer = typeof value === "function";
  return isProducer ? (value as () => T | Promise<T>)() : value;
}
/** Constructor options for `TriggerDevChatTransport`. */
export interface TriggerDevChatTransportOptions {
  /** URL of the chat-v2 endpoint to POST to (e.g. /assistant-v3/api/chat-v2). */
  apiUrl: string;
  /** Client transport used to subscribe to the run (Trigger Realtime or Supabase Broadcast). */
  clientTransport: ClientStreamTransport;
  /** Optional listeners for outline/content generation progress and chat titles. */
  runProgressCallbacks?: TriggerRunProgressCallbacks;
  /**
   * Additional properties merged into the body of every POST to chat-v2.
   * Either a static object or a (possibly async) function evaluated on each call,
   * like the `body` option of DefaultChatTransport / HttpChatTransport.
   */
  body?: Resolvable<object>;
}
/**
 * ChatTransport that starts a run by POSTing to chat-v2 and then relays the
 * run's AI chunks, received through the client transport, as a
 * `ReadableStream<UIMessageChunk>`.
 *
 * Run details are remembered per chat id so `reconnectToStream` can resume a run
 * that has not finished. The entry is dropped when the stream finishes or the
 * request is aborted (abort also asks the server to cancel the run).
 */
export class TriggerDevChatTransport<
  UI_MESSAGE extends UIMessage = UIMessage,
> implements ChatTransport<UI_MESSAGE> {
  private readonly apiUrl: string;
  private readonly clientTransport: ClientStreamTransport;
  private readonly runProgressCallbacks: TriggerRunProgressCallbacks | undefined;
  private readonly body: Resolvable<object> | undefined;
  /** Most recent run per chat id, kept for `reconnectToStream`. */
  private readonly runStateByChatId = new Map<string, RunState>();

  constructor(options: TriggerDevChatTransportOptions) {
    this.apiUrl = options.apiUrl;
    this.clientTransport = options.clientTransport;
    this.runProgressCallbacks = options.runProgressCallbacks;
    this.body = options.body;
  }

  /**
   * Drop the stored run for `chatId`, but only if it is still `runState`.
   * A newer run registered for the same chat must not be forgotten by a late
   * callback of an older one.
   */
  private forgetRun(chatId: string | undefined, runState: RunState): void {
    if (chatId != null && this.runStateByChatId.get(chatId) === runState) {
      this.runStateByChatId.delete(chatId);
    }
  }

  /** Best-effort request asking the server to cancel `runId`; failures are ignored. */
  private cancelRun(runId: string): void {
    fetch("/api/assistant/cancel", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ runId }),
    }).catch(() => {});
  }

  /**
   * POST the message(s) to chat-v2, then stream the resulting run's AI chunks.
   *
   * @throws Error when there is no user message, when chat-v2 responds with a
   *         non-2xx status, or when the response lacks `triggerApiUrl`.
   */
  async sendMessages(
    options: Parameters<ChatTransport<UI_MESSAGE>["sendMessages"]>[0],
  ): Promise<ReadableStream<UIMessageChunk>> {
    const { messages, abortSignal, body: perRequestBody, chatId } = options;

    const lastUserMessage = messages.filter((m) => m.role === "user").at(-1);
    if (!lastUserMessage) {
      throw new Error("No user message to send");
    }

    // Continuation: last message is assistant with tool results — send all messages so chat-v2 can save them
    const lastMsg = messages[messages.length - 1];
    const isContinuation =
      lastMsg?.role === "assistant" &&
      lastMsg.parts?.some(
        (p: { type?: string; output?: unknown }) =>
          p.type?.startsWith?.("tool-") && (p as { output?: unknown }).output !== undefined,
      );

    const resolvedBody = (await resolve(this.body)) ?? {};
    const requestBody = {
      ...resolvedBody,
      ...perRequestBody,
      ...(isContinuation ? { messages } : { message: lastUserMessage }),
    };
    console.log(TAG, `POST ${this.apiUrl}`, { bodyKeys: Object.keys(requestBody), isContinuation });

    const res = await fetch(this.apiUrl, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(requestBody),
      signal: abortSignal ?? undefined,
    });

    if (!res.ok) {
      const errorText = await res.text().catch(() => "");
      let detail = errorText;
      let logged: unknown = errorText;
      try {
        const parsed = JSON.parse(errorText) as { message?: string; issues?: unknown[] } | null;
        if (parsed?.issues?.length) {
          logged = parsed;
          // `message` is optional: leave it out rather than rendering "undefined".
          detail = [parsed.message, JSON.stringify(parsed.issues)].filter(Boolean).join(" ");
        }
      } catch {
        // Body is not JSON: report the raw text.
      }
      console.error(TAG, `POST ${this.apiUrl} failed: ${res.status}`, logged);
      throw new Error(`Chat request failed: ${res.status} - ${detail}`);
    }

    const { runId, publicAccessToken, transport, triggerApiUrl } = (await res.json()) as {
      runId: string;
      publicAccessToken: string;
      transport?: string;
      triggerApiUrl?: string;
    };
    console.log(TAG, `Run started: ${runId}`, { transport });
    if (!triggerApiUrl) {
      throw new Error("Trigger API URL is required");
    }

    const runState: RunState = { runId, publicAccessToken, triggerApiUrl };
    if (chatId != null) {
      this.runStateByChatId.set(chatId, runState);
    }

    const { readable, writable } = new TransformStream<UIMessageChunk, UIMessageChunk>();
    const writer = writable.getWriter();
    let unsubscribe: (() => void) | null = null;
    // True once the stream was finished or aborted; later callbacks become no-ops.
    let completed = false;

    const cleanup = () => {
      unsubscribe?.();
      unsubscribe = null;
    };

    /** Abort the output stream, stop the subscription and cancel the run server-side. */
    const onAbort = () => {
      if (completed) return;
      completed = true;
      // The run is being cancelled, so there is nothing left to reconnect to.
      this.forgetRun(chatId, runState);
      console.log(TAG, `Aborted (run ${runId})`);
      cleanup();
      writer.abort(abortSignal?.reason).catch(() => {});
      this.cancelRun(runId);
    };

    /** Close the writer and abort the SSE subscription. Safe to call multiple times. */
    const finishStream = () => {
      if (completed) return;
      completed = true;
      abortSignal?.removeEventListener("abort", onAbort);
      this.forgetRun(chatId, runState);
      console.log(TAG, `[TIMING] Stream finished ts=${Date.now()} (run ${runId})`);
      cleanup();
      writer.close().catch(() => {});
    };

    if (abortSignal?.aborted) {
      // Aborted after the run was triggered but before subscribing: same handling
      // as a later abort, so the run does not keep going unobserved.
      onAbort();
      return readable;
    }
    abortSignal?.addEventListener("abort", onAbort, { once: true });

    const outlineAcc = new OutlineProgressAccumulator();
    const contentAcc = new ContentProgressAccumulator();
    const cbs = this.runProgressCallbacks;

    unsubscribe = this.clientTransport.subscribe(
      runId,
      publicAccessToken,
      {
        onAIChunk: (chunk) => {
          if (abortSignal?.aborted || completed) return;
          writer.write(chunk).catch(() => {});
          // Detect the AI SDK's "finish" chunk — the last meaningful chunk in
          // the UIMessageStream. Close immediately instead of waiting for
          // Trigger's 60s SSE idle timeout to close the HTTP connection.
          // Worth noting this timeout exists to handle reconnects
          if ((chunk as { type?: string }).type === "finish") {
            finishStream();
          }
        },
        onRunProgress: (event: string, data: unknown) => {
          if (event === "chat-title") {
            cbs?.onChatTitle?.(data as { chatId: string; title: string });
            return;
          }
          const d = data as Record<string, unknown>;
          if (event === "outline" && cbs?.onOutlineGenerationProgress) {
            cbs.onOutlineGenerationProgress(outlineAcc.apply(d));
          } else if (event === "content" && cbs?.onContentGenerationProgress) {
            cbs.onContentGenerationProgress(contentAcc.apply(d));
          }
        },
        onComplete: () => {
          // Fallback: SSE connection closed by Trigger (normally 60s idle timeout).
          // Usually a no-op because finishStream() already ran on the "finish" chunk.
          finishStream();
        },
        onError: (err) => {
          const isAbort = err?.name === "AbortError" || err?.message?.includes("aborted");
          if (isAbort || completed) {
            // Expected: SSE fetch was aborted after we called finishStream() or onAbort().
            cleanup();
            return;
          }
          console.error(TAG, `Stream error (run ${runId}):`, err);
          cleanup();
          // Run state is kept on purpose so reconnectToStream() can still resume.
          writer.abort(err).catch(() => {});
        },
      },
      {
        baseUrl: triggerApiUrl,
        onLastEventId: (id) => {
          runState.lastEventId = id;
        },
      },
    );

    return readable;
  }

  /**
   * Resume streaming the run remembered for `chatId`, continuing after the last
   * event id seen. Run-progress events are ignored on this path.
   *
   * @returns The resumed stream, or `null` when no complete run state is stored
   *          for the chat.
   */
  async reconnectToStream(
    options: Parameters<ChatTransport<UI_MESSAGE>["reconnectToStream"]>[0],
  ): Promise<ReadableStream<UIMessageChunk> | null> {
    const chatId = (options as { chatId?: string }).chatId;
    if (chatId == null) return null;

    const runState = this.runStateByChatId.get(chatId);
    if (
      !runState ||
      runState.runId == null ||
      runState.publicAccessToken == null ||
      runState.triggerApiUrl == null
    ) {
      return null;
    }

    const { readable, writable } = new TransformStream<UIMessageChunk, UIMessageChunk>();
    const writer = writable.getWriter();
    let unsubscribe: (() => void) | null = null;
    let completed = false;

    /** Close the writer and stop the subscription. Safe to call multiple times. */
    const finishStream = () => {
      if (completed) return;
      completed = true;
      this.forgetRun(chatId, runState);
      unsubscribe?.();
      unsubscribe = null;
      writer.close().catch(() => {});
    };

    unsubscribe = this.clientTransport.subscribe(
      runState.runId,
      runState.publicAccessToken,
      {
        onAIChunk: (chunk) => {
          if (completed) return;
          writer.write(chunk).catch(() => {});
          if ((chunk as { type?: string }).type === "finish") {
            finishStream();
          }
        },
        onRunProgress: () => {},
        onComplete: finishStream,
        onError: (err) => {
          if (completed) return;
          unsubscribe?.();
          unsubscribe = null;
          writer.abort(err instanceof Error ? err : new Error(String(err))).catch(() => {});
        },
      },
      {
        baseUrl: runState.triggerApiUrl,
        lastEventId: runState.lastEventId,
        onLastEventId: (id) => {
          runState.lastEventId = id;
        },
      },
    );

    return readable;
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import { z } from "zod"; | |
| import { assistantChatUiMessageSchema } from "../schemas/chat-primitives"; | |
| // ... | |
| /** | |
| * Payload for `assistant-chat` Trigger task — keep aligned with chat-v2 `tasks.trigger` body. | |
| */ | |
| export const assistantChatTriggerPayloadSchema = z.object({ | |
| userId: z.string(), | |
| // ... app-specific user/org fields | |
| message: assistantChatUiMessageSchema.optional(), | |
| messages: z.array(assistantChatUiMessageSchema).optional(), | |
| // ... app-specific fields (capabilities, conversation context, etc.) | |
| transport: z.enum(["trigger"]), | |
| }); | |
| export type AssistantChatTriggerPayload = z.infer<typeof assistantChatTriggerPayloadSchema>; | |
| /** | |
| * Payload for `project-assistant-chat` Trigger task — same pattern with additional project fields. | |
| */ | |
| export const projectAssistantChatTriggerPayloadSchema = z.object({ | |
| userId: z.string(), | |
| // ... app-specific user/org fields | |
| projectId: z.number(), | |
| message: assistantChatUiMessageSchema.optional(), | |
| messages: z.array(assistantChatUiMessageSchema).optional(), | |
| // ... app-specific fields (documentContext, capabilities, playbook, etc.) | |
| transport: z.enum(["trigger"]), | |
| }); | |
| export type ProjectAssistantChatTriggerPayload = z.infer< | |
| typeof projectAssistantChatTriggerPayloadSchema | |
| >; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * Client transport: subscribe to Trigger.dev Realtime streams (ai, run_progress) using publicAccessToken. | |
| * Uses SSE endpoint: GET /realtime/v1/streams/{runId}/{streamKey} with Authorization: Bearer {token} | |
| * baseUrl is supplied by the server in the chat-v2 response (not public env). | |
| * | |
| * Decoding matches @trigger.dev/core SSEStreamSubscription: supports v1 (one JSON per event) | |
| * and v2/S2 batch format (event: batch, data: {records: [{body: "{\"data\":...}"}]}). | |
| */ | |
| import type { UIMessageChunk } from "ai"; | |
| import { EventSourceParserStream } from "eventsource-parser/stream"; | |
| import type { | |
| ClientStreamTransport, | |
| ClientStreamTransportHandlers, | |
| ClientStreamTransportSubscribeOptions, | |
| } from "./types"; | |
/**
 * Parse `data` as JSON without throwing.
 *
 * @returns The parsed value cast to `T` (no validation is performed), or `null`
 *          when `data` is not valid JSON. A literal JSON `null` also yields `null`.
 */
function safeParseJSON<T>(data: string): T | null {
  let parsed: T | null = null;
  try {
    parsed = JSON.parse(data) as T;
  } catch {
    // Malformed input: keep the null default.
  }
  return parsed;
}
/** Maximum number of consecutive reconnect attempts before the error is surfaced. */
const MAX_RETRIES = 5;
/** Delay before the first reconnect attempt, in milliseconds; doubled on each further attempt. */
const RETRY_DELAY_MS = 1000;
/** Resume-related options for `parseSSEStream`. */
interface ParseSSEStreamOptions {
  /** Event id to send as `Last-Event-ID` on the first connection. */
  lastEventId?: string;
  /** Called whenever a newer event id is observed on the stream. */
  onLastEventId?: (id: string) => void;
}
/**
 * Decide whether `err` looks like a transport-level fetch failure.
 *
 * Fetch rejects with a `TypeError` on network failure, but the message differs
 * between runtimes, so the message is matched case-insensitively against
 * several fragments: "fetch" (e.g. "Failed to fetch", "fetch failed"),
 * "network" (e.g. "NetworkError when attempting to fetch resource.",
 * "network error") and "load failed" (WebKit).
 *
 * @returns `true` only for a `TypeError` whose message contains one of those fragments.
 */
function isNetworkError(err: unknown): boolean {
  if (!(err instanceof TypeError)) return false;
  const msg = (err.message ?? "").toLowerCase();
  const fragments = ["fetch", "network", "load failed"];
  return fragments.some((fragment) => msg.includes(fragment));
}
/**
 * Read one realtime stream of a run over SSE and hand every decoded chunk to
 * `onChunk`.
 *
 * Two wire formats are handled, selected by the `X-Stream-Version` response
 * header (default "v1"):
 * - v1: every event's `data` is one JSON chunk; the event `id` is the resume id.
 * - otherwise: events named "batch" carry `{ records: [{ body, seq_num }] }`,
 *   where `body` is JSON `{ data, id }`; `seq_num` is the resume id and records
 *   whose `id` was already seen on the current connection are skipped.
 *
 * Network errors and 5xx responses are retried with exponential backoff
 * (RETRY_DELAY_MS doubling, at most MAX_RETRIES consecutive attempts), resuming
 * from the last event id. Other failures, including 4xx responses, are thrown
 * immediately.
 *
 * @param baseUrl       Base URL of the Trigger.dev API.
 * @param runId         Run whose stream is read.
 * @param streamKey     Stream id within the run (e.g. "ai").
 * @param token         Bearer token sent in the Authorization header.
 * @param onChunk       Called for each successfully parsed chunk.
 * @param signal        Aborting it ends the read; the promise then resolves.
 * @param streamOptions Optional starting event id and event-id listener.
 * @returns A promise that resolves when the server closes the stream or the
 *          signal is aborted, and rejects on a non-retryable or exhausted error.
 */
function parseSSEStream<T>(
  baseUrl: string,
  runId: string,
  streamKey: string,
  token: string,
  onChunk: (chunk: T) => void,
  signal: AbortSignal,
  streamOptions?: ParseSSEStreamOptions,
): Promise<void> {
  const url = `${baseUrl}/realtime/v1/streams/${runId}/${streamKey}`;
  const onLastEventId = streamOptions?.onLastEventId;
  let lastEventId = streamOptions?.lastEventId;
  let retryCount = 0;
  // HTTP failures worth retrying (5xx); client errors (4xx) are never added here.
  const transientHttpErrors = new WeakSet<Error>();

  const connectStream = async (): Promise<void> => {
    if (signal.aborted) return;

    const headers: Record<string, string> = {
      Accept: "text/event-stream",
      Authorization: `Bearer ${token}`,
    };
    if (lastEventId) headers["Last-Event-ID"] = lastEventId;

    const response = await fetch(url, { headers, signal });
    if (!response.ok) {
      const err = new Error(`Stream ${streamKey}: ${response.status}`);
      if (response.status >= 500) transientHttpErrors.add(err);
      // Release the unread body of the failed response.
      void response.body?.cancel().catch(() => {});
      throw err;
    }
    if (!response.body) throw new Error("No response body");

    // A successful connection resets the consecutive-failure counter.
    retryCount = 0;

    // This is ripped from trigger.devs internal read stream code.
    // TODO: Ideally once we have a monorepo we can simply just import the stream and use .read()
    const streamVersion = response.headers.get("X-Stream-Version") ?? "v1";
    const seenIds = new Set<string>();
    const stream = response.body
      .pipeThrough(new TextDecoderStream())
      .pipeThrough(new EventSourceParserStream());
    const reader = stream.getReader();

    try {
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        const event = value as { event?: string; data?: string; id?: string };
        const data = event.data;
        if (data === undefined || data === null) continue;

        if (streamVersion === "v1") {
          if (event.id != null) {
            lastEventId = event.id;
            onLastEventId?.(event.id);
          }
          const parsed = safeParseJSON<T>(data);
          if (parsed !== null) onChunk(parsed);
          continue;
        }

        if (event.event === "batch") {
          const envelope = safeParseJSON<{
            records?: Array<{ body: string; seq_num: number; timestamp: number }>;
          }>(data);
          if (!envelope?.records) continue;

          for (const record of envelope.records) {
            if (record.seq_num != null) {
              lastEventId = String(record.seq_num);
              onLastEventId?.(lastEventId);
            }
            const parsedBody = safeParseJSON<{ data: T; id: string }>(record.body);
            if (parsedBody === null) continue;
            if (parsedBody.id != null && seenIds.has(parsedBody.id)) continue;
            if (parsedBody.id != null) seenIds.add(parsedBody.id);
            if (parsedBody.data !== undefined) onChunk(parsedBody.data);
          }
        }
      }
    } finally {
      reader.releaseLock();
    }
  };

  const runWithRetry = async (): Promise<void> => {
    while (true) {
      try {
        await connectStream();
        return;
      } catch (err) {
        // An abort is a normal end of the subscription, not a failure.
        if (signal.aborted) return;

        const retryable =
          isNetworkError(err) || (err instanceof Error && transientHttpErrors.has(err));
        if (!retryable || retryCount >= MAX_RETRIES) throw err;

        retryCount++;
        const delay = RETRY_DELAY_MS * Math.pow(2, retryCount - 1);
        await new Promise<void>((resolve) => setTimeout(resolve, delay));
        if (signal.aborted) return;
      }
    }
  };

  return runWithRetry();
}
/**
 * {@link ClientStreamTransport} implementation that reads a run's streams via
 * `parseSSEStream`.
 *
 * Two streams are read with the same abort signal:
 * - `"ai"`: required; drives `onAIChunk`, `onComplete` and `onError`.
 * - `"run_progress"`: only opened when `handlers.onRunProgress` is provided;
 *   failures on this stream are logged and never surfaced through `onError`.
 */
export class TriggerRealtimeClientTransport implements ClientStreamTransport {
  /**
   * Start streaming a run's output to `handlers`.
   *
   * @param runId - Run whose streams are read.
   * @param token - Token passed through to `parseSSEStream` for each stream.
   * @param handlers - Callbacks for AI chunks, optional run-progress events,
   *   completion and errors.
   * @param options - Must carry `baseUrl`. `lastEventId` / `onLastEventId` are
   *   forwarded to the `"ai"` stream only.
   * @returns An unsubscribe function that aborts both streams. Once it has been
   *   called, neither `onComplete` nor `onError` is invoked. When `baseUrl` is
   *   missing, `onError` is called synchronously and a no-op is returned.
   */
  subscribe(
    runId: string,
    token: string,
    handlers: ClientStreamTransportHandlers,
    options?: ClientStreamTransportSubscribeOptions,
  ): () => void {
    const baseUrl = options?.baseUrl;
    if (!baseUrl) {
      handlers.onError(
        new Error("Trigger API base URL required (provide triggerApiUrl from chat-v2 response)"),
      );
      return () => {};
    }

    const controller = new AbortController();
    const signal = controller.signal;

    // Only build an options object when the caller supplied resume information.
    const aiStreamOptions: ParseSSEStreamOptions | undefined =
      options?.lastEventId !== undefined || options?.onLastEventId
        ? { lastEventId: options.lastEventId, onLastEventId: options.onLastEventId }
        : undefined;

    parseSSEStream<UIMessageChunk>(
      baseUrl,
      runId,
      "ai",
      token,
      (chunk) => handlers.onAIChunk(chunk),
      signal,
      aiStreamOptions,
    )
      .then(() => {
        // The stream helper's retry loop returns (it does not throw) once the
        // signal is aborted, so a resolved promise can mean "cancelled" rather
        // than "finished". Do not report completion to a caller that unsubscribed.
        if (signal.aborted) return;
        handlers.onComplete();
      })
      .catch((err) => {
        // After unsubscribe the caller is no longer interested in failures either.
        if (signal.aborted) return;
        handlers.onError(err instanceof Error ? err : new Error(String(err)));
      });

    if (handlers.onRunProgress) {
      parseSSEStream<{ event: string; data: unknown } | string>(
        baseUrl,
        runId,
        "run_progress",
        token,
        (chunk) => {
          // Chunks may arrive either as already-parsed objects or as JSON strings.
          const parsed =
            typeof chunk === "string"
              ? safeParseJSON<{ event: string; data: unknown }>(chunk)
              : chunk;
          if (parsed?.event != null && typeof parsed.event === "string") {
            handlers.onRunProgress?.(parsed.event, parsed.data);
          }
        },
        signal,
      ).catch((err) => {
        // Cancellation is expected; anything else is logged but kept non-fatal.
        if (signal.aborted || err?.name === "AbortError") return;
        // eslint-disable-next-line no-console
        console.warn(
          `[TriggerTransport] run_progress stream error (non-fatal, run ${runId}):`,
          err,
        );
      });
    }

    return () => controller.abort();
  }
}
/**
 * Factory for a Trigger.dev Realtime client transport.
 *
 * @returns A new `TriggerRealtimeClientTransport`, exposed through the
 *   `ClientStreamTransport` interface.
 */
export function createTriggerRealtimeTransport(): ClientStreamTransport {
  const transport: ClientStreamTransport = new TriggerRealtimeClientTransport();
  return transport;
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import type { UIMessageChunk } from "ai"; | |
/**
 * Callbacks supplied by the caller of `ClientStreamTransport.subscribe`.
 */
export interface ClientStreamTransportHandlers {
  /** Receives each chunk of the run's AI stream. */
  onAIChunk: (chunk: UIMessageChunk) => void;
  /** Signals that the AI stream has finished. */
  onComplete: () => void;
  /** Receives the error when subscribing or streaming fails. */
  onError: (error: Error) => void;
  /**
   * Optional: auxiliary run progress (outline, content, export, etc.).
   * Called with the event name and its payload.
   */
  onRunProgress?: (event: string, data: unknown) => void;
}
/**
 * Options passed when subscribing (for example the Trigger API base URL
 * handed out by the server).
 */
export interface ClientStreamTransportSubscribeOptions {
  /**
   * Base URL of the Trigger.dev Realtime API.
   * Required when using TriggerRealtimeClientTransport.
   */
  baseUrl?: string;
  /**
   * Last event ID used to resume an SSE stream after a reconnect.
   * Sent as the Last-Event-ID header.
   */
  lastEventId?: string;
  /**
   * Called with each event id (v1: event.id, v2: record.seq_num) so the
   * caller can persist it for a later reconnect.
   */
  onLastEventId?: (id: string) => void;
}
/**
 * Client-side transport: subscribes to a run's AI stream
 * (e.g. Trigger Realtime or Supabase Broadcast).
 */
export interface ClientStreamTransport {
  /**
   * Begin delivering a run's stream to `handlers`.
   *
   * @param runId - Identifier of the run to subscribe to.
   * @param token - Token used to access the run's stream.
   * @param handlers - Callbacks that receive chunks, completion and errors.
   * @param options - Optional subscribe settings (base URL, resume information).
   * @returns A function with no arguments and no return value; presumably it
   *   cancels the subscription (the Trigger implementation returns an abort).
   */
  subscribe(
    runId: string,
    token: string,
    handlers: ClientStreamTransportHandlers,
    options?: ClientStreamTransportSubscribeOptions,
  ): () => void;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment