Skip to content

Instantly share code, notes, and snippets.

@evelant
Created March 22, 2025 21:13
Show Gist options
  • Save evelant/ac2ccdf517bbadc671dec8ea63792695 to your computer and use it in GitHub Desktop.
Save evelant/ac2ccdf517bbadc671dec8ea63792695 to your computer and use it in GitHub Desktop.
/**
* @since 1.0.0
*/
import * as Reactivity from "@effect/experimental/Reactivity"
import * as Client from "@effect/sql/SqlClient"
import type { Connection } from "@effect/sql/SqlConnection"
import { SqlError } from "@effect/sql/SqlError"
import type { Custom, Fragment, Primitive } from "@effect/sql/Statement"
import * as Statement from "@effect/sql/Statement"
import { PGlite, types, type Results, type Transaction } from "@electric-sql/pglite"
import * as Otel from "@opentelemetry/semantic-conventions"
import { GlobalValue } from "effect"
import * as Chunk from "effect/Chunk"
import * as Config from "effect/Config"
import type { ConfigError } from "effect/ConfigError"
import * as Context from "effect/Context"
import * as Duration from "effect/Duration"
import * as Effect from "effect/Effect"
import * as Layer from "effect/Layer"
import * as Scope from "effect/Scope"
import * as Stream from "effect/Stream"
/**
* @category extensions
* @since 1.0.0
*/
// Extract the namespace type from an extension definition
export type ExtractNamespace<T> = T extends {
setup: (...args: any[]) => Promise<infer R>
}
? R extends { namespaceObj: infer N }
? N
: {}
: {}
/**
* @category extensions
* @since 1.0.0
*/
// Extract all extension namespaces from an extensions object
export type ExtractExtensionNamespaces<T extends Record<string, any>> = {
[K in keyof T]: ExtractNamespace<T[K]>
}
/**
* @category extensions
* @since 1.0.0
*/
// Create a type with extension namespaces as properties
export type ExtensionsToNamespaces<T extends Record<string, any>> = {
[K in keyof T as K extends string ? K : never]: ExtractNamespace<T[K]>
}
// Helper type to get all values from an object type
type ValueOf<T> = T[keyof T]
// Helper type to convert a union type to an intersection type
type UnionToIntersection<U> = (U extends any ? (k: U) => void : never) extends (k: infer I) => void
? I
: never
/**
* @category type ids
* @since 1.0.0
*/
export const TypeId: unique symbol = Symbol.for("@effect/sql-pglite/PgLiteClient")
/**
* @category type ids
* @since 1.0.0
*/
export type TypeId = typeof TypeId
/**
* @category models
* @since 1.0.0
*/
export interface PgLiteClient<Extensions = {}> extends Client.SqlClient {
readonly [TypeId]: TypeId
readonly config: PgLiteClientConfig<Extensions>
readonly json: (_: unknown) => Fragment
readonly array: (_: ReadonlyArray<Primitive>) => Fragment
readonly pg: PGlite & Extensions
}
/**
* @category tags
* @since 1.0.0
*/
export const PgLiteClient = Context.GenericTag<PgLiteClient<any>>("@effect/sql-pglite/PgLiteClient")
/**
* @category parsers
* @since 1.0.0
*/
export type ParserOptions = {
[pgType: number]: (value: string) => any
}
/**
* @category serializers
* @since 1.0.0
*/
export type SerializerOptions = {
[pgType: number]: (value: any) => string
}
/**
* @category filesystem
* @since 1.0.0
*/
export interface Filesystem {
readonly exists: (path: string) => Promise<boolean>
readonly readFile: (path: string) => Promise<Uint8Array>
readonly writeFile: (path: string, data: Uint8Array) => Promise<void>
readonly unlink: (path: string) => Promise<void>
readonly mkdir: (path: string) => Promise<void>
readonly readdir: (path: string) => Promise<string[]>
readonly rename: (oldPath: string, newPath: string) => Promise<void>
}
/**
* @category query options
* @since 1.0.0
*/
export interface QueryOptions {
readonly rowMode?: "object" | "array" | undefined
readonly parsers?: ParserOptions | undefined
readonly serializers?: SerializerOptions | undefined
readonly blob?: Blob | File | undefined
}
/**
* @category constructors
* @since 1.0.0
*/
export interface PgLiteClientConfig<Extensions = {}> {
/**
* Path to the directory for storing the Postgres database.
* You can provide a URI scheme for various storage backends:
* - `file://` or unprefixed: File system storage (Node and Bun)
* - `idb://`: IndexedDB storage (browser)
* - `memory://`: In-memory ephemeral storage (all platforms)
*/
readonly path?: string | undefined
/**
* The directory in which to store the Postgres database when not provided as path.
* Same URI schemes supported as with path.
*/
readonly dataDir?: string | undefined
/**
* The Postgres debug level (1-5). Logs are sent to the console.
*/
readonly debug?: number | undefined
/**
* Under relaxed durability mode, PGlite will not wait for flushes to storage
* to complete after each query before returning results.
* This is particularly useful when using the IndexedDB file system.
*/
readonly relaxedDurability?: boolean | undefined
/**
* Alternative to providing a dataDir with a filesystem prefix is to
* initialise a Filesystem yourself and provide it here.
*/
readonly fs?: Filesystem | undefined
/**
* A tarball of a PGlite datadir to load when the database starts.
* This should be a tarball produced from the related .dumpDataDir() method.
*/
readonly loadDataDir?: Blob | File | undefined
/**
* An object containing the extensions you wish to load.
* The type system will automatically extract the namespace types from these.
*/
readonly extensions?: Record<string, any> | undefined
/**
* The username of the user to connect to the database as.
* Permissions will be applied in the context of this user.
*/
readonly username?: string | undefined
/**
* The database from the Postgres cluster within the dataDir to connect to.
*/
readonly database?: string | undefined
/**
* The initial amount of memory in bytes to allocate for the PGlite instance.
* PGlite will grow the memory automatically, but if you have a particularly large
* database you can set this higher to prevent the pause during memory growth.
*/
readonly initialMemory?: number | undefined
/**
* A precompiled WASM module to use instead of downloading the default version,
* or when using a bundler that either can, or requires, loading the WASM module
* with a ESM import.
*/
readonly wasmModule?: WebAssembly.Module | undefined
/**
* A filesystem bundle to use instead of downloading the default version.
* This is useful if in a restricted environment such as an edge worker.
*/
readonly fsBundle?: Blob | File | undefined
/**
* An object mapping Postgres data type IDs to parser functions.
*/
readonly parsers?: ParserOptions | undefined
/**
* An object mapping Postgres data type IDs to serializer functions.
*/
readonly serializers?: SerializerOptions | undefined
/**
* Function to transform query result field names (e.g., from snake_case to camelCase)
*/
readonly transformResultNames?: ((str: string) => string) | undefined
/**
* Function to transform query field names (e.g., from camelCase to snake_case)
*/
readonly transformQueryNames?: ((str: string) => string) | undefined
/**
* Whether to transform JSON fields
*/
readonly transformJson?: boolean | undefined
/**
* Whether to fetch types from the database
*/
readonly fetchTypes?: boolean | undefined
/**
* Span attributes for OpenTelemetry
*/
readonly spanAttributes?: Record<string, string> | undefined
/**
* Pre-initialized PGlite instance to use instead of creating a new one
*/
readonly _pgLiteInstance?: (PGlite & Extensions) | undefined
}
/**
* @category constructors
* @since 1.0.0
*/
export const make = <E = {}>(
options: PgLiteClientConfig<E>
): Effect.Effect<PgLiteClient<E>, SqlError, Scope.Scope | Reactivity.Reactivity> => {
return Effect.gen(function* (_) {
const compiler = makeCompiler(options.transformQueryNames, options.transformJson)
const transformRows = options.transformResultNames
? Statement.defaultTransforms(options.transformResultNames, options.transformJson).array
: undefined
// Initialize PGlite with the provided instance or create a new one
let client: PGlite & E = yield* Effect.promise(() =>
GlobalValue.globalValue(Symbol.for("pgLiteInstance"), () => {
// if (options._pgLiteInstance) {
// return options._pgLiteInstance
// } else {
const createConfig: Record<string, any> = {}
// Handle path/dataDir
if (options.dataDir || options.path) {
createConfig.dataDir = options.dataDir || options.path
}
// Copy all other options
if (options.debug !== undefined) createConfig.debug = options.debug
if (options.relaxedDurability !== undefined)
createConfig.relaxedDurability = options.relaxedDurability
if (options.fs !== undefined) createConfig.fs = options.fs
if (options.loadDataDir !== undefined) createConfig.loadDataDir = options.loadDataDir
if (options.extensions !== undefined) createConfig.extensions = options.extensions
if (options.username !== undefined) createConfig.username = options.username
if (options.database !== undefined) createConfig.database = options.database
if (options.initialMemory !== undefined) createConfig.initialMemory = options.initialMemory
if (options.wasmModule !== undefined) createConfig.wasmModule = options.wasmModule
if (options.fsBundle !== undefined) createConfig.fsBundle = options.fsBundle
if (options.parsers !== undefined) createConfig.parsers = options.parsers
if (options.serializers !== undefined) createConfig.serializers = options.serializers
console.warn("[PgLite] Creating PGlite instance with config:", createConfig)
return PGlite.create(createConfig) as Promise<PGlite & E>
// Create a new PGlite instance with proper configuration
// client = yield* Effect.tryPromise({
// try: () => {
// // Use PGlite.create for better type support with extensions
// const createConfig: Record<string, any> = {}
// // Handle path/dataDir
// if (options.dataDir || options.path) {
// createConfig.dataDir = options.dataDir || options.path
// }
// // Copy all other options
// if (options.debug !== undefined) createConfig.debug = options.debug
// if (options.relaxedDurability !== undefined)
// createConfig.relaxedDurability = options.relaxedDurability
// if (options.fs !== undefined) createConfig.fs = options.fs
// if (options.loadDataDir !== undefined) createConfig.loadDataDir = options.loadDataDir
// if (options.extensions !== undefined) createConfig.extensions = options.extensions
// if (options.username !== undefined) createConfig.username = options.username
// if (options.database !== undefined) createConfig.database = options.database
// if (options.initialMemory !== undefined)
// createConfig.initialMemory = options.initialMemory
// if (options.wasmModule !== undefined) createConfig.wasmModule = options.wasmModule
// if (options.fsBundle !== undefined) createConfig.fsBundle = options.fsBundle
// if (options.parsers !== undefined) createConfig.parsers = options.parsers
// if (options.serializers !== undefined) createConfig.serializers = options.serializers
// console.warn("[PgLite] Creating PGlite instance with config:", createConfig)
// return PGlite.create(createConfig) as Promise<PGlite & E>
// },
// catch: (cause) => new SqlError({ cause, message: "PgLiteClient: Failed to initialize" })
// })
// }
})
)
// Wait for PGlite to be ready before proceeding
yield* Effect.tryPromise({
try: () => client.waitReady,
catch: (cause) => new SqlError({ cause, message: "PgLiteClient: Failed to initialize" })
}).pipe(
Effect.timeoutFail({
duration: Duration.seconds(5),
onTimeout: () =>
new SqlError({
cause: new Error("Await ready timed out"),
message: "PgLiteClient: Await ready timed out"
})
})
)
// Acquire the client instance for use with proper cleanup
yield* Effect.acquireRelease(
Effect.tryPromise({
try: () => client.query("select 1", []),
catch: (cause) => new SqlError({ cause, message: "PgLiteClient: Failed to connect" })
}),
() =>
Effect.promise(async () => {
console.log("[PgLite] PGlite Closing instance")
await client.close()
console.log("[PgLite] PGlite instance closed")
})
).pipe(
Effect.timeoutFail({
duration: Duration.seconds(5),
onTimeout: () =>
new SqlError({
cause: new Error("Connection timed out"),
message: "PgLiteClient: Connection timed out"
})
})
)
class ConnectionImpl implements Connection {
constructor(private readonly pg: PGlite) {}
private run(operation: () => Promise<Results>) {
return Effect.async<ReadonlyArray<any>, SqlError>((resume) => {
const promise = operation()
promise.then(
(result) => resume(Effect.succeed(result.rows)),
(cause) =>
resume(Effect.fail(new SqlError({ cause, message: `Failed to execute statement` })))
)
// PGlite doesn't support cancellation, no-op cleanup
return Effect.sync(() => {})
})
}
execute(
sql: string,
params: ReadonlyArray<Primitive>,
transformRows: (<A extends object>(row: ReadonlyArray<A>) => ReadonlyArray<A>) | undefined
) {
return transformRows
? Effect.map(
this.run(() => {
console.info(
`sql-pglite execute SQL: ${sql} with params: ${JSON.stringify(params)}`
)
return this.pg.query(sql, params as any[])
}),
transformRows
)
: this.run(() => {
console.info(`sql-pglite execute SQL: ${sql} with params: ${JSON.stringify(params)}`)
return this.pg.query(sql, params as any[])
})
}
executeRaw(sql: string, params: ReadonlyArray<Primitive>) {
console.info(`sql-pglite executeRaw SQL: ${sql} with params: ${JSON.stringify(params)}`)
return this.run(() => this.pg.query(sql, params as any[]))
}
executeWithoutTransform(sql: string, params: ReadonlyArray<Primitive>) {
console.info(
`sql-pglite executeWithoutTransform SQL: ${sql} with params: ${JSON.stringify(params)}`
)
return this.run(() => this.pg.query(sql, params as any[]))
}
executeValues(sql: string, params: ReadonlyArray<Primitive>) {
console.info(`sql-pglite executeValues SQL: ${sql} with params: ${JSON.stringify(params)}`)
return this.run(() => this.pg.query(sql, params as any[])) as Effect.Effect<
ReadonlyArray<ReadonlyArray<Primitive>>,
SqlError
>
}
executeUnprepared(
sql: string,
params: ReadonlyArray<Primitive>,
transformRows: (<A extends object>(row: ReadonlyArray<A>) => ReadonlyArray<A>) | undefined
) {
return this.execute(sql, params, transformRows)
}
executeStream(
sql: string,
params: ReadonlyArray<Primitive>,
transformRows: (<A extends object>(row: ReadonlyArray<A>) => ReadonlyArray<A>) | undefined
) {
return Stream.fromEffect(
Effect.map(this.execute(sql, params, transformRows), (rows) =>
Chunk.unsafeFromArray(transformRows ? transformRows(rows) : rows)
)
)
}
}
// The attributes that will be added to spans
const spanAttributes: Array<[string, string]> = []
// Add user-provided span attributes
if (options.spanAttributes) {
for (const [key, value] of Object.entries(options.spanAttributes)) {
spanAttributes.push([key, value])
}
}
// Add default span attributes
spanAttributes.push(["db.system", "postgresql"], ["db.name", "pglite"], ["db.type", "pglite"])
return Object.assign(
yield* Client.make({
acquirer: Effect.succeed(new ConnectionImpl(client)),
transactionAcquirer: Effect.map(
Effect.acquireRelease(
Effect.tryPromise({
try: async () => {
console.log("[PgLite] Transaction acquire started")
// Create a promise to get the transaction object
let transactionResolve: (tx: Transaction) => void
let transactionPromise = new Promise<Transaction>((resolve) => {
transactionResolve = resolve
})
// Create a promise to signal when to complete the transaction
let completeResolve: (result: unknown) => void
let completePromise = new Promise<unknown>((resolve) => {
completeResolve = resolve
})
console.log("[PgLite] Starting transaction with client.transaction")
// Start the transaction but don't await its completion
void client.transaction(async (tx) => {
console.log("[PgLite] Inside transaction callback, transaction started")
// Make transaction available outside the callback
transactionResolve(tx)
console.log("[PgLite] Transaction object resolved")
// Keep transaction open until explicitly completed
console.log("[PgLite] Waiting for transaction completion signal")
try {
const result = await completePromise
console.log(
"[PgLite] Transaction completion signal received, returning:",
result
)
return result
} catch (err) {
console.error("[PgLite] Error during transaction wait:", err)
throw err
}
})
console.log("[PgLite] Waiting for transaction object")
// Wait for transaction to be available
const tx = await transactionPromise
console.log("[PgLite] Transaction object received")
// Return both transaction and completion mechanism
return {
tx,
complete: (result: unknown) => {
console.log("[PgLite] Complete called with:", result)
completeResolve(result)
}
}
},
catch: (cause) => {
console.error("[PgLite] Failed to acquire transaction:", cause)
return new SqlError({ cause, message: "Failed to acquire transaction" })
}
}),
// On release: signal completion to commit the transaction
({ complete }) =>
Effect.sync(() => {
console.log("[PgLite] Transaction release triggered")
complete(undefined) // Resolve the promise to complete the transaction
console.log("[PgLite] Transaction release completed")
})
),
// Map the transaction wrapper to a proper ConnectionImpl
({ tx }) =>
new ConnectionImpl({
query: tx.query.bind(tx),
exec: tx.exec.bind(tx),
sql: tx.sql?.bind(tx)
// Other methods from Connection as needed
} as any)
),
compiler,
spanAttributes: [
...(options.spanAttributes ? Object.entries(options.spanAttributes) : []),
[Otel.SEMATTRS_DB_SYSTEM, Otel.DBSYSTEMVALUES_POSTGRESQL],
[Otel.SEMATTRS_DB_NAME, "postgres"]
],
transformRows
}),
{
[TypeId]: TypeId as TypeId,
config: {
...options
},
json: (_: unknown) => PgLiteJson(_),
array: (_: ReadonlyArray<Primitive>) => PgLiteArray(_),
listen: (channel: string) =>
Stream.asyncPush<string, SqlError>((emit) =>
Effect.acquireRelease(
Effect.tryPromise({
try: () => client.listen(channel, (payload) => emit.single(payload)),
catch: (cause) => new SqlError({ cause, message: "Failed to listen" })
}),
(unlisten) => Effect.promise(() => unlisten())
)
),
notify: (channel: string, payload: string) =>
Effect.tryPromise({
try: () => client.exec(`NOTIFY ${channel}, ${payload}`),
catch: (cause) => new SqlError({ cause, message: "Failed to notify" })
}),
pg: client // Expose the PGlite instance with extensions
}
)
})
}
/**
* @category layers
* @since 1.0.0
*/
export const layerConfig = <E = {}>(
config: Config.Config.Wrap<PgLiteClientConfig<E>>
): Layer.Layer<PgLiteClient<E> | Client.SqlClient, ConfigError | SqlError> =>
Layer.scopedContext(
Config.unwrap(config).pipe(
Effect.flatMap(make<E>),
Effect.map((client) =>
Context.make(PgLiteClient, client).pipe(Context.add(Client.SqlClient, client))
)
)
).pipe(Layer.provide(Reactivity.layer))
/**
* @category layers
* @since 1.0.0
*/
export const layer = <E = {}>(
config: PgLiteClientConfig<E>
): Layer.Layer<PgLiteClient<E> | Client.SqlClient, ConfigError | SqlError> =>
Layer.scopedContext(
Effect.map(make<E>(config), (client) =>
Context.make(PgLiteClient, client).pipe(Context.add(Client.SqlClient, client))
)
).pipe(Layer.provide(Reactivity.layer))
/**
* Helper type for creating extension-specific layers
* @category extensions
* @since 1.0.0
*/
export type ExtensionConfig<E> = Omit<PgLiteClientConfig<E>, "extensions"> & {
readonly extensions?: Record<string, any>
}
/**
* Helper function to create a layer with specific extensions and a named tag
* This creates a layer that provides a PgLiteClient with properly typed extensions,
* along with a tag that can be used to access it.
*
* Example:
* ```ts
* const db = PgLiteClient.layerWithExtensionsTagged(
* "AppDatabase",
* { dataDir: "idb://my-app-db" },
* {
* live: liveExtension,
* electric: electricExtension
* }
* )
*
* // Use the client with typed extensions
* const program = Effect.gen(function* (_) {
* const client = yield* _(db.tag)
* const result = yield* _(client.pg.live.query("SELECT * FROM users"))
* yield* _(client.pg.electric.syncShapesToTables())
* return result
* })
*
* // Provide the layer
* Effect.provide(program, db.layer)
* ```
*
* @param tag - A unique name for this database service
* @param config - The configuration for the PgLiteClient
* @param extensions - The extensions to include in the client
* @category extensions
* @since 1.0.0
*/
export const layerWithExtensionsTagged = <T extends Record<string, any>, Tag extends string>(
tag: Tag,
config: Omit<PgLiteClientConfig<ExtensionsToNamespaces<T>>, "extensions">,
extensions: T
): {
layer: Layer.Layer<
PgLiteClient<ExtensionsToNamespaces<T>> | Client.SqlClient,
ConfigError | SqlError
>
tag: Context.Tag<Tag, PgLiteClient<ExtensionsToNamespaces<T>>>
} => {
return {
tag: Context.Tag(tag)<Tag, PgLiteClient<ExtensionsToNamespaces<T>>>(),
layer: layer<ExtensionsToNamespaces<T>>({
...config,
extensions: {
...extensions
}
} as PgLiteClientConfig<ExtensionsToNamespaces<T>>)
}
}
/**
* @category constructor
* @since 1.0.0
*/
export const makeCompiler = (
transform?: (_: string) => string,
transformJson = true
): Statement.Compiler => {
const transformValue =
transformJson && transform ? Statement.defaultTransforms(transform).value : undefined
return Statement.makeCompiler<PgLiteCustom>({
dialect: "pg", // PGlite uses PostgreSQL dialect
placeholder(_) {
return `$${_}`
},
onIdentifier: transform
? function (value, withoutTransform) {
return withoutTransform ? escape(value) : escape(transform(value))
}
: escape,
onRecordUpdate(placeholders, valueAlias, valueColumns, values, returning) {
return [
`(values ${placeholders}) AS ${valueAlias}${valueColumns}${returning ? ` RETURNING ${returning[0]}` : ""}`,
returning ? values.flat().concat(returning[1]) : values.flat()
]
},
onCustom(type, placeholder, withoutTransform) {
switch (type.kind) {
case "PgLiteJson": {
return [
placeholder(undefined),
[
types.types.json.serialize(
withoutTransform || transformValue === undefined ? type.i0 : transformValue(type.i0)
) as any
]
]
}
case "PgLiteArray": {
const param = type.i0 // types.arraySerializer(type.i0 as any, undefined, types.TEXT) as any
// const first = type.i0[0]
// switch (typeof first) {
// case "boolean": {
// param.type = types.BOOL
// break
// }
// case "number": {
// param.type = types.NUMERIC
// break
// }
// default: {
// param.type = types.TEXT
// break
// }
// }
return [placeholder(undefined), [param]]
}
}
}
})
}
const escape = Statement.defaultEscape('"')
/**
* @category custom types
* @since 1.0.0
*/
export type PgLiteCustom = PgLiteJson | PgLiteArray
/**
* @category custom types
* @since 1.0.0
*/
interface PgLiteJson extends Custom<"PgLiteJson", unknown> {}
/**
* @category custom types
* @since 1.0.0
*/
const PgLiteJson = Statement.custom<PgLiteJson>("PgLiteJson")
/**
* @category custom types
* @since 1.0.0
*/
interface PgLiteArray extends Custom<"PgLiteArray", ReadonlyArray<Primitive>> {}
/**
* @category custom types
* @since 1.0.0
*/
const PgLiteArray = Statement.custom<PgLiteArray>("PgLiteArray")
/**
* @category extensions
* @since 1.0.0
*/
export const createPgLiteTag = <T extends Record<string, any>, Tag extends string>(
id: Tag
): Context.Tag<Tag, PgLiteClient<ExtensionsToNamespaces<T>>> => {
// Use the Context.GenericTag constructor
return Context.Tag(id)<Tag, PgLiteClient<ExtensionsToNamespaces<T>>>()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment