Created
November 13, 2024 06:55
-
-
Save monotykamary/b430af6ef13a4e99f5e93e67732f5ae9 to your computer and use it in GitHub Desktop.
RxJS Dataloader
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { asapScheduler, Observable, ObservableInput, SchedulerLike, Subject } from 'rxjs'; | |
import { bufferTime, first, map, mergeMap, share } from 'rxjs/operators'; | |
export type BatchLoadFn<K = any, V = any> = (keys: K[]) => ObservableInput<V>; | |
export type Options = { | |
bufferTimeSpan?: number | |
bufferCreationInterval?: number | null | |
maxBufferSize?: number | |
scheduler?: SchedulerLike | |
} | |
const createRxDataLoader = <K = any, V = any>(batchLoadFn: BatchLoadFn<K, V>, options?: Options) => { | |
const load$ = new Subject<K>(); | |
const request$ = load$.pipe( | |
bufferTime( | |
options?.bufferTimeSpan ?? 0, | |
options?.bufferCreationInterval ?? null, | |
options?.maxBufferSize || Infinity, | |
options?.scheduler ?? asapScheduler, | |
), | |
map((bufferedKeys) => { | |
const keys = new Set(bufferedKeys); | |
const values = batchLoadFn([...keys]); | |
return { keys, values }; | |
}), | |
share(), | |
); | |
return (key: K) => { | |
const dataload$: typeof request$ = new Observable((observer) => { | |
const subscription = request$.subscribe(observer); | |
load$.next(key); | |
return () => subscription.unsubscribe(); | |
}); | |
return dataload$.pipe( | |
first(({ keys }) => keys.has(key)), | |
mergeMap(({ values }) => values), | |
); | |
}; | |
}; | |
export default createRxDataLoader; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { from, lastValueFrom } from 'rxjs'; | |
import createRxDataLoader from './createRxDataLoader'; | |
// Example 1: Basic User Loading | |
// ----------------------------- | |
interface User { | |
id: number; | |
name: string; | |
email: string; | |
} | |
// Simulate a database call | |
const fetchUsersFromDB = async (ids: number[]): Promise<User[]> => { | |
console.log('Batch loading users:', ids); | |
// Simulate API delay | |
await new Promise(resolve => setTimeout(resolve, 100)); | |
return ids.map(id => ({ | |
id, | |
name: `User ${id}`, | |
email: `user${id}@example.com` | |
})); | |
}; | |
// Create a user loader with 50ms buffer window | |
const userLoader = createRxDataLoader<number, User>( | |
(ids) => from(fetchUsersFromDB(ids)), | |
{ bufferTimeSpan: 50 } | |
); | |
// Example usage of user loader | |
async function demonstrateUserLoader() { | |
console.log('Starting user loader demonstration...'); | |
// Load multiple users in parallel | |
const userPromises = [1, 2, 3, 1, 2].map(id => | |
lastValueFrom(userLoader(id)) | |
); | |
const users = await Promise.all(userPromises); | |
console.log('Loaded users:', users); | |
} | |
// Example 2: Product Loading with Cache Control | |
// ------------------------------------------ | |
interface Product { | |
id: string; | |
name: string; | |
price: number; | |
} | |
const fetchProductsFromAPI = async (ids: string[]): Promise<Product[]> => { | |
console.log('Batch loading products:', ids); | |
await new Promise(resolve => setTimeout(resolve, 150)); | |
return ids.map(id => ({ | |
id, | |
name: `Product ${id}`, | |
price: Math.random() * 100 | |
})); | |
}; | |
// Create a product loader with larger buffer window and size limit | |
const productLoader = createRxDataLoader<string, Product>( | |
(ids) => from(fetchProductsFromAPI(ids)), | |
{ | |
bufferTimeSpan: 100, | |
maxBufferSize: 5 | |
} | |
); | |
// Example usage of product loader | |
async function demonstrateProductLoader() { | |
console.log('\nStarting product loader demonstration...'); | |
// Simulate requests coming in at different times | |
const loadProduct = async (id: string, delay: number) => { | |
await new Promise(resolve => setTimeout(resolve, delay)); | |
return lastValueFrom(productLoader(id)); | |
}; | |
const productPromises = [ | |
loadProduct('A', 0), | |
loadProduct('B', 20), | |
loadProduct('C', 40), | |
loadProduct('D', 60), | |
loadProduct('E', 80), | |
loadProduct('F', 150), // This will be in a separate batch due to maxBufferSize | |
]; | |
const products = await Promise.all(productPromises); | |
console.log('Loaded products:', products); | |
} | |
// Example 3: Error Handling | |
// ------------------------ | |
interface Post { | |
id: number; | |
title: string; | |
} | |
const fetchPostsWithErrors = async (ids: number[]): Promise<Post[]> => { | |
console.log('Batch loading posts:', ids); | |
await new Promise(resolve => setTimeout(resolve, 100)); | |
// Simulate some errors | |
if (ids.includes(404)) { | |
throw new Error('Post not found'); | |
} | |
return ids.map(id => ({ | |
id, | |
title: `Post ${id}` | |
})); | |
}; | |
const postLoader = createRxDataLoader<number, Post>( | |
(ids) => from(fetchPostsWithErrors(ids)), | |
{ bufferTimeSpan: 50 } | |
); | |
// Example usage with error handling | |
async function demonstrateErrorHandling() { | |
console.log('\nStarting error handling demonstration...'); | |
try { | |
const posts = await Promise.all([ | |
lastValueFrom(postLoader(1)), | |
lastValueFrom(postLoader(2)), | |
lastValueFrom(postLoader(404)), // This will cause an error | |
]); | |
console.log('Loaded posts:', posts); | |
} catch (error) { | |
console.error('Error loading posts:', error); | |
} | |
} | |
// Run all demonstrations | |
async function runDemonstrations() { | |
await demonstrateUserLoader(); | |
await demonstrateProductLoader(); | |
await demonstrateErrorHandling(); | |
} | |
// Execute the demonstrations | |
runDemonstrations().catch(console.error); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment