Skip to content

Instantly share code, notes, and snippets.

@fl0wo
Created May 23, 2024 07:58
Show Gist options
  • Save fl0wo/27eb2da13160ec741114dedc5690f7dc to your computer and use it in GitHub Desktop.
Save fl0wo/27eb2da13160ec741114dedc5690f7dc to your computer and use it in GitHub Desktop.
Just created a TopicProvider in Next.js! It looks a bit like a Flutter + Angular style. Combined: Singleton, Context Provider, BehaviorSubject. Obtained: one single connection per topic, shared across the website by every component that needs it.
/**
This component actually needs the messages from Momento, so it uses useTopic() to subscribe to the stream.
*/
"use client";
import {FC, useEffect, useState} from 'react';
import { TopicItem } from '@gomomento/sdk-web';
import { Subscription } from 'rxjs';
import {useTopic} from "@/app/use/topic-provider";
/**
 * Demo consumer of the shared topic stream.
 *
 * Subscribes to the observable returned by `useTopic()` for as long as the
 * component is mounted, appends every truthy emission to local state, and
 * renders the collected items as a JSON-serialised list.
 */
export const ExampleComponent: FC = () => {
  const topic$ = useTopic();
  const [items, setItems] = useState<TopicItem[]>([]);

  useEffect(() => {
    // Falsy emissions are ignored; everything else is appended to the list.
    const appendItem = (incoming: TopicItem | null): void => {
      if (!incoming) {
        return;
      }
      setItems((current) => current.concat([incoming]));
    };

    const reportError = (err: unknown): void => {
      console.error('Subscription error:', err);
    };

    const subscription: Subscription = topic$.subscribe({
      next: appendItem,
      error: reportError,
    });

    // Detach from the stream when the component unmounts or the stream changes.
    return () => {
      subscription.unsubscribe();
    };
  }, [topic$]);

  const rows = items.map((item, index) => (
    <li key={index}>{JSON.stringify(item)}</li>
  ));

  return (
    <div>
      <h1>Received Items</h1>
      <ul>
        {rows}
      </ul>
    </div>
  );
};
export default ExampleComponent;
/**
Finally, the singleton topic manager: this is the only part of the code where the actual connection is really made.
If you keep track of the "[momento] connection established:" logs in the console (client-side logs), you'll see that only 1 connection is opened even if you move across the components (as long as they sit under the provider, of course).
So if you keep a Provider at the highest possible level, you'll have 1 Momento topic connection as soon as you open the website,
shared everywhere in the code.
If you keep the provider at a lower, nested level, you'll have the connection opened/closed every time you engage with that component.
*/
import {Configurations, CredentialProvider, TopicClient, type TopicItem, TopicSubscribe,} from "@gomomento/sdk-web";
import {BehaviorSubject, map} from 'rxjs';
/**
 * Owns the Momento topic subscription and republishes every received item
 * through an RxJS `BehaviorSubject`, so any number of consumers can share one
 * connection.
 *
 * One manager is kept per (cacheName, topicName) pair: asking for the same
 * pair again returns the existing manager, while asking for a different pair
 * opens a connection for that topic instead of silently handing back the
 * manager of whichever topic was requested first.
 *
 * The `onItem` / `onError` callbacks are bound when a manager is first
 * created; callbacks passed to later `getInstance` calls for the same topic
 * are ignored.
 */
class SingletonTopicManager {
  /** Live managers, keyed by cache + topic. */
  private static readonly instances = new Map<string, SingletonTopicManager>();

  private readonly topicClient: TopicClient;
  private subscription: TopicSubscribe.Subscription | undefined;
  private readonly topicSubject: BehaviorSubject<TopicItem | null>;
  /** Set once `clearCurrentClient` has run, so a late subscribe result is discarded. */
  private closed = false;

  private constructor(
    private cacheName: string,
    private topicName: string,
    topicToken: string,
    onItem: (item: TopicItem) => void,
    onError: (
      error: TopicSubscribe.Error,
      subscription: TopicSubscribe.Subscription,
    ) => Promise<void>
  ) {
    this.topicClient = new TopicClient({
      configuration: Configurations.Browser.v1(),
      credentialProvider: CredentialProvider.fromString({
        apiKey: topicToken,
      })
    });
    // Seeded with null: subscribers receive an initial emission before any item arrives.
    this.topicSubject = new BehaviorSubject<TopicItem | null>(null);
    // Fire-and-forget: subscribeToTopic never rejects, it reports success as a boolean.
    void this.subscribeToTopic(onItem, onError).then((connected) => {
      // Only announce the connection when the subscribe call really succeeded.
      if (connected) {
        console.log('[momento] connection established:', this.topicName);
      }
    });
  }

  /**
   * Returns the manager for the given cache/topic, creating it (and opening
   * the connection) on first use.
   *
   * @param cacheName  Momento cache that hosts the topic.
   * @param topicName  Topic to subscribe to.
   * @param topicToken API key used to build the credential provider (only used on creation).
   * @param onItem     Called with every raw item before it is pushed to the subject (only bound on creation).
   * @param onError    Forwarded to the SDK as the subscription error handler (only bound on creation).
   */
  public static getInstance(
    cacheName: string,
    topicName: string,
    topicToken: string,
    onItem: (item: TopicItem) => void,
    onError: (
      error: TopicSubscribe.Error,
      subscription: TopicSubscribe.Subscription,
    ) => Promise<void>
  ): SingletonTopicManager {
    const key = SingletonTopicManager.keyFor(cacheName, topicName);
    let manager = SingletonTopicManager.instances.get(key);
    if (!manager) {
      manager = new SingletonTopicManager(cacheName, topicName, topicToken, onItem, onError);
      SingletonTopicManager.instances.set(key, manager);
    }
    return manager;
  }

  /** Builds an unambiguous registry key (JSON-encoded so no separator can collide). */
  private static keyFor(cacheName: string, topicName: string): string {
    return JSON.stringify([cacheName, topicName]);
  }

  /**
   * Opens the topic subscription.
   *
   * @returns `true` when a live subscription was stored, `false` when the
   *          subscribe call failed (the failure is logged) or the manager was
   *          cleared while the call was still in flight.
   */
  private async subscribeToTopic(
    onItem: (item: TopicItem) => void,
    onError: (
      error: TopicSubscribe.Error,
      subscription: TopicSubscribe.Subscription,
    ) => Promise<void>
  ): Promise<boolean> {
    try {
      const resp = await this.topicClient.subscribe(this.cacheName, this.topicName, {
        onItem: (item: TopicItem) => {
          // Caller callback first, then fan out to every observable consumer.
          onItem(item);
          this.topicSubject.next(item);
        },
        onError: onError,
      });
      if (!(resp instanceof TopicSubscribe.Subscription)) {
        throw new Error(`Unable to subscribe to topic: ${resp}`);
      }
      if (this.closed) {
        // clearCurrentClient() ran before the subscribe resolved: do not leak the connection.
        resp.unsubscribe();
        return false;
      }
      this.subscription = resp;
      return true;
    } catch (error) {
      console.error('Subscription error:', error);
      return false;
    }
  }

  /**
   * Closes the topic subscription and forgets this manager, so the next
   * `getInstance` call for the same cache/topic opens a fresh connection
   * instead of returning a manager that no longer receives anything.
   */
  public clearCurrentClient() {
    this.closed = true;
    this.subscription?.unsubscribe();
    this.subscription = undefined;
    const key = SingletonTopicManager.keyFor(this.cacheName, this.topicName);
    if (SingletonTopicManager.instances.get(key) === this) {
      SingletonTopicManager.instances.delete(key);
    }
  }

  /**
   * Stream of decoded payloads, one per received item.
   *
   * Each emission is the item's value passed through `decode`; the initial
   * emission (before any item has arrived) is `undefined`.
   */
  public getTopicSubject() {
    return this.topicSubject
      .asObservable()
      .pipe(map((item) =>
        SingletonTopicManager.decode(
          item?.value())
        )
      );
  }

  /**
   * Parses a string or UTF-8 byte payload as JSON.
   *
   * A payload that is not valid JSON is returned unchanged rather than
   * throwing: an exception inside the `map` operator would terminate the
   * stream for every subscriber. Any other value is returned as-is.
   */
  private static decode(val: string | Uint8Array | undefined) {
    let text: string;
    if (typeof val === 'string') {
      text = val;
    } else if (val instanceof Uint8Array) {
      text = new TextDecoder().decode(val);
    } else {
      return val;
    }
    try {
      return JSON.parse(text);
    } catch {
      return val;
    }
  }
}
export default SingletonTopicManager;
/**
Place this component wherever you want in your Next.js application; it will listen for topic messages from Momento.
*/
"use client";
import React, {useState} from "react";
import {TopicProvider} from "@/app/use/topic-provider";
import ExampleComponent from "@/app/dashboard/components/example-consumer";
/**
 * Mounts a `TopicProvider` for the current user's topic and renders
 * `ExampleComponent` inside it.
 *
 * @param props.cacheName Cache name forwarded to the provider.
 * @param props.user      Object whose `id` is used as the topic name.
 * @param props.apiKey    API key forwarded to the provider.
 */
export function TestTopicMomento(props: {
  cacheName: string,
  user: any,
  apiKey: string,
}) {
  // Items are collected here as well; this component does not render them itself.
  const [items, setItems] = useState<any[]>([])

  // Both callbacks are memoised: TopicProvider lists them as effect
  // dependencies, so a new function identity on every render would re-run
  // its effect each time this component re-renders.
  const onItem = React.useCallback((item: any) => {
    console.log('Received item:', item);
    // Functional update keeps the callback free of dependencies on `items`.
    setItems((prevItems) => [...prevItems, item]);
  }, []);

  const onError = React.useCallback(async (error: any, subscription: any) => {
    console.error('Error:', error);
  }, []);

  return <div>
    <TopicProvider
      cacheName={props.cacheName}
      topicName={props.user.id}
      apiKey={props.apiKey}
      onItem={onItem}
      onError={onError}>
      <ExampleComponent/>
    </TopicProvider>
  </div>
}
/**
We use this provider to give all the "consumer components" access to the singleton that holds the stream of messages, across the whole website.
Thanks to this provider we can group all the connections to topics with the same name into one single connection (per topic).
*/
"use client";
import {createContext, FC, ReactNode, useContext, useEffect, useState} from 'react';
import {TopicItem, TopicSubscribe} from '@gomomento/sdk-web';
import { Observable } from 'rxjs';
import SingletonTopicManager from "@/libs/momento/subscribe";
/**
 * Shape of the value shared through TopicContext.
 */
interface TopicContextType {
// Stream handed to consumers by useTopic().
// NOTE(review): declared as TopicItem | null, but the provider fills it from
// SingletonTopicManager.getTopicSubject() — confirm the emitted values really match this type.
topic$: Observable<TopicItem | null>;
}
// React context carrying the topic stream. The default is undefined, which is
// what useTopic() checks for to detect a missing TopicProvider.
const TopicContext = createContext<TopicContextType | undefined>(undefined);
/**
 * Props accepted by TopicProvider. Everything except `children` is passed
 * straight to SingletonTopicManager.getInstance().
 */
interface TopicProviderProps {
// Passed to getInstance() as the cache name.
cacheName: string;
// Passed to getInstance() as the topic name.
topicName: string;
// Passed to getInstance() as the topic token.
apiKey: string;
// Callback handed to the manager for received items.
onItem: (item: TopicItem) => void;
// Callback handed to the manager for subscription errors.
onError: (
error: TopicSubscribe.Error,
subscription: TopicSubscribe.Subscription,
) => Promise<void>;
// Subtree that gets access to the topic stream through useTopic().
children: ReactNode;
}
/**
 * Makes the topic stream of a `SingletonTopicManager` available to every
 * descendant through `useTopic()`.
 *
 * The manager is obtained inside an effect, so nothing is rendered (not even
 * `children`) until that effect has run.
 *
 * The context value is built once per manager and kept in state. Building it
 * inline during render would hand consumers a new `topic$` object on every
 * render; consumers that list `topic$` as an effect dependency would then
 * unsubscribe and resubscribe each time, and since the underlying subject
 * replays its latest value on subscribe they would see that item again.
 */
export const TopicProvider: FC<TopicProviderProps> = ({ cacheName,apiKey, topicName, onItem, onError, children }: TopicProviderProps) => {
  const [binding, setBinding] = useState<{
    manager: SingletonTopicManager;
    value: TopicContextType;
  } | null>(null);

  useEffect(() => {
    const manager = SingletonTopicManager.getInstance(
      cacheName,
      topicName,
      apiKey,
      onItem,
      onError
    );
    // Keep the previous binding (and therefore the same topic$ identity) when
    // getInstance returned the manager we are already bound to.
    setBinding((current) =>
      current?.manager === manager
        ? current
        : { manager, value: { topic$: manager.getTopicSubject() } }
    );
  }, [cacheName, topicName,apiKey, onItem, onError]); // re-resolve the manager when one of those changes

  if (!binding) {
    return null; // Or a loading spinner or some placeholder
  }

  return (
    <TopicContext.Provider value={binding.value}>
      {children}
    </TopicContext.Provider>
  );
};
/**
 * Hook giving a component the topic stream published by the nearest
 * `TopicProvider`.
 *
 * @returns The `topic$` observable stored in the context.
 * @throws Error when no context value is available, i.e. the hook is used
 *         outside a `TopicProvider`.
 */
export const useTopic = () => {
  const topicContext = useContext(TopicContext);
  if (topicContext) {
    return topicContext.topic$;
  }
  throw new Error('useTopic must be used within a TopicProvider');
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment