Created
June 3, 2025 14:59
-
-
Save Yarith/486ed7661e3cc8d8bf47119ff7b1d66a to your computer and use it in GitHub Desktop.
Connection between PGlite and postgres.js without TCP Server
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { PGlite } from "@electric-sql/pglite"; | |
import postgres from "postgres"; | |
import { createPGliteSocket } from "./pglite-socket.mjs"; | |
// Demo: run a postgres.js query against an in-process PGlite database,
// using the custom socket adapter instead of a TCP connection.
const db = new PGlite();

/** @type {postgres.Options<{}> & {socket?: (options: unknown) => unknown}} */
const options = {
  // One connection only: every socket created below wraps the same PGlite instance.
  max: 1,
  socket: () => createPGliteSocket(db),
};
const sql = postgres(options);
try {
  const description = await sql`select 1`.describe();
  // {"string":"select 1","types":[],"name":"kawf0mwrpjn2",
  // "columns":[{"name":"?column?","table":0,"number":0,"type":23}]}
  console.log(JSON.stringify(description));
} finally {
  // end() is asynchronous; await it so shutdown completes (and any failure
  // surfaces) instead of leaving a floating promise behind.
  await sql.end();
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import EventEmitter from 'node:events'; | |
/** | |
* @typedef {import('node:net').Socket} Socket | |
* @typedef {import('@electric-sql/pglite').PGlite} PGlite | |
* @typedef {Socket['pause']} SocketPause | |
* @typedef {Socket['resume']} SocketResume | |
* @typedef {Socket['write']} SocketWrite | |
* @typedef {Socket['end']} SocketEnd | |
* @typedef {Socket['destroy']} SocketDestroy | |
*/ | |
/**
 * Creates a socket-like object for postgres.js that talks directly to a PGlite
 * database instead of going through a TCP connection.
 *
 * Every chunk passed to `write()` is handed to `db.execProtocolRaw()`; the bytes
 * it resolves with are delivered to `'data'` listeners as a `Buffer`.
 *
 * Example usage:
 * ```js
 * import { PGlite } from "@electric-sql/pglite";
 * import postgres from "postgres";
 * import { createPGliteSocket } from "./pglite-socket.mjs";
 *
 * const db = new PGlite();
 * const options = {
 *   max: 1,
 *   socket: () => createPGliteSocket(db),
 * };
 * const sql = postgres(options);
 * try {
 *   const description = await sql`select 1`.describe();
 *   // {"string":"select 1","types":[],"name":"kawf0mwrpjn2",
 *   // "columns":[{"name":"?column?","table":0,"number":0,"type":23}]}
 *   console.log(JSON.stringify(description));
 * } finally {
 *   await sql.end();
 * }
 * ```
 * @param {PGlite} db Database whose `execProtocolRaw` receives every written chunk.
 * @returns {Socket} An object implementing the subset of the Socket interface
 *   provided here: `on`, `once`, `removeListener`, `removeAllListeners`, `write`,
 *   `pause`, `resume`, `end`, `destroy` and a read-only `readyState`.
 */
export function createPGliteSocket(db) {
  /** @type {Socket['readyState']} */
  let readyState = "open";

  /**
   * Responses that could not be delivered yet because the socket is paused.
   * @type {Uint8Array[]}
   */
  const readBuffer = [];

  /** @typedef {{promise: Promise<void>, resolve: () => void}} CurrentPause */
  /**
   * Non-null while the socket is paused; the promise settles on `resume()`.
   * @type {CurrentPause | null}
   */
  let currentPause = null;

  const eventEmitter = new EventEmitter();

  /**
   * Converts a write payload to bytes.
   * @param {Uint8Array | string} data
   * @param {unknown} encoding Used only when it is a string; otherwise 'utf8'.
   * @returns {Uint8Array}
   */
  function toBytes(data, encoding) {
    if (typeof data !== "string") {
      return data;
    }
    const charset = /** @type {BufferEncoding} */ (
      typeof encoding === "string" ? encoding : "utf8"
    );
    return Buffer.from(data, charset);
  }

  /**
   * Queues `result` (if given) behind any buffered chunks and emits chunks in
   * order as 'data' events until the buffer is empty or the socket is paused.
   * The pause state is re-checked before every chunk because a 'data' listener
   * may call `pause()`.
   * @param {Uint8Array | undefined} result
   */
  function emitPendingData(result = undefined) {
    if (result) {
      readBuffer.push(result);
    }
    while (!currentPause && readBuffer.length > 0) {
      const item = /** @type {Uint8Array} */ (readBuffer.shift());
      eventEmitter.emit('data', Buffer.from(item));
    }
  }

  /** Resolves once every buffered chunk has been emitted, waiting out pauses. */
  async function emitAllPendingData() {
    while (readBuffer.length > 0) {
      while (currentPause) {
        await currentPause.promise;
      }
      emitPendingData();
    }
  }

  /**
   * Sends one chunk to the database and publishes the outcome.
   *
   * Success: emits 'drain', calls `cb()`, then emits (or buffers) the response.
   * Failure: calls `cb(error)` and emits 'error' when somebody listens for it.
   *
   * The callback is invoked directly rather than as a 'drain' listener, so a
   * write finishing early cannot fire the callbacks of writes still in flight.
   *
   * @param {Uint8Array} data
   * @param {((err?: Error | null) => void) | undefined} cb
   * @returns {Promise<void>} Settles after the response was emitted or buffered.
   */
  async function processWrite(data, cb = undefined) {
    /** @type {Uint8Array} */
    let response;
    try {
      response = await db.execProtocolRaw(data);
    } catch (err) {
      const error = err instanceof Error ? err : new Error(`${err}`);
      if (cb) {
        cb(error);
      }
      // EventEmitter throws on an unhandled 'error' event, so only emit it
      // when a listener is registered.
      if (eventEmitter.listenerCount('error') > 0) {
        eventEmitter.emit('error', error);
      }
      return;
    }
    eventEmitter.emit('drain');
    if (cb) {
      cb();
    }
    emitPendingData(response);
  }

  /** @typedef {Uint8Array | string} SocketWriteData */
  /** @typedef {(err?: Error | null) => void} SocketWriteCb */
  /** @typedef {BufferEncoding} SocketWriteEncoding */
  /**
   * Accepts `write(data, cb)` and `write(data, encoding, cb)`.
   * Always returns false; 'drain' is emitted once the database has processed
   * the chunk.
   * @type {SocketWrite}
   */
  const write = function (
    /** @type {SocketWriteData} */ data,
    /** @type {SocketWriteEncoding | SocketWriteCb | undefined} */ encoding = 'utf8',
    /** @type {SocketWriteCb | undefined} */ cb = undefined) {
    const onWritten = typeof encoding === "function" ? encoding : cb;
    // Fire and forget: the outcome is reported through the callback and events.
    void processWrite(toBytes(data, encoding), onWritten);
    return false;
  };

  /**
   * Stops 'data' delivery; responses are buffered until `resume()`.
   * @type {SocketPause}
   */
  const pause = function () {
    if (!currentPause) {
      /** @type {() => void} */
      let resolve = () => { };
      const promise = /** @type {Promise<void>} */ (
        new Promise((res) => {
          resolve = res;
        })
      );
      currentPause = { promise, resolve };
    }
    return getInstance();
  };

  /**
   * Ends a pause and delivers everything buffered meanwhile.
   * @type {SocketResume}
   */
  const resume = function () {
    if (currentPause) {
      const { resolve } = currentPause;
      currentPause = null;
      resolve();
      // Flush in a microtask so a 'data' listener calling resume() is not
      // re-entered with the next chunk while still handling the current one.
      queueMicrotask(() => emitPendingData());
    }
    return getInstance();
  };

  /** Marks the socket closed and emits 'close' exactly once. */
  function close() {
    if (readyState === "open") {
      readyState = "closed";
      eventEmitter.emit('close');
    }
  }

  /** @typedef {Uint8Array | string} SocketEndData */
  /** @typedef {BufferEncoding} SocketEndEncoding */
  /** @typedef {() => void} SocketEndCallback */
  /**
   * Accepts `end()`, `end(cb)`, `end(data)`, `end(data, cb)` and
   * `end(data, encoding, cb)`. Optional final data is written first; then all
   * buffered responses are delivered, the callback runs and 'close' is emitted.
   * @type {SocketEnd}
   */
  const end = function (
    /** @type {SocketEndData | SocketEndCallback | undefined} */ data,
    /** @type {SocketEndEncoding | SocketEndCallback | undefined} */ encoding = undefined,
    /** @type {SocketEndCallback | undefined} */ callback = undefined) {
    let payload = data;
    let onEnded = callback;
    if (typeof data === "function") {
      onEnded = data;
      payload = undefined;
    } else if (typeof encoding === "function") {
      onEnded = encoding;
    }

    async function flushAndClose() {
      await emitAllPendingData();
      if (onEnded) {
        onEnded();
      }
      close();
    }

    if (payload === undefined || payload === null) {
      // No final payload: still flush and close.
      void flushAndClose();
    } else {
      // Chain on the write itself (not on its callback) so the response to the
      // final payload is already emitted or buffered before flushing.
      void processWrite(toBytes(/** @type {SocketEndData} */ (payload), encoding))
        .then(flushAndClose);
    }
    return getInstance();
  };

  /**
   * Closes immediately without delivering buffered responses.
   * @type {SocketDestroy}
   */
  const destroy = function () {
    close();
    return getInstance();
  };

  function getInstance() {
    return /** @type {Socket} */ (/** @type {unknown} */ (socket));
  }

  const socket = {
    on: eventEmitter.on.bind(eventEmitter),
    once: eventEmitter.once.bind(eventEmitter),
    removeListener: eventEmitter.removeListener.bind(eventEmitter),
    removeAllListeners: eventEmitter.removeAllListeners.bind(eventEmitter),
    write,
    pause,
    resume,
    end,
    destroy,
  };
  Object.defineProperty(socket, "readyState", {
    get: () => readyState,
  });
  return getInstance();
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment