Skip to content

Instantly share code, notes, and snippets.

@cmdruid
Last active November 14, 2024 00:41
Show Gist options
  • Save cmdruid/43159fd102849147132875fee15db7ec to your computer and use it in GitHub Desktop.
Save cmdruid/43159fd102849147132875fee15db7ec to your computer and use it in GitHub Desktop.
Shameless copy of https://github.com/coracle-social/bucket, updated to ESM with types, validation and minimal dependencies. (<300 LOC)
import { z } from 'zod'
import { schnorr } from '@noble/curves/secp256k1'
import { sha256 } from '@noble/hashes/sha256'
import { WebSocket, WebSocketServer } from 'ws'
/* ================ [ Configuration ] ================ */

/** TCP port that the websocket server listens on. */
const SERVER_PORT = 8002

/** Number of seconds between purges of the in-memory event cache. */
const PURGE_IVAL = 600

/** Debug logging; enabled only when the DEBUG environment variable is exactly 'true'. */
const DEBUG = process.env['DEBUG'] === 'true'

/** Info-level logging; enabled by VERBOSE='true' and always implied by DEBUG. */
const VERBOSE = DEBUG || process.env['VERBOSE'] === 'true'
/* ================ [ Interfaces ] ================ */

/**
 * Subscription filter supplied by a client in a REQ message.
 *
 * The named fields are compared against the matching event fields; any
 * additional key that starts with '#' is treated as a tag filter.
 */
interface EventFilter {
  /** Accepted event ids. */
  ids?: string[]
  /** Accepted author public keys. */
  authors?: string[]
  /** Accepted event kinds. */
  kinds?: number[]
  /** Lower bound (inclusive) on `created_at`. */
  since?: number
  /** Upper bound (inclusive) on `created_at`. */
  until?: number
  /** Cap on the number of cached events returned for this filter. */
  limit?: number
  /** Open-ended extra keys, e.g. '#e' tag filters. */
  [key: string]: any
}

/** A signed event as received from (and relayed to) clients. */
interface SignedEvent {
  content: string
  created_at: number
  id: string
  kind: number
  pubkey: string
  sig: string
  tags: string[][]
}

/** An active subscription: the connection that owns it plus its filters. */
interface Subscription {
  instance: Instance
  filters: EventFilter[]
}
/* ================ [ Schema ] ================ */

// Non-negative safe integer. Kinds, timestamps and limits are all integers,
// so fractional or negative numbers are rejected up front.
const num = z.number().int().nonnegative().max(Number.MAX_SAFE_INTEGER)
const str = z.string()
// Event timestamp (seconds); the lower bound rejects obviously bogus values.
const stamp = num.min(500_000_000)
// Hex string made of whole bytes (even number of characters).
const hex = str.regex(/^[0-9a-fA-F]*$/).refine(e => e.length % 2 === 0)
// 32-byte value (event id or public key) as 64 hex characters.
const hash = hex.refine(e => e.length === 64)
// 64-byte signature as 128 hex characters.
const sig = hex.refine(e => e.length === 128)
// A single tag: an array of strings.
const tags = str.array()

/** Shape of the event object carried by an EVENT message. */
const event_schema = z.object({
  content    : str,
  created_at : stamp,
  id         : hash,
  kind       : num,
  pubkey     : hash,
  sig        : sig,
  tags       : tags.array()
})

/**
 * Shape of one subscription filter. Keys that are not listed here (such as
 * '#e' tag filters) must hold an array of strings.
 */
const filter_schema = z.object({
  ids     : hash.array().optional(),
  authors : hash.array().optional(),
  kinds   : num.array().optional(),
  // Filter bounds are plain integers: a client may legitimately ask for
  // `since: 0`, so the event-timestamp floor is not applied to them.
  since   : num.optional(),
  until   : num.optional(),
  limit   : num.optional(),
}).catchall(tags)

/** Payload of a REQ message: a subscription id followed by any number of filters. */
const sub_schema = z.tuple([ str ]).rest(filter_schema)
/* ================ [ Server Init ] ================ */

// Registry of active subscriptions (written and read by the Instance class).
const SUBMAP = new Map<string, Subscription>()

// Events accepted by the relay; kept in memory only.
let EVENT_CACHE : SignedEvent[] = []

// Number of currently connected clients.
let conn_count = 0

// Start listening for websocket connections and announce the port.
const wss = new WebSocketServer({ port: SERVER_PORT })
console.log('[ relay ] running on port:', SERVER_PORT)

// A falsy PURGE_IVAL disables the periodic purge entirely.
if (PURGE_IVAL) {
  const purge_ms = PURGE_IVAL * 1000
  console.log(`[ relay ] purging events every ${PURGE_IVAL} seconds`)
  setInterval(() => {
    // Drop every cached event by swapping in a fresh array.
    EVENT_CACHE = []
  }, purge_ms)
}
/* ================ [ Connection Handler ] ================ */

// Every incoming connection gets its own Instance; the socket's message,
// error and close events are forwarded to it.
wss.on('connection', (socket) => {
  const client = new Instance(socket)

  socket.on('message', (data) => {
    return client._handler(data.toString())
  })
  socket.on('error', (err) => {
    return client._onerr(err)
  })
  socket.on('close', (code) => {
    return client._cleanup(code)
  })

  // Count the connection once its handlers are wired up.
  conn_count += 1
})
/* ================ [ Instance Class ] ================ */

/**
 * State and message handling for a single client connection.
 *
 * Parses incoming client messages (REQ / EVENT / CLOSE), tracks the
 * subscriptions opened by this client, and writes JSON-encoded replies
 * to the underlying socket.
 */
class Instance {
  // Source of per-connection numbers used to namespace SUBMAP keys.
  private static _conn_seq = 0

  private readonly _conn   : number
  private readonly _pid    : string
  private readonly _socket : WebSocket
  private readonly _subs   : Set<string>

  constructor (socket : WebSocket) {
    this._conn   = Instance._conn_seq++
    // Short random label used only to tell clients apart in log output.
    this._pid    = Math.random().toString().slice(2, 8)
    this._socket = socket
    this._subs   = new Set()
    this.log.relay('client connected')
  }

  /**
   * Builds the SUBMAP key for one of this connection's subscriptions.
   * Prefixing with the connection number keeps two clients that pick the
   * same subscription id from overwriting or closing each other's entry.
   */
  private _subkey (subId : string) : string {
    return `${this._conn}:${subId}`
  }

  /** Tears down the connection: closes the socket and drops all of its subscriptions. */
  _cleanup (code : number) {
    this._socket.close()
    // Iterate over a copy, since remSub() deletes from this._subs.
    for (const subId of [ ...this._subs ]) {
      this.remSub(subId)
    }
    conn_count -= 1
    this.log.relay(`[ ${this._pid} ]`, 'client disconnected with code:', code)
  }

  /**
   * Entry point for every raw message received on the socket.
   *
   * The message must be a JSON array whose first element is the verb. Any
   * exception raised while parsing, validating or handling the message is
   * answered with a NOTICE instead of propagating out of the socket callback.
   */
  _handler (message : string) {
    try {
      const parsed : unknown = JSON.parse(message)
      assert(Array.isArray(parsed))
      const [ verb, ...payload ] : unknown[] = parsed
      assert(typeof verb === 'string')
      switch (verb) {
        case 'REQ': {
          const [ id, ...filters ] = sub_schema.parse(payload)
          return this._onreq(id, filters)
        }
        case 'EVENT': {
          const event = event_schema.parse(payload.at(0))
          return this._onevent(event)
        }
        case 'CLOSE': {
          const subid = str.parse(payload.at(0))
          return this._onclose(subid)
        }
        default:
          this.log.info('unable to handle message type:', verb)
          // NIP-01 NOTICE frames are [ 'NOTICE', <message> ].
          this.send([ 'NOTICE', 'Unable to handle message' ])
      }
    } catch {
      this.log.debug('failed to parse message:\n\n', message)
      return this.send([ 'NOTICE', 'Unable to parse message' ])
    }
  }

  /** Handles a CLOSE message by removing the named subscription. */
  _onclose (sub_id : string) {
    this.log.info('closed subscription:', sub_id)
    this.remSub(sub_id)
  }

  /** Logs an error reported by the socket. */
  _onerr (err : Error) {
    this.log.info('socket encountered an error:\n\n', err)
  }

  /**
   * Handles an EVENT message: verifies the event, acknowledges it with a
   * single OK, stores it, and forwards it to every matching subscription.
   */
  _onevent (event : SignedEvent) {
    this.log.relay('received event id:', event.id)
    this.log.debug('event:', event)

    if (!verify_event(event)) {
      this.log.debug('event failed validation:', event)
      this.send([ 'OK', event.id, false, 'event failed validation' ])
      return
    }

    // An event we already hold is acknowledged but neither stored nor re-broadcast.
    if (EVENT_CACHE.some(e => e.id === event.id)) {
      this.send([ 'OK', event.id, true, 'duplicate: already have this event' ])
      return
    }

    this.send([ 'OK', event.id, true, '' ])

    // Keep the cache ordered newest-first so that `limit` returns the latest events.
    EVENT_CACHE = [ ...EVENT_CACHE, event ].sort((a, b) => b.created_at - a.created_at)

    for (const [ key, { instance, filters } ] of SUBMAP.entries()) {
      // Deliver at most once per subscription, even if several filters match.
      if (filters.some(filter => match_filter(event, filter))) {
        // Strip the connection prefix added by _subkey() to recover the client's id.
        const sub_id = key.slice(key.indexOf(':') + 1)
        this.log.debug('event matched subscription:', sub_id, event.id)
        instance.send([ 'EVENT', sub_id, event ])
      }
    }
  }

  /**
   * Handles a REQ message: registers the subscription, replays matching
   * events from the cache, then sends EOSE.
   */
  _onreq (
    sub_id  : string,
    filters : EventFilter[]
  ) : void {
    this.log.relay('received subscription request:', sub_id)
    this.log.debug('filters:', filters)

    // addSub() takes the filters as rest arguments, so they must be spread.
    this.addSub(sub_id, ...filters)

    // Ids already sent for this request, so overlapping filters do not repeat an event.
    const sent = new Set<string>()

    for (const filter of filters) {
      // Number of matches this filter may still claim (unbounded when no limit is set).
      let remaining = filter.limit ?? Infinity
      for (const event of EVENT_CACHE) {
        if (remaining <= 0) break
        if (!match_filter(event, filter)) continue
        // Only matching events count against the limit.
        remaining -= 1
        if (sent.has(event.id)) continue
        sent.add(event.id)
        this.send([ 'EVENT', sub_id, event ])
        this.log.debug('event matched subscription:', sub_id, event.id)
      }
    }

    // Tell the client that the stored events have all been sent.
    this.log.debug('sending EOSE for subscription:', sub_id)
    this.send([ 'EOSE', sub_id ])
  }

  /** Console loggers prefixed with this connection's label; debug/info honour DEBUG/VERBOSE. */
  get log () {
    return {
      debug : (...msg : any[]) => DEBUG   && console.log(`[ debug ][ ${this._pid} ]`, ...msg),
      info  : (...msg : any[]) => VERBOSE && console.log(`[ info  ][ ${this._pid} ]`, ...msg),
      relay : (...msg : any[]) => console.log(`[ relay ][ ${this._pid} ]`, ...msg),
    }
  }

  /** Registers (or replaces) a subscription for this connection. */
  addSub (
    subId : string,
    ...filters : EventFilter[]
  ) {
    SUBMAP.set(this._subkey(subId), { instance: this, filters })
    this._subs.add(subId)
  }

  /** Removes one of this connection's subscriptions; unknown ids are ignored. */
  remSub (subId : string) {
    SUBMAP.delete(this._subkey(subId))
    this._subs.delete(subId)
  }

  /** Serializes a message as JSON and writes it to the socket if it is still open. */
  send (message : any[]) {
    // Skip sockets that are closing or closed rather than risk a failed write.
    if (this._socket.readyState !== WebSocket.OPEN) return
    this._socket.send(JSON.stringify(message))
  }
}
/* ================ [ Methods ] ================ */

/**
 * Throws unless `value` is truthy.
 *
 * The `asserts value` signature tells the compiler that `value` is truthy
 * after the call, so every falsy input (false, 0, '', null, undefined, NaN)
 * has to throw, not just a literal `false`.
 *
 * @param value - The condition or value to check.
 * @throws Error when `value` is falsy.
 */
function assert (value : unknown) : asserts value {
  if (!value) throw new Error('assertion failed!')
}
/**
 * Tests a single event against a single subscription filter.
 *
 * Each of `ids`, `since`, `until`, `authors` and `kinds` that is present must
 * accept the event. Keys beginning with '#' are collected as tag filters
 * (named by the one character after the '#') and handed to match_tags().
 * `limit` plays no part in matching.
 *
 * @param event  - The event to test.
 * @param filter - The filter to test against; an empty filter matches everything.
 * @returns true when the event satisfies the filter.
 */
function match_filter (
  event  : SignedEvent,
  filter : EventFilter = {}
) : boolean {
  // Gather the tag filters as [ name, ...terms ] rows.
  const tag_rows : string[][] = []
  for (const [ key, terms ] of Object.entries(filter)) {
    if (key.startsWith('#')) {
      tag_rows.push([ key.slice(1, 2), ...terms ])
    }
  }

  const { ids, authors, kinds, since, until } = filter

  if (ids     !== undefined && !ids.includes(event.id))         return false
  if (since   !== undefined && event.created_at < since)        return false
  if (until   !== undefined && event.created_at > until)        return false
  if (authors !== undefined && !authors.includes(event.pubkey)) return false
  if (kinds   !== undefined && !kinds.includes(event.kind))     return false

  // Without tag filters there is nothing left to check.
  if (tag_rows.length === 0) return true
  return match_tags(tag_rows, event.tags)
}
/**
 * Tests an event's tags against a list of tag filters.
 *
 * Each filter row is `[ name, ...terms ]`. A row is satisfied when the event
 * carries at least one tag with that name whose first value is one of the
 * terms. All rows must be satisfied, so an event with no tag of a requested
 * name does not match.
 *
 * @param filters - Tag filter rows, each `[ name, ...terms ]`.
 * @param tags    - The event's tags, each `[ name, ...values ]`.
 * @returns true when every filter row is satisfied (trivially true for no rows).
 */
function match_tags (
  filters : string[][],
  tags    : string[][]
) : boolean {
  return filters.every(([ key, ...terms ]) =>
    tags.some(([ tag, value ]) =>
      tag === key && value !== undefined && terms.includes(value)
    )
  )
}
/**
 * Checks that an event's id and signature are consistent with its contents.
 *
 * The id must equal the hex-encoded SHA-256 of the JSON array
 * `[ 0, pubkey, created_at, kind, tags, content ]`, and `sig` must pass
 * schnorr verification over that id for `pubkey`.
 *
 * @param event - The event to verify.
 * @returns true only when both the id and the signature check out.
 */
function verify_event (event : SignedEvent) {
  // Serialize the signed fields in their fixed order.
  const preimage = JSON.stringify([
    0,
    event.pubkey,
    event.created_at,
    event.kind,
    event.tags,
    event.content
  ])
  const digest = Buffer.from(sha256(preimage)).toString('hex')
  // The signature is only checked once the id is known to match the contents.
  if (digest === event.id) {
    return schnorr.verify(event.sig, event.id, event.pubkey)
  }
  return false
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment