Skip to content

Instantly share code, notes, and snippets.

@jackwh
Created November 17, 2024 20:41
Show Gist options
  • Save jackwh/cdb22640ff962bc8fa86cf79017ff8cd to your computer and use it in GitHub Desktop.
Save jackwh/cdb22640ff962bc8fa86cf79017ff8cd to your computer and use it in GitHub Desktop.
Mercure subscription manager
import { arrayWrap } from "@/Support/Utilities/Arrays.ts";
import { numericHash } from "@/Components/Macros/Utils/NumericHash.ts";
import _ from "lodash";
export default class Mercure {
url: string | null;
debug = false;
hub: EventSource | undefined;
lastEventID: string | undefined;
hash: number | undefined;
isConnecting = false;
subscriptions: Map<MercureSubscriptionID, MercureSubscription> = new Map<
MercureSubscriptionID,
MercureSubscription
>();
/** Create a new Mercure instance. */
constructor(hubUrl: string | null, subscriptions: MercureSubscription[] = [], debug = false) {
this.url = hubUrl;
this.debug = debug;
if (subscriptions.length > 0) {
this.subscribe(subscriptions);
}
}
/** Subscribe to one or more topics. Returns an array of subscription IDs. */
subscribe(to: MercureSubscription | MercureSubscription[]): MercureSubscriptionID[] {
// Prepare records with hashes for each subscription:
const hashedSubscriptions = arrayWrap(to).map((subscription) => this.createHashedSubscription(subscription));
// Set each subscription on the Map:
hashedSubscriptions.forEach((data) => {
this.subscriptions.set(data.hash, data.subscription);
});
// Sync any changes to the connection.
this.sync();
// Return an array of newly-subscribed subscription IDs.
return hashedSubscriptions.map((data) => data.hash);
}
/** Unsubscribe from one or more topics. */
unsubscribe(from: MercureSubscriptionID | MercureSubscriptionID[]): void {
const hashedUnsubscribeIDs = arrayWrap(from);
// Remove each unsubscription from the Map:
hashedUnsubscribeIDs.forEach((id) => {
this.subscriptions.delete(id);
});
// Sync any changes back to the connection.
this.sync();
}
/** Handle an error from Mercure. */
handleError(errorEvent: Event): void {
console.error("Mercure error", errorEvent);
}
/** Handle a message from Mercure. */
handleMessage(messageEvent: MessageEvent): void {
// Record the last event ID, so we can catch up on missed events later if needed:
this.lastEventID = messageEvent.lastEventId;
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const eventData: MercureWrappedEvent<BroadcastEvent> = JSON.parse(messageEvent.data as string);
if (this.debug) {
console.log(`Received Mercure message ${this.lastEventID}:`, eventData);
}
// Check if any of our subscriptions are listening for this event,
// and if so, ensure the selectors match before handling the event:
for (const [hash, subscription] of this.subscriptions) {
if (subscription.event === eventData.event) {
if (subscription.selector && subscription.selector !== eventData.data.selector) {
if (this.debug) {
console.log(`Ignoring event ${eventData.event} (selector mismatch: ${subscription.selector})`);
}
continue;
}
if (this.debug) {
console.log(`Handling event ${eventData.event} (matched selector: ${subscription.selector})...`);
}
// The event type and selectors are compatible, so handle the event:
subscription.handler(eventData.data.broadcast);
}
}
}
/** Compute a hash for a MercureSubscription. */
private createHashedSubscription(subscription: MercureSubscription) {
const hash: MercureSubscriptionID = "event" + String(numericHash(JSON.stringify(subscription)));
return { hash: hash, subscription: subscription };
}
/** If any of the subscription requirements change, reestablish the connection to Mercure. */
private sync() {
const oldHash = this.hash;
const newHash = numericHash(JSON.stringify(Object.fromEntries(this.subscriptions)));
const isDifferent = oldHash !== newHash;
// If the hash was changed, reestablish the connection.
if (isDifferent) {
this.hash = newHash;
// Debounce the connection setup, in case multiple components update subscriptions in rapid succession:
if (!this.isConnecting) {
this.isConnecting = true;
const connectToMercure = _.debounce(() => this.connect(), 500);
connectToMercure();
}
}
}
/** Connect to Mercure. */
private connect() {
// If we haven't got any subscriptions to listen for, disconnect and return early.
if (this.subscriptions.size === 0) {
this.disconnect();
this.isConnecting = false;
return;
}
// Ensure we haven't accidentally left an existing connection open before continuing.
if (this.hub !== undefined) {
this.disconnect(true);
}
// Build a URL to the Mercure Hub, with subscription selectors for all required topics:
const hubUrl = this.url ?? window.mercureDefaults?.hubUrl;
if (!hubUrl) {
throw new Error("No Mercure Hub URL provided.");
}
const connectionUrl = new URL(hubUrl);
for (const topic of this.topics) {
connectionUrl.searchParams.append("topic", topic);
}
// If we're resuming from a previous connection, include the lastEventID:
if (this.lastEventID !== undefined) {
connectionUrl.searchParams.append("lastEventID", this.lastEventID);
}
// Create an EventSource and associate it with this instance:
this.hub = new EventSource(connectionUrl, {
withCredentials: true,
});
// Attach our handlers to the EventSource:
this.hub.onmessage = (e) => this.handleMessage(e);
this.hub.onerror = (e) => this.handleError(e);
// Mark the debounced connection state as complete.
this.isConnecting = false;
}
/** Disconnect from Mercure. If 'temporarily' is set, the last event ID will be retained for later reconnections. */
disconnect(temporarily = false) {
// If the disconnection is permanent, clear out the last event ID.
if (!temporarily) {
this.lastEventID = undefined;
}
// If we haven't established a connection yet, return early.
if (this.hub === undefined) {
return;
}
// Close the connection and remove the attached callbacks.
this.hub.close();
this.hub.onmessage = null;
this.hub.onerror = null;
this.hub.onopen = null;
}
/** Get the current state of the Mercure connection. */
get state(): MercureConnectionState {
if (this.hub === undefined) {
return undefined;
}
const hubState = this.hub.readyState;
if (hubState === 0) {
return "connecting";
} else if (hubState === 1) {
return "open";
} else if (hubState === 2) {
return "closed";
}
throw new Error("Unexpected EventSource state: " + hubState);
}
/** Get an array of topic URLs from the instance subscriptions. */
get topics(): string[] {
return [
...new Set(
Array.from(this.subscriptions.values())
.filter((subscription) => (subscription.topic ?? undefined) !== undefined)
.map((subscription) => subscription.topic!)
.concat(window.mercureDefaults?.topics ?? [])
.toSorted(),
),
];
}
}
/** Listen for an event broadcast by Mercure to React. Pass the returned callback to a useEffect hook to safely unsubscribe. */
export function registerMercureSubscription(subscription: MercureSubscription): () => void {
// Find the current Mercure instance on the window.
const instance = window.Mercure;
// eslint-disable-next-line @typescript-eslint/strict-boolean-expressions,@typescript-eslint/no-unnecessary-condition
if (!instance) {
throw new Error("Cannot add subscription, no Mercure instance detected.");
}
// Create the subscription:
const subscriptionID = instance.subscribe(subscription);
// Return a callback so React can clean up the subscription when needed:
return () => {
window.Mercure.unsubscribe(subscriptionID);
};
}
export type BroadcastEvent =
| "App\\Broadcasting\\Events\\ExampleEvent"
| "App\\Broadcasting\\Events\\AnotherExampleEvent";
export type MercureEventPayload<T> = T;
export interface WrappedBroadcast<T> {
selector: string | null;
broadcast: MercureEventPayload<T>;
}
export interface MercureWrappedEvent<T> {
event: BroadcastEvent;
data: WrappedBroadcast<T>;
}
export type MercureConnectionState = "connecting" | "open" | "closed" | undefined;
// eslint-disable-next-line @typescript-eslint/no-redundant-type-constituents
export type MercureBroadcastSelector = string | BroadcastEvent | null;
export type MercureSubscriptionID = string;
export interface MercureSubscription {
topic?: string;
event: BroadcastEvent;
selector: MercureBroadcastSelector;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
handler: (eventData: MercureEventPayload<any>) => void;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment