Created
May 21, 2025 13:46
-
-
Save izakfilmalter/7e85553287a5578e03864562d1524e15 to your computer and use it in GitHub Desktop.
streamText effect
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { chatsTable } from '@/server/db/schema/chatsSchema' | |
import { google } from '@ai-sdk/google' | |
import * as PgDrizzle from '@effect/sql-drizzle/Pg' | |
import { | |
generateText, | |
smoothStream, | |
streamText, | |
type LanguageModelUsage, | |
type ToolSet, | |
} from 'ai' | |
import { eq } from 'drizzle-orm' | |
import { Effect, pipe, Stream } from 'effect' | |
export const streamTextE = ( | |
params: Parameters<typeof streamText>[0] & { | |
responseE: (params: { | |
content: string | |
state: 'generating' | 'complete' | |
isFirstChunk: boolean | |
}) => Effect.Effect<unknown, unknown, never> | |
toolE?: (content: string) => Effect.Effect<unknown, unknown, never> | |
usageE?: ( | |
usage: LanguageModelUsage, | |
) => Effect.Effect<unknown, unknown, never> | |
errorE?: (error: unknown) => Effect.Effect<unknown, unknown, never> | |
}, | |
) => | |
Effect.gen(function* () { | |
const { | |
responseE, | |
usageE = () => Effect.succeed(null), | |
errorE = () => Effect.succeed(null), | |
toolE, | |
messages, | |
...rest | |
} = params | |
const result = streamText({ | |
onError: async (error) => { | |
await errorE(error).pipe(Effect.runPromiseExit) | |
}, | |
experimental_transform: smoothStream({ | |
chunking: 'word', | |
}), | |
// toolCallStreaming: true, | |
messages, | |
...rest, | |
}) | |
const { fullStream, usage } = result | |
return yield* Effect.all([ | |
saveFullStreamE({ | |
fullStream, | |
responseE, | |
errorE, | |
toolE, | |
}), | |
Effect.gen(function* () { | |
const localUsage = yield* Effect.tryPromise(async () => usage) | |
yield* usageE(localUsage) | |
}), | |
]).pipe(Effect.map(() => result)) | |
}) | |
export const generateTitleFromUserMessageE = (params: { | |
message: string | |
chatId: string | |
}) => { | |
const { message, chatId } = params | |
return Effect.gen(function* () { | |
const db = yield* PgDrizzle.PgDrizzle | |
const { text } = yield* Effect.tryPromise(async () => | |
generateText({ | |
model: google('gemini-2.0-flash'), | |
system: ` | |
- You will generate a short title based on the first message a user begins a conversation with | |
- Ensure it is not more than 80 characters long | |
- The title should directly describe the topic or action in the user's message, not the fact that they're requesting something | |
- Begin with a noun, verb, or gerund (ing-form) rather than phrases like "Request for" or "Help with" | |
- Avoid using quotes, colons, or unnecessary articles (a, an, the) when possible | |
- Be concise but descriptive`, | |
prompt: message, | |
}), | |
) | |
yield* db | |
.update(chatsTable) | |
.set({ name: text }) | |
.where(eq(chatsTable.id, chatId)) | |
}) | |
} | |
export const saveFullStreamE = <TOOLS extends ToolSet>(params: { | |
fullStream: ReturnType<typeof streamText<TOOLS>>['fullStream'] | |
responseE: (params: { | |
content: string | |
state: 'generating' | 'complete' | |
isFirstChunk: boolean | |
}) => Effect.Effect<unknown, unknown, never> | |
toolE?: (content: string) => Effect.Effect<unknown, unknown, never> | |
errorE?: (error: unknown) => Effect.Effect<unknown, unknown, never> | |
}) => | |
Effect.gen(function* () { | |
const { | |
fullStream, | |
responseE, | |
errorE = () => Effect.succeed(null), | |
toolE = () => Effect.succeed(null), | |
} = params | |
let content = '' | |
let isFirstChunk = true | |
yield* pipe( | |
Stream.fromAsyncIterable(fullStream, async (e) => { | |
await errorE(e).pipe(Effect.runPromise) | |
}), | |
Stream.runForEach((chunk) => { | |
// console.log('chunk', chunk) | |
if (chunk.type === 'tool-result') { | |
return toolE(JSON.stringify(chunk.result)) | |
} | |
if (chunk.type === 'text-delta') { | |
content += chunk.textDelta | |
const result = responseE({ | |
content, | |
state: 'generating', | |
isFirstChunk, | |
}) | |
isFirstChunk = false | |
return result | |
} | |
return Effect.succeed(null) | |
}), | |
) | |
return responseE({ content, state: 'complete', isFirstChunk: false }) | |
}).pipe(Effect.flatten) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment