Skip to content

Instantly share code, notes, and snippets.

@lightningspirit
Created April 29, 2022 16:55
Show Gist options
  • Save lightningspirit/2a2e7c6e542831d481a52edd775ad893 to your computer and use it in GitHub Desktop.
Save lightningspirit/2a2e7c6e542831d481a52edd775ad893 to your computer and use it in GitHub Desktop.
A distributed event system for Node.js that uses Redis as its storage system
import { createClient } from "redis"
/**
 * Signature of the callbacks registered with `DistributedMemory.on` and
 * removed with `DistributedMemory.off`.
 *
 * A listener receives the event object obtained by JSON-parsing the message
 * delivered on the subscribed channel and returns nothing.
 *
 * NOTE(review): `Event` is not imported in this file, so it presumably
 * resolves to the ambient global `Event` type — confirm that is intended.
 */
type Listener = <E extends Event>(event: E) => void
/**
* Atomic distributed memory across nodes
* backed by Redis server.
*/
class DistributedMemory {
  /**
   * Client used for hash commands (`hGet`/`hSet`/`hExists`) and for
   * publishing events.
   */
  private readonly publisher: ReturnType<typeof createClient>

  /**
   * Separate client, created with `publisher.duplicate()`, used only for
   * channel subscriptions.
   */
  private readonly subscriber: ReturnType<typeof createClient>

  /**
   * Local callbacks grouped by event name.
   *
   * A `Map` is used instead of a plain object so that event names which
   * collide with `Object.prototype` members ("toString", "constructor", ...)
   * are treated like any other name.
   */
  private readonly listeners = new Map<string, Listener[]>()

  /**
   * Promise of the in-flight or completed connection attempt, or `null`
   * when `connect()` has not been called (or `disconnect()` was called).
   */
  private connection: Promise<void> | null = null

  /**
   * Creates the publisher client and its subscriber duplicate.
   * No network connection is opened here; see `connect()`.
   *
   * @param options Optional settings forwarded unchanged to `createClient`.
   *   When omitted, `createClient` is called with `undefined`, so the
   *   library defaults apply.
   */
  constructor(options?: Parameters<typeof createClient>[0]) {
    this.publisher = createClient(options)
    this.subscriber = this.publisher.duplicate()
  }

  /**
   * Connects both clients. Safe to call repeatedly: every call returns the
   * promise created by the first one. All other methods call this lazily,
   * so an explicit call is only needed to observe connection errors early.
   *
   * @returns A promise that settles once both clients have connected.
   */
  public connect(): Promise<void> {
    if (this.connection === null) {
      this.connection = Promise.all([
        this.publisher.connect(),
        this.subscriber.connect(),
      ]).then(() => undefined)
    }
    return this.connection
  }

  /**
   * Unsubscribes from every channel that still has listeners, forgets all
   * registered listeners and closes both clients with `quit()`.
   * Does nothing when `connect()` was never triggered.
   */
  public async disconnect(): Promise<void> {
    const pending = this.connection
    if (pending === null) {
      return
    }
    this.connection = null
    const channels = [...this.listeners.keys()]
    this.listeners.clear()
    // Let any in-flight connect (and subscriptions queued behind it) finish
    // before tearing the clients down.
    await pending
    await Promise.all(
      channels.map((channel) => this.subscriber.unsubscribe(channel)),
    )
    await Promise.all([this.subscriber.quit(), this.publisher.quit()])
  }

  /**
   * Reads a field of the hash `namespace` and JSON-decodes it.
   *
   * @param namespace Name of the Redis hash.
   * @param key Field inside the hash.
   * @returns The decoded value cast to `R` (not validated at runtime), or
   *   `null` when the stored value is missing or empty.
   * @throws SyntaxError when the stored string is not valid JSON.
   */
  public async get<N extends string, K extends string, R>(
    namespace: N,
    key: K,
  ): Promise<R | null> {
    await this.connect()
    const raw = await this.publisher.hGet(namespace, key)
    return raw ? (JSON.parse(raw) as R) : null
  }

  /**
   * JSON-encodes `value` and stores it in field `key` of hash `namespace`.
   *
   * @param namespace Name of the Redis hash.
   * @param key Field inside the hash.
   * @param value Any JSON-serialisable value.
   * @returns The truthiness of the `hSet` reply. Redis HSET replies with the
   *   number of fields that were newly added, so this is `true` when the
   *   field was created and `false` when an existing field was overwritten.
   * @throws TypeError when `value` has no JSON representation.
   */
  public async set<N extends string, K extends string, V, R>(
    namespace: N,
    key: K,
    value: V,
  ): Promise<boolean> {
    // JSON.stringify yields undefined (not a string) for undefined,
    // functions and symbols; fail with a clear message instead of handing
    // a non-string to the client.
    const serialized: string | undefined = JSON.stringify(value)
    if (serialized === undefined) {
      throw new TypeError(
        `Value for "${namespace}.${key}" cannot be serialized to JSON`,
      )
    }
    await this.connect()
    return Boolean(await this.publisher.hSet(namespace, key, serialized))
  }

  /**
   * Checks whether field `key` exists in hash `namespace`.
   *
   * @param namespace Name of the Redis hash.
   * @param key Field inside the hash.
   * @returns `true` when the field exists.
   */
  public async has<N extends string, K extends string>(
    namespace: N,
    key: K,
  ): Promise<boolean> {
    await this.connect()
    return Boolean(await this.publisher.hExists(namespace, key))
  }

  /**
   * Publishes `event`, JSON-encoded, on the channel `eventName`.
   *
   * @param eventName Channel to publish on.
   * @param event Payload; it is passed through `JSON.stringify`.
   * @returns The truthiness of the `publish` reply. Redis PUBLISH replies
   *   with the number of clients that received the message, so `false`
   *   means nobody was subscribed.
   */
  public async emit<N extends string, E extends Event>(
    eventName: N,
    event: E,
  ): Promise<boolean> {
    await this.connect()
    return Boolean(
      await this.publisher.publish(eventName, JSON.stringify(event)),
    )
  }

  /**
   * Registers `callback` for events published on `eventName`.
   *
   * The Redis subscription is created only for the first listener of an
   * event name; later listeners reuse it. Subscribing once per listener
   * would make every callback run once per subscription for each message.
   *
   * The subscription is started in the background (connecting first if
   * needed); its promise is not awaited, so a failure surfaces as an
   * unhandled rejection.
   *
   * @param eventName Channel to listen on.
   * @param callback Function invoked with each decoded event.
   * @returns This instance, for chaining.
   */
  public on<N extends string, E extends Event>(
    eventName: N,
    callback: Listener,
  ): this {
    const registered = this.listeners.get(eventName)
    if (registered !== undefined) {
      registered.push(callback)
      return this
    }

    this.listeners.set(eventName, [callback])
    void this.connect().then(() =>
      this.subscriber.subscribe(eventName, (message: string) => {
        const event = JSON.parse(message) as E
        // Look the array up at dispatch time so listeners added or removed
        // after subscribing are honoured.
        this.listeners.get(eventName)?.forEach((listener) => listener(event))
      }),
    )
    return this
  }

  /**
   * Removes every registration of `callback` for `eventName`. When no
   * listener is left the channel is unsubscribed in the background and the
   * event name is forgotten, so a later `on()` subscribes again.
   *
   * @param eventName Channel the listener was registered on.
   * @param callback The function previously passed to `on()`.
   * @returns This instance, for chaining.
   */
  public off<N extends string>(eventName: N, callback: Listener): this {
    const registered = this.listeners.get(eventName)
    if (registered === undefined) {
      return this
    }

    // Build a new array rather than mutating, so a dispatch that is
    // currently iterating the old array is not disturbed.
    const remaining = registered.filter((fn) => fn !== callback)
    if (remaining.length > 0) {
      this.listeners.set(eventName, remaining)
      return this
    }

    this.listeners.delete(eventName)
    void this.connect().then(() => this.subscriber.unsubscribe(eventName))
    return this
  }
}
export default DistributedMemory
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment