Skip to content

Instantly share code, notes, and snippets.

@monotykamary
Created November 13, 2024 06:55
Show Gist options
  • Save monotykamary/b430af6ef13a4e99f5e93e67732f5ae9 to your computer and use it in GitHub Desktop.
Save monotykamary/b430af6ef13a4e99f5e93e67732f5ae9 to your computer and use it in GitHub Desktop.
RxJS Dataloader
import { asapScheduler, Observable, ObservableInput, SchedulerLike, Subject } from 'rxjs';
import { bufferTime, first, map, mergeMap, share } from 'rxjs/operators';
export type BatchLoadFn<K = any, V = any> = (keys: K[]) => ObservableInput<V>;
export type Options = {
bufferTimeSpan?: number
bufferCreationInterval?: number | null
maxBufferSize?: number
scheduler?: SchedulerLike
}
const createRxDataLoader = <K = any, V = any>(batchLoadFn: BatchLoadFn<K, V>, options?: Options) => {
const load$ = new Subject<K>();
const request$ = load$.pipe(
bufferTime(
options?.bufferTimeSpan ?? 0,
options?.bufferCreationInterval ?? null,
options?.maxBufferSize || Infinity,
options?.scheduler ?? asapScheduler,
),
map((bufferedKeys) => {
const keys = new Set(bufferedKeys);
const values = batchLoadFn([...keys]);
return { keys, values };
}),
share(),
);
return (key: K) => {
const dataload$: typeof request$ = new Observable((observer) => {
const subscription = request$.subscribe(observer);
load$.next(key);
return () => subscription.unsubscribe();
});
return dataload$.pipe(
first(({ keys }) => keys.has(key)),
mergeMap(({ values }) => values),
);
};
};
export default createRxDataLoader;
import { from, lastValueFrom } from 'rxjs';
import createRxDataLoader from './createRxDataLoader';
// Example 1: Basic User Loading
// -----------------------------
interface User {
id: number;
name: string;
email: string;
}
// Simulate a database call
const fetchUsersFromDB = async (ids: number[]): Promise<User[]> => {
console.log('Batch loading users:', ids);
// Simulate API delay
await new Promise(resolve => setTimeout(resolve, 100));
return ids.map(id => ({
id,
name: `User ${id}`,
email: `user${id}@example.com`
}));
};
// Create a user loader with 50ms buffer window
const userLoader = createRxDataLoader<number, User>(
(ids) => from(fetchUsersFromDB(ids)),
{ bufferTimeSpan: 50 }
);
// Example usage of user loader
async function demonstrateUserLoader() {
console.log('Starting user loader demonstration...');
// Load multiple users in parallel
const userPromises = [1, 2, 3, 1, 2].map(id =>
lastValueFrom(userLoader(id))
);
const users = await Promise.all(userPromises);
console.log('Loaded users:', users);
}
// Example 2: Product Loading with Cache Control
// ------------------------------------------
interface Product {
id: string;
name: string;
price: number;
}
const fetchProductsFromAPI = async (ids: string[]): Promise<Product[]> => {
console.log('Batch loading products:', ids);
await new Promise(resolve => setTimeout(resolve, 150));
return ids.map(id => ({
id,
name: `Product ${id}`,
price: Math.random() * 100
}));
};
// Create a product loader with larger buffer window and size limit
const productLoader = createRxDataLoader<string, Product>(
(ids) => from(fetchProductsFromAPI(ids)),
{
bufferTimeSpan: 100,
maxBufferSize: 5
}
);
// Example usage of product loader
async function demonstrateProductLoader() {
console.log('\nStarting product loader demonstration...');
// Simulate requests coming in at different times
const loadProduct = async (id: string, delay: number) => {
await new Promise(resolve => setTimeout(resolve, delay));
return lastValueFrom(productLoader(id));
};
const productPromises = [
loadProduct('A', 0),
loadProduct('B', 20),
loadProduct('C', 40),
loadProduct('D', 60),
loadProduct('E', 80),
loadProduct('F', 150), // This will be in a separate batch due to maxBufferSize
];
const products = await Promise.all(productPromises);
console.log('Loaded products:', products);
}
// Example 3: Error Handling
// ------------------------
interface Post {
id: number;
title: string;
}
const fetchPostsWithErrors = async (ids: number[]): Promise<Post[]> => {
console.log('Batch loading posts:', ids);
await new Promise(resolve => setTimeout(resolve, 100));
// Simulate some errors
if (ids.includes(404)) {
throw new Error('Post not found');
}
return ids.map(id => ({
id,
title: `Post ${id}`
}));
};
const postLoader = createRxDataLoader<number, Post>(
(ids) => from(fetchPostsWithErrors(ids)),
{ bufferTimeSpan: 50 }
);
// Example usage with error handling
async function demonstrateErrorHandling() {
console.log('\nStarting error handling demonstration...');
try {
const posts = await Promise.all([
lastValueFrom(postLoader(1)),
lastValueFrom(postLoader(2)),
lastValueFrom(postLoader(404)), // This will cause an error
]);
console.log('Loaded posts:', posts);
} catch (error) {
console.error('Error loading posts:', error);
}
}
// Run all demonstrations
async function runDemonstrations() {
await demonstrateUserLoader();
await demonstrateProductLoader();
await demonstrateErrorHandling();
}
// Execute the demonstrations
runDemonstrations().catch(console.error);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment