Created
November 17, 2024 20:41
-
-
Save jackwh/cdb22640ff962bc8fa86cf79017ff8cd to your computer and use it in GitHub Desktop.
Mercure subscription manager
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { arrayWrap } from "@/Support/Utilities/Arrays.ts"; | |
import { numericHash } from "@/Components/Macros/Utils/NumericHash.ts"; | |
import _ from "lodash"; | |
export default class Mercure { | |
url: string | null; | |
debug = false; | |
hub: EventSource | undefined; | |
lastEventID: string | undefined; | |
hash: number | undefined; | |
isConnecting = false; | |
subscriptions: Map<MercureSubscriptionID, MercureSubscription> = new Map< | |
MercureSubscriptionID, | |
MercureSubscription | |
>(); | |
/** Create a new Mercure instance. */ | |
constructor(hubUrl: string | null, subscriptions: MercureSubscription[] = [], debug = false) { | |
this.url = hubUrl; | |
this.debug = debug; | |
if (subscriptions.length > 0) { | |
this.subscribe(subscriptions); | |
} | |
} | |
/** Subscribe to one or more topics. Returns an array of subscription IDs. */ | |
subscribe(to: MercureSubscription | MercureSubscription[]): MercureSubscriptionID[] { | |
// Prepare records with hashes for each subscription: | |
const hashedSubscriptions = arrayWrap(to).map((subscription) => this.createHashedSubscription(subscription)); | |
// Set each subscription on the Map: | |
hashedSubscriptions.forEach((data) => { | |
this.subscriptions.set(data.hash, data.subscription); | |
}); | |
// Sync any changes to the connection. | |
this.sync(); | |
// Return an array of newly-subscribed subscription IDs. | |
return hashedSubscriptions.map((data) => data.hash); | |
} | |
/** Unsubscribe from one or more topics. */ | |
unsubscribe(from: MercureSubscriptionID | MercureSubscriptionID[]): void { | |
const hashedUnsubscribeIDs = arrayWrap(from); | |
// Remove each unsubscription from the Map: | |
hashedUnsubscribeIDs.forEach((id) => { | |
this.subscriptions.delete(id); | |
}); | |
// Sync any changes back to the connection. | |
this.sync(); | |
} | |
/** Handle an error from Mercure. */ | |
handleError(errorEvent: Event): void { | |
console.error("Mercure error", errorEvent); | |
} | |
/** Handle a message from Mercure. */ | |
handleMessage(messageEvent: MessageEvent): void { | |
// Record the last event ID, so we can catch up on missed events later if needed: | |
this.lastEventID = messageEvent.lastEventId; | |
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment | |
const eventData: MercureWrappedEvent<BroadcastEvent> = JSON.parse(messageEvent.data as string); | |
if (this.debug) { | |
console.log(`Received Mercure message ${this.lastEventID}:`, eventData); | |
} | |
// Check if any of our subscriptions are listening for this event, | |
// and if so, ensure the selectors match before handling the event: | |
for (const [hash, subscription] of this.subscriptions) { | |
if (subscription.event === eventData.event) { | |
if (subscription.selector && subscription.selector !== eventData.data.selector) { | |
if (this.debug) { | |
console.log(`Ignoring event ${eventData.event} (selector mismatch: ${subscription.selector})`); | |
} | |
continue; | |
} | |
if (this.debug) { | |
console.log(`Handling event ${eventData.event} (matched selector: ${subscription.selector})...`); | |
} | |
// The event type and selectors are compatible, so handle the event: | |
subscription.handler(eventData.data.broadcast); | |
} | |
} | |
} | |
/** Compute a hash for a MercureSubscription. */ | |
private createHashedSubscription(subscription: MercureSubscription) { | |
const hash: MercureSubscriptionID = "event" + String(numericHash(JSON.stringify(subscription))); | |
return { hash: hash, subscription: subscription }; | |
} | |
/** If any of the subscription requirements change, reestablish the connection to Mercure. */ | |
private sync() { | |
const oldHash = this.hash; | |
const newHash = numericHash(JSON.stringify(Object.fromEntries(this.subscriptions))); | |
const isDifferent = oldHash !== newHash; | |
// If the hash was changed, reestablish the connection. | |
if (isDifferent) { | |
this.hash = newHash; | |
// Debounce the connection setup, in case multiple components update subscriptions in rapid succession: | |
if (!this.isConnecting) { | |
this.isConnecting = true; | |
const connectToMercure = _.debounce(() => this.connect(), 500); | |
connectToMercure(); | |
} | |
} | |
} | |
/** Connect to Mercure. */ | |
private connect() { | |
// If we haven't got any subscriptions to listen for, disconnect and return early. | |
if (this.subscriptions.size === 0) { | |
this.disconnect(); | |
this.isConnecting = false; | |
return; | |
} | |
// Ensure we haven't accidentally left an existing connection open before continuing. | |
if (this.hub !== undefined) { | |
this.disconnect(true); | |
} | |
// Build a URL to the Mercure Hub, with subscription selectors for all required topics: | |
const hubUrl = this.url ?? window.mercureDefaults?.hubUrl; | |
if (!hubUrl) { | |
throw new Error("No Mercure Hub URL provided."); | |
} | |
const connectionUrl = new URL(hubUrl); | |
for (const topic of this.topics) { | |
connectionUrl.searchParams.append("topic", topic); | |
} | |
// If we're resuming from a previous connection, include the lastEventID: | |
if (this.lastEventID !== undefined) { | |
connectionUrl.searchParams.append("lastEventID", this.lastEventID); | |
} | |
// Create an EventSource and associate it with this instance: | |
this.hub = new EventSource(connectionUrl, { | |
withCredentials: true, | |
}); | |
// Attach our handlers to the EventSource: | |
this.hub.onmessage = (e) => this.handleMessage(e); | |
this.hub.onerror = (e) => this.handleError(e); | |
// Mark the debounced connection state as complete. | |
this.isConnecting = false; | |
} | |
/** Disconnect from Mercure. If 'temporarily' is set, the last event ID will be retained for later reconnections. */ | |
disconnect(temporarily = false) { | |
// If the disconnection is permanent, clear out the last event ID. | |
if (!temporarily) { | |
this.lastEventID = undefined; | |
} | |
// If we haven't established a connection yet, return early. | |
if (this.hub === undefined) { | |
return; | |
} | |
// Close the connection and remove the attached callbacks. | |
this.hub.close(); | |
this.hub.onmessage = null; | |
this.hub.onerror = null; | |
this.hub.onopen = null; | |
} | |
/** Get the current state of the Mercure connection. */ | |
get state(): MercureConnectionState { | |
if (this.hub === undefined) { | |
return undefined; | |
} | |
const hubState = this.hub.readyState; | |
if (hubState === 0) { | |
return "connecting"; | |
} else if (hubState === 1) { | |
return "open"; | |
} else if (hubState === 2) { | |
return "closed"; | |
} | |
throw new Error("Unexpected EventSource state: " + hubState); | |
} | |
/** Get an array of topic URLs from the instance subscriptions. */ | |
get topics(): string[] { | |
return [ | |
...new Set( | |
Array.from(this.subscriptions.values()) | |
.filter((subscription) => (subscription.topic ?? undefined) !== undefined) | |
.map((subscription) => subscription.topic!) | |
.concat(window.mercureDefaults?.topics ?? []) | |
.toSorted(), | |
), | |
]; | |
} | |
} | |
/** Listen for an event broadcast by Mercure to React. Pass the returned callback to a useEffect hook to safely unsubscribe. */ | |
export function registerMercureSubscription(subscription: MercureSubscription): () => void { | |
// Find the current Mercure instance on the window. | |
const instance = window.Mercure; | |
// eslint-disable-next-line @typescript-eslint/strict-boolean-expressions,@typescript-eslint/no-unnecessary-condition | |
if (!instance) { | |
throw new Error("Cannot add subscription, no Mercure instance detected."); | |
} | |
// Create the subscription: | |
const subscriptionID = instance.subscribe(subscription); | |
// Return a callback so React can clean up the subscription when needed: | |
return () => { | |
window.Mercure.unsubscribe(subscriptionID); | |
}; | |
} | |
export type BroadcastEvent = | |
| "App\\Broadcasting\\Events\\ExampleEvent" | |
| "App\\Broadcasting\\Events\\AnotherExampleEvent"; | |
export type MercureEventPayload<T> = T; | |
export interface WrappedBroadcast<T> { | |
selector: string | null; | |
broadcast: MercureEventPayload<T>; | |
} | |
export interface MercureWrappedEvent<T> { | |
event: BroadcastEvent; | |
data: WrappedBroadcast<T>; | |
} | |
export type MercureConnectionState = "connecting" | "open" | "closed" | undefined; | |
// eslint-disable-next-line @typescript-eslint/no-redundant-type-constituents | |
export type MercureBroadcastSelector = string | BroadcastEvent | null; | |
export type MercureSubscriptionID = string; | |
export interface MercureSubscription { | |
topic?: string; | |
event: BroadcastEvent; | |
selector: MercureBroadcastSelector; | |
// eslint-disable-next-line @typescript-eslint/no-explicit-any | |
handler: (eventData: MercureEventPayload<any>) => void; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment