Skip to content

Instantly share code, notes, and snippets.

@izakfilmalter
Created May 21, 2025 13:46
Show Gist options
  • Save izakfilmalter/7e85553287a5578e03864562d1524e15 to your computer and use it in GitHub Desktop.
Save izakfilmalter/7e85553287a5578e03864562d1524e15 to your computer and use it in GitHub Desktop.
streamText effect
import { chatsTable } from '@/server/db/schema/chatsSchema'
import { google } from '@ai-sdk/google'
import * as PgDrizzle from '@effect/sql-drizzle/Pg'
import {
generateText,
smoothStream,
streamText,
type LanguageModelUsage,
type ToolSet,
} from 'ai'
import { eq } from 'drizzle-orm'
import { Effect, pipe, Stream } from 'effect'
export const streamTextE = (
params: Parameters<typeof streamText>[0] & {
responseE: (params: {
content: string
state: 'generating' | 'complete'
isFirstChunk: boolean
}) => Effect.Effect<unknown, unknown, never>
toolE?: (content: string) => Effect.Effect<unknown, unknown, never>
usageE?: (
usage: LanguageModelUsage,
) => Effect.Effect<unknown, unknown, never>
errorE?: (error: unknown) => Effect.Effect<unknown, unknown, never>
},
) =>
Effect.gen(function* () {
const {
responseE,
usageE = () => Effect.succeed(null),
errorE = () => Effect.succeed(null),
toolE,
messages,
...rest
} = params
const result = streamText({
onError: async (error) => {
await errorE(error).pipe(Effect.runPromiseExit)
},
experimental_transform: smoothStream({
chunking: 'word',
}),
// toolCallStreaming: true,
messages,
...rest,
})
const { fullStream, usage } = result
return yield* Effect.all([
saveFullStreamE({
fullStream,
responseE,
errorE,
toolE,
}),
Effect.gen(function* () {
const localUsage = yield* Effect.tryPromise(async () => usage)
yield* usageE(localUsage)
}),
]).pipe(Effect.map(() => result))
})
export const generateTitleFromUserMessageE = (params: {
message: string
chatId: string
}) => {
const { message, chatId } = params
return Effect.gen(function* () {
const db = yield* PgDrizzle.PgDrizzle
const { text } = yield* Effect.tryPromise(async () =>
generateText({
model: google('gemini-2.0-flash'),
system: `
- You will generate a short title based on the first message a user begins a conversation with
- Ensure it is not more than 80 characters long
- The title should directly describe the topic or action in the user's message, not the fact that they're requesting something
- Begin with a noun, verb, or gerund (ing-form) rather than phrases like "Request for" or "Help with"
- Avoid using quotes, colons, or unnecessary articles (a, an, the) when possible
- Be concise but descriptive`,
prompt: message,
}),
)
yield* db
.update(chatsTable)
.set({ name: text })
.where(eq(chatsTable.id, chatId))
})
}
export const saveFullStreamE = <TOOLS extends ToolSet>(params: {
fullStream: ReturnType<typeof streamText<TOOLS>>['fullStream']
responseE: (params: {
content: string
state: 'generating' | 'complete'
isFirstChunk: boolean
}) => Effect.Effect<unknown, unknown, never>
toolE?: (content: string) => Effect.Effect<unknown, unknown, never>
errorE?: (error: unknown) => Effect.Effect<unknown, unknown, never>
}) =>
Effect.gen(function* () {
const {
fullStream,
responseE,
errorE = () => Effect.succeed(null),
toolE = () => Effect.succeed(null),
} = params
let content = ''
let isFirstChunk = true
yield* pipe(
Stream.fromAsyncIterable(fullStream, async (e) => {
await errorE(e).pipe(Effect.runPromise)
}),
Stream.runForEach((chunk) => {
// console.log('chunk', chunk)
if (chunk.type === 'tool-result') {
return toolE(JSON.stringify(chunk.result))
}
if (chunk.type === 'text-delta') {
content += chunk.textDelta
const result = responseE({
content,
state: 'generating',
isFirstChunk,
})
isFirstChunk = false
return result
}
return Effect.succeed(null)
}),
)
return responseE({ content, state: 'complete', isFirstChunk: false })
}).pipe(Effect.flatten)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment