Last active
November 14, 2024 00:41
-
-
Save cmdruid/43159fd102849147132875fee15db7ec to your computer and use it in GitHub Desktop.
Shameless copy of https://github.com/coracle-social/bucket, updated to ESM with types, validation and minimal dependencies. (<300 LOC)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { z } from 'zod' | |
import { schnorr } from '@noble/curves/secp256k1' | |
import { sha256 } from '@noble/hashes/sha256' | |
import { WebSocket, WebSocketServer } from 'ws' | |
/* ================ [ Configuration ] ================ */

/** Port that the websocket server listens on. */
const SERVER_PORT = 8002

/** Number of seconds between purges of the in-memory event cache. */
const PURGE_IVAL = 600

/** Debug logging is enabled only when the DEBUG environment variable is exactly 'true'. */
const DEBUG = process.env['DEBUG'] === 'true'

/** Verbose logging is enabled by VERBOSE='true', and is always implied by DEBUG. */
const VERBOSE = DEBUG || process.env['VERBOSE'] === 'true'
/* ================ [ Interfaces ] ================ */

/**
 * Criteria a client supplies when requesting events.
 * Every named field is optional; any additional key (for example
 * a '#'-prefixed tag filter) is accepted through the index signature.
 */
interface EventFilter {
  ids?: string[]
  authors?: string[]
  kinds?: number[]
  since?: number
  until?: number
  limit?: number
  [key: string]: any
}

/** A complete event, including its id and signature. */
interface SignedEvent {
  content: string
  created_at: number
  id: string
  kind: number
  pubkey: string
  sig: string
  tags: string[][]
}

/** A registered subscription: the owning connection plus its filters. */
interface Subscription {
  instance: Instance
  filters: EventFilter[]
}
/* ================ [ Schema ] ================ */

// Primitive validators, each building on the previous ones.
const num = z.number().max(Number.MAX_SAFE_INTEGER)
const str = z.string()
// Timestamps must be at least 500,000,000.
const stamp = num.min(500_000_000)
// Hex strings must contain only hex digits and have an even length.
const hex = str.regex(/^[0-9a-fA-F]*$/).refine(val => val.length % 2 === 0)
// 64 hex characters.
const hash = hex.refine(val => val.length === 64)
// 128 hex characters.
const sig = hex.refine(val => val.length === 128)
// A single tag is an array of strings.
const tags = z.array(str)

// Shape of an event submitted by a client.
const event_schema = z.object({
  content    : str,
  created_at : stamp,
  id         : hash,
  kind       : num,
  pubkey     : hash,
  sig        : sig,
  tags       : z.array(tags)
})

// Shape of a subscription filter. Any key not listed here
// must hold an array of strings.
const filter_schema = z
  .object({
    ids     : z.array(hash).optional(),
    authors : z.array(hash).optional(),
    kinds   : z.array(num).optional(),
    since   : stamp.optional(),
    until   : stamp.optional(),
    limit   : num.optional(),
  })
  .catchall(tags)

// Payload of a subscription request: a string id followed by zero or more filters.
const sub_schema = z.tuple([ str ]).rest(filter_schema)
/* ================ [ Server Init ] ================ */

// Registry of active subscriptions across all connections.
const SUBMAP = new Map<string, Subscription>()

// In-memory store of the events held by the relay.
let EVENT_CACHE : SignedEvent[] = []

// Running count of open connections.
let conn_count = 0

// Start listening for websocket connections.
const wss = new WebSocketServer({ port: SERVER_PORT })

console.log('[ relay ] running on port:', SERVER_PORT)

// When a purge interval is configured, periodically drop every cached event.
if (PURGE_IVAL) {
  const purge_events = () => {
    EVENT_CACHE = []
  }
  console.log(`[ relay ] purging events every ${PURGE_IVAL} seconds`)
  setInterval(purge_events, PURGE_IVAL * 1000)
}
/* ================ [ Connection Handler ] ================ */

// Wrap each incoming socket in an Instance and route its events to it.
wss.on('connection', (socket) => {
  const client = new Instance(socket)

  socket.on('message', (data) => {
    client._handler(data.toString())
  })
  socket.on('error', (err) => {
    client._onerr(err)
  })
  socket.on('close', (code) => {
    client._cleanup(code)
  })

  conn_count += 1
})
/* ================ [ Instance Class ] ================ */

/**
 * Relay-side state for a single websocket connection.
 *
 * Parses incoming client messages (REQ / EVENT / CLOSE), stores accepted
 * events in EVENT_CACHE, registers subscriptions in SUBMAP and pushes
 * matching events to subscribers.
 */
class Instance {
  // Counter used to give every connection its own namespace inside SUBMAP.
  private static _next_uid = 0

  private readonly _pid    : string
  private readonly _uid    : number
  private readonly _socket : WebSocket
  private readonly _subs   : Set<string>

  /**
   * @param socket  The websocket that this instance reads from and writes to.
   */
  constructor (socket : WebSocket) {
    // Short random label, used only for log output.
    this._pid    = Math.random().toString().slice(2, 8)
    this._uid    = Instance._next_uid++
    this._socket = socket
    this._subs   = new Set()
    this.log.relay('client connected')
  }

  /**
   * Build the SUBMAP key for one of this connection's subscriptions.
   * Subscription ids are chosen by clients, so they are only unique per
   * connection; prefixing with the connection uid keeps two clients that
   * pick the same id from overwriting or deleting each other's entry.
   */
  private _key (subId : string) : string {
    return `${this._uid}:${subId}`
  }

  /**
   * Tear down the connection: close the socket, drop all of its
   * subscriptions and decrement the connection counter.
   *
   * @param code  Close code reported by the socket.
   */
  _cleanup (code : number) {
    this._socket.close()
    // Iterate over a copy, because remSub() mutates the set.
    for (const subId of [ ...this._subs ]) {
      this.remSub(subId)
    }
    conn_count -= 1
    // log.relay already prefixes the pid, so it is not repeated here.
    this.log.relay('client disconnected with code:', code)
  }

  /**
   * Parse a raw client message and dispatch it by verb.
   * Any parse or validation failure (and any error thrown while handling)
   * is answered with a NOTICE instead of propagating to the socket.
   *
   * @param message  Raw JSON text received from the client.
   */
  _handler (message : string) {
    try {
      const parsed : unknown = JSON.parse(message)
      assert(Array.isArray(parsed))
      const [ verb, ...payload ] = parsed
      assert(typeof verb === 'string')
      switch (verb) {
        case 'REQ': {
          const [ id, ...filters ] = sub_schema.parse(payload)
          return this._onreq(id, filters)
        }
        case 'EVENT': {
          const event = event_schema.parse(payload[0])
          return this._onevent(event)
        }
        case 'CLOSE': {
          const subid = str.parse(payload[0])
          return this._onclose(subid)
        }
        default:
          this.log.info('unable to handle message type:', verb)
          // NOTICE carries the message as its only argument.
          this.send([ 'NOTICE', 'Unable to handle message' ])
      }
    } catch (e) {
      this.log.debug('failed to parse message:\n\n', message)
      return this.send([ 'NOTICE', 'Unable to parse message' ])
    }
  }

  /**
   * Handle a CLOSE message by removing the subscription.
   *
   * @param sub_id  Client-chosen subscription id.
   */
  _onclose (sub_id : string) {
    this.log.info('closed subscription:', sub_id)
    this.remSub(sub_id)
  }

  /**
   * Log a socket error.
   *
   * @param err  Error reported by the socket.
   */
  _onerr (err : Error) {
    this.log.info('socket encountered an error:\n\n', err)
  }

  /**
   * Handle an EVENT message: verify it, store it, acknowledge it once and
   * forward it to every subscription with at least one matching filter.
   *
   * @param event  Schema-validated event submitted by the client.
   */
  _onevent (event : SignedEvent) {
    this.log.relay('received event id:', event.id)
    this.log.debug('event:', event)

    if (!verify_event(event)) {
      this.log.debug('event failed validation:', event)
      this.send([ 'OK', event.id, false, 'event failed validation' ])
      return
    }

    // Do not store or re-broadcast an event that is already cached.
    if (EVENT_CACHE.some(cached => cached.id === event.id)) {
      this.send([ 'OK', event.id, true, 'duplicate: already have this event' ])
      return
    }

    // Keep the cache ordered newest-first, so that a filter's `limit`
    // returns the most recent events.
    EVENT_CACHE = [ ...EVENT_CACHE, event ].sort((a, b) => b.created_at - a.created_at)

    // Acknowledge exactly once.
    this.send([ 'OK', event.id, true, '' ])

    for (const [ key, { instance, filters } ] of SUBMAP.entries()) {
      // Filters are alternatives: one match is enough, and the event is sent once.
      if (!filters.some(filter => match_filter(event, filter))) continue
      // Strip the connection prefix to recover the client's subscription id.
      const sub_id = key.slice(key.indexOf(':') + 1)
      this.log.debug('event matched subscription:', sub_id, event.id)
      instance.send([ 'EVENT', sub_id, event ])
    }
  }

  /**
   * Handle a REQ message: register the subscription, replay the cached
   * events that match it, then send EOSE.
   *
   * @param sub_id   Client-chosen subscription id.
   * @param filters  Filters for this subscription.
   */
  _onreq (
    sub_id  : string,
    filters : EventFilter[]
  ) : void {
    this.log.relay('received subscription request:', sub_id)
    this.log.debug('filters:', filters)

    // addSub takes a rest parameter, so the filters must be spread.
    this.addSub(sub_id, ...filters)

    // Ids already sent for this request, so overlapping filters do not repeat events.
    const sent = new Set<string>()

    for (const filter of filters) {
      // Number of matches this filter may still consume.
      let remaining = filter.limit ?? Infinity
      for (const event of EVENT_CACHE) {
        if (remaining <= 0) break
        if (!match_filter(event, filter)) continue
        // Only matching events count against the limit.
        remaining -= 1
        if (sent.has(event.id)) continue
        sent.add(event.id)
        this.send([ 'EVENT', sub_id, event ])
        this.log.debug('event matched subscription:', sub_id, event.id)
      }
    }

    this.log.debug('sending EOSE for subscription:', sub_id)
    this.send([ 'EOSE', sub_id ])
  }

  /** Loggers prefixed with this connection's pid; debug and info are gated by DEBUG / VERBOSE. */
  get log () {
    return {
      debug : (...msg : any[]) => DEBUG   && console.log(`[ debug ][ ${this._pid} ]`, ...msg),
      info  : (...msg : any[]) => VERBOSE && console.log(`[ info  ][ ${this._pid} ]`, ...msg),
      relay : (...msg : any[]) => console.log(`[ relay ][ ${this._pid} ]`, ...msg),
    }
  }

  /**
   * Register (or replace) a subscription for this connection.
   *
   * @param subId    Client-chosen subscription id.
   * @param filters  Filters for this subscription.
   */
  addSub (
    subId      : string,
    ...filters : EventFilter[]
  ) {
    SUBMAP.set(this._key(subId), { instance: this, filters })
    this._subs.add(subId)
  }

  /**
   * Remove one of this connection's subscriptions.
   *
   * @param subId  Client-chosen subscription id.
   */
  remSub (subId : string) {
    SUBMAP.delete(this._key(subId))
    this._subs.delete(subId)
  }

  /**
   * Serialize a message to JSON and write it to the socket.
   *
   * @param message  Message array to send.
   */
  send (message : any[]) {
    // A broadcast can target a socket that is already closing; skip it
    // rather than writing to a socket that is not open.
    if (this._socket.readyState !== WebSocket.OPEN) return
    this._socket.send(JSON.stringify(message))
  }
}
/* ================ [ Methods ] ================ */

/**
 * Throw unless the given value is truthy.
 *
 * The signature narrows `value` to truthy for the compiler, so the runtime
 * check must reject every falsy value, not only the literal `false`.
 *
 * @param value    Value (usually a boolean condition) to check.
 * @param message  Error message to use on failure.
 * @throws Error when `value` is falsy.
 */
function assert (
  value   : unknown,
  message : string = 'assertion failed!'
) : asserts value {
  if (!value) throw new Error(message)
}
/**
 * Check whether an event satisfies a single subscription filter.
 *
 * @param event   Event to test.
 * @param filter  Filter to test against (defaults to an empty filter).
 * @returns true when the event passes every condition present in the filter.
 */
function match_filter (
  event  : SignedEvent,
  filter : EventFilter = {}
) : boolean {
  const { authors, ids, kinds, since, until, limit, ...rest } = filter

  // Collect '#'-prefixed keys as [ tag_letter, ...values ] rows.
  const tag_filters : string[][] = []
  for (const [ name, values ] of Object.entries(rest)) {
    if (name.startsWith('#')) {
      tag_filters.push([ name.slice(1, 2), ...values ])
    }
  }

  // Each condition that is present must hold; the first failure rejects the event.
  if (ids !== undefined && !ids.includes(event.id)) return false
  if (since !== undefined && event.created_at < since) return false
  if (until !== undefined && event.created_at > until) return false
  if (authors !== undefined && !authors.includes(event.pubkey)) return false
  if (kinds !== undefined && !kinds.includes(event.kind)) return false

  // Without tag filters the event matches; otherwise the tags decide.
  if (tag_filters.length === 0) return true
  return match_tags(tag_filters, event.tags)
}
/**
 * Check an event's tags against a list of tag filters.
 *
 * Each filter row is [ key, ...accepted_values ]. A row is satisfied when
 * the event has at least one tag whose name equals `key` and whose first
 * value is one of the accepted values. All rows must be satisfied.
 *
 * @param filters  Tag filter rows, each of the form [ key, ...values ].
 * @param tags     Tags of the event, each of the form [ name, value, ... ].
 * @returns true when every filter row is satisfied (vacuously true for no rows).
 */
function match_tags (
  filters : string[][],
  tags    : string[][]
) : boolean {
  return filters.every(([ key, ...terms ]) =>
    tags.some(([ name, value ]) =>
      // An event without this tag, or with a tag that has no value, cannot satisfy the row.
      name === key && value !== undefined && terms.includes(value)
    )
  )
}
/**
 * Verify an event's id and signature.
 *
 * The id must equal the hex-encoded sha256 digest of the JSON array
 * [ 0, pubkey, created_at, kind, tags, content ], and the signature must
 * pass schnorr verification over that id with the event's pubkey.
 *
 * @param event  Event to verify.
 * @returns true only when both the id and the signature check out.
 */
function verify_event (event : SignedEvent) {
  const preimage = JSON.stringify([
    0,
    event.pubkey,
    event.created_at,
    event.kind,
    event.tags,
    event.content
  ])
  const digest = Buffer.from(sha256(preimage)).toString('hex')
  // The signature is only checked once the id is known to match.
  return digest === event.id && schnorr.verify(event.sig, event.id, event.pubkey)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment