Created
April 29, 2022 16:55
-
-
Save lightningspirit/2a2e7c6e542831d481a52edd775ad893 to your computer and use it in GitHub Desktop.
A distributed event system for Node.js using Redis as storage system
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { createClient } from "redis" | |
/**
 * Callback shape accepted when registering or removing an event listener.
 * It is called with the event and its return value is ignored.
 */
type Listener = {
  <E extends Event>(event: E): void
}
/**
 * Atomic distributed memory across nodes
 * backed by a Redis server.
 *
 * Values are stored as JSON strings inside Redis hashes (one hash per
 * namespace), and events are exchanged over Redis pub/sub channels (one
 * channel per event name).
 *
 * Call `connect()` before using any other method: the underlying clients
 * are created in the constructor but are not opened there.
 */
class DistributedMemory {
  /**
   * Client used for hash commands and for publishing events.
   */
  private readonly publisher: ReturnType<typeof createClient>

  /**
   * Dedicated client for subscriptions. A Redis connection that is in
   * subscriber mode cannot run regular commands, hence the duplicate.
   */
  private readonly subscriber: ReturnType<typeof createClient>

  /**
   * Local callbacks grouped by event name. A `Map` is used so that event
   * names such as "toString" or "constructor" cannot collide with
   * properties inherited from `Object.prototype`.
   */
  private readonly listeners: Map<string, Listener[]>

  /**
   * Creates the publisher client (default connection options) and a
   * duplicate of it for subscriptions. Neither client is connected here.
   */
  constructor() {
    this.publisher = createClient()
    this.subscriber = this.publisher.duplicate()
    this.listeners = new Map()
  }

  /**
   * Opens both Redis connections. Clients that are already open are left
   * untouched, so calling this more than once is harmless.
   */
  public async connect(): Promise<void> {
    await Promise.all([
      this.publisher.isOpen ? undefined : this.publisher.connect(),
      this.subscriber.isOpen ? undefined : this.subscriber.connect(),
    ])
  }

  /**
   * Closes both Redis connections. Clients that are not open are skipped.
   */
  public async disconnect(): Promise<void> {
    await Promise.all([
      this.subscriber.isOpen ? this.subscriber.disconnect() : undefined,
      this.publisher.isOpen ? this.publisher.disconnect() : undefined,
    ])
  }

  /**
   * Get a field from memory
   * @param namespace Redis hash to read from
   * @param key field inside the hash
   * @returns the JSON-decoded value, or `null` when nothing is stored.
   * The cast to `R` is unchecked; the stored shape is not validated.
   */
  public async get<N extends string, K extends string, R>(
    namespace: N,
    key: K,
  ): Promise<R | null> {
    const result = await this.publisher.hGet(namespace, key)
    return result ? (JSON.parse(result) as R) : null
  }

  /**
   * Sets a field to the distributed memory
   * @param namespace Redis hash to write to
   * @param key field inside the hash
   * @param value any JSON-serializable value
   * @returns `true` when the HSET reply is non-zero. Redis replies with the
   * number of fields that were newly added, so overwriting an existing
   * field resolves to `false` even though the value was stored.
   * @throws TypeError when `value` has no JSON representation
   */
  public async set<N extends string, K extends string, V, R>(
    namespace: N,
    key: K,
    value: V,
  ): Promise<boolean> {
    // JSON.stringify yields undefined for undefined, functions and symbols
    // despite its declared return type.
    const serialized: string | undefined = JSON.stringify(value)
    if (serialized === undefined) {
      throw new TypeError("Value is not JSON-serializable")
    }
    return !!(await this.publisher.hSet(namespace, key, serialized))
  }

  /**
   * Checks if field exists in the distributed memory
   * @param namespace Redis hash to inspect
   * @param key field inside the hash
   * @returns `true` when the field exists
   */
  public async has<N extends string, K extends string>(
    namespace: N,
    key: K,
  ): Promise<boolean> {
    return !!(await this.publisher.hExists(namespace, key))
  }

  /**
   * Sends a node event
   * @param eventName channel to publish on
   * @param event payload, sent as JSON
   * @returns `true` when the PUBLISH reply is non-zero. Redis replies with
   * the number of subscribers that received the message, so `false` means
   * nobody was listening.
   */
  public async emit<N extends string, E extends Event>(
    eventName: N,
    event: E,
  ): Promise<boolean> {
    return !!(await this.publisher.publish(eventName, JSON.stringify(event)))
  }

  /**
   * Listens for a node event
   * @param eventName channel to listen on
   * @param callback invoked with the JSON-decoded payload of every message
   * @returns this instance, for chaining
   */
  public on<N extends string, E extends Event>(
    eventName: N,
    callback: Listener,
  ): this {
    const registered = this.listeners.get(eventName)
    if (registered) {
      // The channel is already subscribed: only extend the local list.
      // Subscribing again would add a second Redis handler and every
      // listener would then run once per handler for each message.
      this.listeners.set(eventName, [...registered, callback])
      return this
    }

    this.listeners.set(eventName, [callback])
    void this.subscriber.subscribe(eventName, (message: string) => {
      const event = JSON.parse(message) as E
      // Look the list up at delivery time so later on/off calls are honoured.
      const current = this.listeners.get(eventName) ?? []
      current.forEach((listener) => listener(event))
    })
    return this
  }

  /**
   * Removes listener
   * @param eventName channel the listener was registered on
   * @param callback the function previously passed to `on`; every
   * registration of it for this event is removed
   * @returns this instance, for chaining
   */
  public off<N extends string>(eventName: N, callback: Listener): this {
    const registered = this.listeners.get(eventName)
    if (!registered) {
      return this
    }

    const remaining = registered.filter((fn) => fn !== callback)
    if (remaining.length > 0) {
      this.listeners.set(eventName, remaining)
      return this
    }

    // Last listener gone: drop the entry so that a later `on` subscribes
    // again, and release the Redis subscription.
    this.listeners.delete(eventName)
    void this.subscriber.unsubscribe(eventName)
    return this
  }
}

export default DistributedMemory
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment