Created
June 7, 2023 22:18
-
-
Save zgotsch/e10506a0a6b4c9fcfe043f729b4ed500 to your computer and use it in GitHub Desktop.
A small versioned graph structure on top of Postgres
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import {sql} from "@vercel/postgres"; | |
import assertNever from "./assertNever"; | |
import invariant from "small-invariant"; | |
type Identified = {id: unknown}; | |
type IdentifierOf<T extends {id: unknown}> = T["id"]; | |
type Node<T extends Identified> = { | |
id: IdentifierOf<T>; | |
version: number; | |
is_latest: boolean; | |
data: T; | |
}; | |
type Edge<T extends Identified> = { | |
source_id: unknown; | |
target_id: unknown; | |
id: IdentifierOf<T>; | |
data: T; | |
version: number; | |
is_latest: boolean; | |
}; | |
export async function createNode<T extends Identified>(data: T): Promise<IdentifierOf<T>> { | |
const result = await sql`INSERT INTO nodes (data) VALUES (${JSON.stringify(data)})`; | |
return data.id; | |
} | |
export async function getNode<T extends Identified>(id: IdentifierOf<T>): Promise<null | Node<T>> { | |
const result = await sql`SELECT * FROM nodes WHERE id = ${JSON.stringify( | |
id | |
)} AND is_latest = TRUE`; | |
const [firstRow] = result.rows; | |
if (!firstRow) { | |
return null; | |
} | |
return firstRow as Node<T>; | |
} | |
export async function updateNode<T extends Identified, U extends Identified>( | |
id: IdentifierOf<T>, | |
updater: (oldValue: null | T) => U | |
): Promise<void> { | |
const client = await sql.connect(); | |
try { | |
await client.sql`BEGIN`; | |
let nextVersion; | |
const oldValue = await getNode(id); | |
if (!oldValue) { | |
await createNode(updater(null)); | |
} else { | |
nextVersion = oldValue.version + 1; | |
await client.sql`UPDATE nodes SET is_latest = FALSE WHERE id = ${JSON.stringify(id)}`; | |
await client.sql`INSERT INTO nodes (id, version, is_latest, data) VALUES (${JSON.stringify( | |
id | |
)}, ${nextVersion}, TRUE, ${JSON.stringify(updater(oldValue.data))})`; | |
} | |
await client.sql`COMMIT`; | |
} catch (e) { | |
await client.sql`ROLLBACK`; | |
throw e; | |
} finally { | |
await client.release(); | |
} | |
} | |
export async function deleteNode<T extends Identified>(id: IdentifierOf<T>): Promise<void> { | |
await sql`UPDATE nodes SET is_latest = FALSE WHERE id = ${JSON.stringify(id)}`; | |
} | |
export async function createEdge<T>(sourceId: unknown, targetId: unknown, data: T): Promise<void> { | |
await sql`INSERT INTO edges (source_id, target_id, data) VALUES (${JSON.stringify( | |
sourceId | |
)}, ${JSON.stringify(targetId)}, ${JSON.stringify(data)})`; | |
} | |
export async function getOutgoingEdges<T extends Identified>( | |
sourceId: unknown | |
): Promise<Array<Edge<T>>> { | |
const result = await sql`SELECT * FROM edges WHERE source_id = ${JSON.stringify(sourceId)}`; | |
return result.rows as Array<Edge<T>>; | |
} | |
export async function getIncomingEdges<T extends Identified>( | |
targetId: unknown | |
): Promise<Array<Edge<T>>> { | |
const result = await sql`SELECT * FROM edges WHERE target_id = ${JSON.stringify(targetId)}`; | |
return result.rows as Array<Edge<T>>; | |
} | |
export async function getEdges<T extends Identified>( | |
nodeId: unknown | |
): Promise<{incoming: Array<Edge<T>>; outgoing: Array<Edge<T>>}> { | |
const client = await sql.connect(); | |
try { | |
await client.sql`BEGIN`; | |
const outgoingResult = await client.sql`SELECT * FROM edges WHERE source_id = ${JSON.stringify( | |
nodeId | |
)} AND is_latest = TRUE`; | |
const incomingResult = await client.sql`SELECT * FROM edges WHERE target_id = ${JSON.stringify( | |
nodeId | |
)} AND is_latest = TRUE`; | |
await client.sql`COMMIT`; | |
return { | |
incoming: incomingResult.rows as Array<Edge<T>>, | |
outgoing: outgoingResult.rows as Array<Edge<T>>, | |
}; | |
} catch (e) { | |
await client.sql`ROLLBACK`; | |
throw e; | |
} finally { | |
await client.release(); | |
} | |
} | |
type UpdateResult<T> = | |
| { | |
operation: "update"; | |
value: T; | |
} | |
| { | |
operation: "delete"; | |
} | |
| { | |
operation: "ignore"; | |
}; | |
export async function updateEdges<T extends Identified, U extends Identified>( | |
sourceId: unknown, | |
targetId: unknown, | |
updater: (oldEdge: T) => UpdateResult<U> | |
): Promise<void> { | |
const client = await sql.connect(); | |
try { | |
await client.sql`BEGIN`; | |
const oldEdges = await client.sql`SELECT * FROM edges WHERE source_id = ${JSON.stringify( | |
sourceId | |
)} AND target_id = ${JSON.stringify(targetId)} AND is_latest = TRUE`; | |
const edgesToDelete: Array<Edge<T>> = []; | |
const edgesToUpdate: Array<{oldEdge: Edge<T>; newData: U}> = []; | |
for (const _oldEdge of oldEdges.rows) { | |
const oldEdge = _oldEdge as Edge<T>; | |
const result = updater(oldEdge.data); | |
if (result.operation === "ignore") { | |
// do nothing | |
} else if (result.operation === "delete") { | |
edgesToDelete.push(oldEdge); | |
} else if (result.operation === "update") { | |
const newData = result.value; | |
edgesToUpdate.push({oldEdge, newData}); | |
} else { | |
assertNever(result); | |
} | |
} | |
for (const edge of edgesToDelete) { | |
await client.sql`UPDATE edges SET is_latest = FALSE WHERE source_id = ${JSON.stringify( | |
edge.source_id | |
)} AND target_id = ${JSON.stringify(edge.target_id)} AND version = ${ | |
edge.version | |
} AND id = ${JSON.stringify(edge.id)}`; | |
} | |
for (const {oldEdge, newData} of edgesToUpdate) { | |
invariant( | |
oldEdge.id === newData.id, | |
"The id of the new data must match the id of the old edge" | |
); | |
const newEdge: Edge<typeof newData> = { | |
...oldEdge, | |
version: oldEdge.version + 1, | |
data: newData, | |
}; | |
await client.sql`UPDATE edges SET is_latest = FALSE WHERE source_id = ${JSON.stringify( | |
oldEdge.source_id | |
)} AND target_id = ${JSON.stringify(oldEdge.target_id)} AND version = ${ | |
oldEdge.version | |
} AND id = ${JSON.stringify(oldEdge.id)}`; | |
await client.sql`INSERT INTO edges (id, source_id, target_id, version, is_latest, data) VALUES (${JSON.stringify( | |
newEdge.id | |
)}, ${JSON.stringify(newEdge.source_id)}, ${JSON.stringify(newEdge.target_id)}, ${ | |
newEdge.version | |
}, TRUE, ${JSON.stringify(newEdge.data)})`; | |
} | |
await client.sql`COMMIT`; | |
} catch (e) { | |
await client.sql`ROLLBACK`; | |
throw e; | |
} finally { | |
await client.release(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
CREATE TABLE IF NOT EXISTS | |
nodes ( | |
data JSONB, | |
id TEXT GENERATED ALWAYS AS (data -> 'id') STORED NOT NULL, | |
version INTEGER NOT NULL DEFAULT 0, | |
is_latest BOOLEAN NOT NULL DEFAULT TRUE, | |
UNIQUE (id, version), | |
UNIQUE (id, is_latest) | |
); | |
CREATE INDEX IF NOT EXISTS latest_version_idx ON nodes (id, is_latest DESC); | |
CREATE TABLE IF NOT EXISTS | |
edges ( | |
source_id TEXT, | |
target_id TEXT, | |
data JSONB, | |
id TEXT GENERATED ALWAYS AS (data -> 'id') STORED NOT NULL, | |
version INTEGER NOT NULL DEFAULT 0, | |
is_latest BOOLEAN NOT NULL DEFAULT TRUE, | |
UNIQUE (source_id, target_id, id, version), | |
FOREIGN KEY (source_id) REFERENCES nodes (id), | |
FOREIGN KEY (target_id) REFERENCES nodes (id) | |
); | |
CREATE INDEX IF NOT EXISTS source_id_idx ON edges (source_id); | |
CREATE INDEX IF NOT EXISTS target_id_idx ON edges (target_id); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment