Skip to content

Instantly share code, notes, and snippets.

@Yarith
Created June 3, 2025 14:59
Show Gist options
  • Save Yarith/486ed7661e3cc8d8bf47119ff7b1d66a to your computer and use it in GitHub Desktop.
Save Yarith/486ed7661e3cc8d8bf47119ff7b1d66a to your computer and use it in GitHub Desktop.
Connection between PGlite and postgres.js without TCP Server
import { PGlite } from "@electric-sql/pglite";
import postgres from "postgres";
import { createPGliteSocket } from "./pglite-socket.mjs";
// Demo script: runs a query against an in-process PGlite database through
// postgres.js, using a custom socket instead of a TCP connection.
const db = new PGlite();

/** @type {postgres.Options<{}> & {socket?: (options: unknown) => unknown}} */
const options = {
  // NOTE(review): presumably limited to one connection because every socket
  // created below wraps the same PGlite instance — confirm.
  max: 1,
  // Hand postgres.js a socket-like object backed directly by `db`.
  socket: () => createPGliteSocket(db),
};
const sql = postgres(options);

try {
  const description = await sql`select 1`.describe();
  // {"string":"select 1","types":[],"name":"kawf0mwrpjn2",
  // "columns":[{"name":"?column?","table":0,"number":0,"type":23}]}
  console.log(JSON.stringify(description));
} finally {
  // Await the shutdown so a failure while closing surfaces here instead of
  // becoming a floating promise / unhandled rejection.
  await sql.end();
}
import EventEmitter from 'node:events';
/**
* @typedef {import('node:net').Socket} Socket
* @typedef {import('@electric-sql/pglite').PGlite} PGlite
* @typedef {Socket['pause']} SocketPause
* @typedef {Socket['resume']} SocketResume
* @typedef {Socket['write']} SocketWrite
* @typedef {Socket['end']} SocketEnd
* @typedef {Socket['destroy']} SocketDestroy
*/
/**
 * Creates a socket-like object for postgres.js that forwards every write
 * straight to a PGlite database instead of going through a TCP connection.
 *
 * Each chunk passed to `write()` is handed to `db.execProtocolRaw()`; the bytes
 * it resolves with are emitted as a `'data'` event. While the socket is paused,
 * responses are queued and delivered in order once it is resumed.
 *
 * Example usage:
 * ```js
 * import { PGlite } from "@electric-sql/pglite";
 * import postgres from "postgres";
 * import { createPGliteSocket } from "./pglite-socket.mjs";
 *
 * const db = new PGlite();
 * const options = {
 *   max: 1,
 *   socket: () => createPGliteSocket(db),
 * };
 * const sql = postgres(options);
 * try {
 *   const description = await sql`select 1`.describe();
 *   // {"string":"select 1","types":[],"name":"kawf0mwrpjn2",
 *   // "columns":[{"name":"?column?","table":0,"number":0,"type":23}]}
 *   console.log(JSON.stringify(description));
 * } finally {
 *   await sql.end();
 * }
 * ```
 *
 * @param {PGlite} db Database whose `execProtocolRaw` receives the written bytes.
 * @returns {Socket} An object exposing the subset of the Socket interface
 * implemented here: `on`, `once`, `removeListener`, `removeAllListeners`,
 * `write`, `pause`, `resume`, `end`, `destroy` and a read-only `readyState`.
 */
export function createPGliteSocket(db) {
  /** @type {Socket['readyState']} */
  let readyState = "open";

  /**
   * Responses that could not be delivered yet because the socket is paused.
   * @type {Buffer[]}
   */
  const readBuffer = [];

  /** @typedef {{promise: Promise<void>, resolve: () => void}} CurrentPause */
  /**
   * Non-null while paused; the promise settles when `resume()` is called.
   * @type {CurrentPause | null}
   */
  let currentPause = null;

  const eventEmitter = new EventEmitter();

  /**
   * Queues `result` (if any) and emits queued chunks as `'data'` events, in
   * arrival order, until the queue is empty or a listener pauses the socket.
   *
   * @param {Uint8Array | undefined} result Newly received response bytes.
   */
  function emitPendingData(
    /** @type {Uint8Array | undefined} */ result = undefined
  ) {
    if (result) {
      // Copy on arrival rather than on delivery, so a chunk that waits in the
      // queue cannot be affected if the producer reuses the underlying memory.
      readBuffer.push(Buffer.from(result));
    }
    while (!currentPause && readBuffer.length > 0) {
      eventEmitter.emit('data', readBuffer.shift());
    }
  }

  /**
   * Resolves once every queued chunk has been emitted, waiting out any pause
   * that is active or that a listener starts in the meantime.
   */
  async function emitAllPendingData() {
    while (readBuffer.length > 0) {
      while (currentPause) {
        await currentPause.promise;
      }
      emitPendingData();
    }
  }

  /**
   * Sends one chunk to the database, then signals `'drain'`, invokes this
   * write's own callback and finally delivers the response.
   *
   * A failure of `execProtocolRaw` is reported as the argument of the
   * `'drain'` event and of the callback; no response is emitted in that case.
   *
   * @param {Uint8Array} data Bytes to execute.
   * @param {((err?: Error | null) => void) | undefined} cb Completion callback.
   */
  async function processWrite(
    /** @type {Uint8Array} */ data,
    /** @type {((err?: Error | null) => void) | undefined} */ cb) {
    /** @type {Uint8Array} */
    let response;
    try {
      response = await db.execProtocolRaw(data);
    } catch (err) {
      const error = err instanceof Error ? err : new Error(`${err}`);
      eventEmitter.emit('drain', error);
      if (cb) {
        cb(error);
      }
      return;
    }
    // Listeners run outside the try block on purpose: an exception thrown by a
    // listener must not be mistaken for a database failure.
    eventEmitter.emit('drain');
    // The callback is called directly instead of being registered as a
    // one-shot 'drain' listener; otherwise the first 'drain' would also fire
    // the callbacks of writes that are still in flight.
    if (cb) {
      cb();
    }
    emitPendingData(response);
  }

  /** @typedef {Uint8Array | string} SocketWriteData */
  /** @typedef {(err?: Error | null) => void} SocketWriteCb */
  /** @typedef {BufferEncoding} SocketWriteEncoding */
  /**
   * Executes `data` against the database. Accepts `(data, cb)` as well as
   * `(data, encoding, cb)`; anything that is not a function is ignored as a
   * callback. Always returns `false`; a `'drain'` event follows once the
   * database call has settled.
   *
   * @type {SocketWrite}
   */
  const write = function (
    /** @type {SocketWriteData} */ data,
    /** @type {SocketWriteEncoding | SocketWriteCb | undefined} */ encoding = 'utf8',
    /** @type {SocketWriteCb | undefined} */ cb = undefined) {
    let callback;
    if (typeof encoding === "function") {
      callback = encoding;
    } else if (typeof cb === "function") {
      callback = cb;
    }
    const bytes = typeof data === "string"
      ? Buffer.from(data, typeof encoding === "string" ? encoding : "utf8")
      : data;
    // Fire and forget: completion is reported through 'drain' and the callback.
    void processWrite(bytes, callback);
    return false;
  };

  /**
   * Stops `'data'` delivery; responses arriving from now on are queued.
   * @type {SocketPause}
   */
  const pause = function () {
    if (!currentPause) {
      /** @type {() => void} */
      let resolve = () => { };
      const promise = /** @type {Promise<void>} */(
        new Promise((res) => { resolve = res; }));
      currentPause = { promise, resolve };
    }
    return getInstance();
  };

  /**
   * Ends a pause and delivers whatever was queued in the meantime.
   * @type {SocketResume}
   */
  const resume = function () {
    if (currentPause) {
      const { resolve } = currentPause;
      currentPause = null;
      resolve();
      // Flush after the current call stack so listeners are not re-entered
      // from inside resume(); without this the queued chunks would stay stuck
      // until some later write happened to complete.
      queueMicrotask(() => emitPendingData());
    }
    return getInstance();
  };

  /** Marks the socket closed and emits `'close'` exactly once. */
  function close() {
    if (readyState === "open") {
      readyState = "closed";
      eventEmitter.emit('close');
    }
  }

  /** @typedef {Uint8Array | string} SocketEndData */
  /** @typedef {BufferEncoding} SocketEndEncoding */
  /** @typedef {() => void} SocketEndCallback */
  /**
   * Optionally writes a final chunk, waits until all queued responses have
   * been emitted, calls the callback and closes the socket.
   *
   * Supported call shapes: `end()`, `end(cb)`, `end(data)`, `end(data, cb)`,
   * `end(data, encoding)` and `end(data, encoding, cb)`.
   *
   * @type {SocketEnd}
   */
  const end = function (
    /** @type {SocketEndData | SocketEndCallback | undefined} */ data,
    /** @type {SocketEndEncoding | SocketEndCallback | undefined} */ encoding = undefined,
    /** @type {SocketEndCallback | undefined} */ callback = undefined) {
    // Normalise the overloads first so every call shape ends up closing the
    // socket, including end() and end("text") without encoding or callback.
    let payload = data;
    let payloadEncoding = encoding;
    let onEnded = callback;
    if (typeof payload === "function") {
      onEnded = payload;
      payload = undefined;
      payloadEncoding = undefined;
    } else if (typeof payloadEncoding === "function") {
      onEnded = payloadEncoding;
      payloadEncoding = undefined;
    }

    function finishAfterFlush() {
      void emitAllPendingData().then(() => {
        if (typeof onEnded === "function") {
          onEnded();
        }
        close();
      });
    }

    if (typeof payload === "string" || payload) {
      if (typeof payloadEncoding === "string") {
        write(payload, payloadEncoding, finishAfterFlush);
      } else {
        write(payload, finishAfterFlush);
      }
    } else {
      finishAfterFlush();
    }
    return getInstance();
  };

  /**
   * Closes the socket immediately, without flushing queued responses.
   * @type {SocketDestroy}
   */
  const destroy = function () {
    close();
    return getInstance();
  };

  /** Returns the socket facade typed as a `Socket`. */
  function getInstance() {
    return /** @type {Socket} */(result);
  }

  const result = {
    on: eventEmitter.on.bind(eventEmitter),
    once: eventEmitter.once.bind(eventEmitter),
    removeListener: eventEmitter.removeListener.bind(eventEmitter),
    removeAllListeners: eventEmitter.removeAllListeners.bind(eventEmitter),
    write,
    pause,
    resume,
    end,
    destroy,
  };
  // Read-only view of the internal state.
  Object.defineProperty(result, "readyState", {
    get: () => readyState,
  });
  return getInstance();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment