Skip to content

Instantly share code, notes, and snippets.

@tuan
Created May 3, 2017 05:30
Show Gist options
  • Save tuan/6ba654021c5c228defebfaa42710862f to your computer and use it in GitHub Desktop.
Save tuan/6ba654021c5c228defebfaa42710862f to your computer and use it in GitHub Desktop.
Throttle the number of concurrently running observables
import { Observable, Subject, Subscription } from 'rxjs/Rx';
/** Zero-argument factory that returns the Observable to be throttled. */
export type FuncAsync<T> = () => Observable<T>;
/**
 * Parameterless callback that returns a Subscription; this is the element
 * type of the pending-work queue held by ThrottleAsync.
 */
type QueuedExecution = () => Subscription;
/**
 * Caps how many observables created through `execute` are subscribed to at
 * the same time. Work requested while all slots are busy waits in a FIFO
 * queue and is started as soon as a running observable completes, errors,
 * or is unsubscribed.
 *
 * `max` is the number of slots. A value below 1 leaves no slot available,
 * so nothing would ever be started.
 */
export class ThrottleAsync<T> {
  /** Starters for subscriptions waiting for a free slot, oldest first. */
  private readonly queue: QueuedExecution[] = [];
  /** Number of inner subscriptions currently holding a slot. */
  private active = 0;

  constructor(private max: number) {}

  /**
   * Wraps `fnAsync` in a throttled observable.
   *
   * The returned observable is cold: `fnAsync` is not invoked until a
   * consumer subscribes, and then only once a slot is free. Each
   * subscription is an independent run that takes one slot.
   *
   * @param fnAsync Factory producing the observable to run.
   * @returns An observable mirroring the values, error, and completion of
   *   the observable produced by `fnAsync`.
   */
  execute(fnAsync: FuncAsync<T>): Observable<T> {
    return new Observable<T>(observer => {
      let inner: Subscription | undefined;
      let holdsSlot = false;
      let cancelled = false;

      // Gives the slot back exactly once, however the run ends
      // (complete, error, or consumer unsubscribe).
      const release = (): void => {
        if (!holdsSlot) {
          return;
        }
        holdsSlot = false;
        this.active--;
        this.startNext();
      };

      const start: QueuedExecution = () => {
        holdsSlot = true;
        this.active++;

        let source: Observable<T>;
        try {
          source = fnAsync();
        } catch (error) {
          // A throwing factory must not leak its slot, nor blow up inside
          // the completion handler of whichever task dequeued it.
          release();
          observer.error(error);
          return new Subscription();
        }

        const subscription = source.subscribe(
          value => observer.next(value),
          (error: any) => {
            release();
            observer.error(error);
          },
          () => {
            release();
            observer.complete();
          }
        );
        inner = subscription;
        // The consumer may have unsubscribed while the source was still
        // emitting synchronously, i.e. before `inner` was assigned.
        if (cancelled) {
          subscription.unsubscribe();
        }
        return subscription;
      };

      if (this.active < this.max) {
        start();
      } else {
        this.queue.push(start);
      }

      return () => {
        cancelled = true;
        if (inner) {
          inner.unsubscribe();
        }
        // If the run never started, take it out of the queue so it does
        // not occupy a slot later for a consumer that is already gone.
        const index = this.queue.indexOf(start);
        if (index !== -1) {
          this.queue.splice(index, 1);
        }
        release();
      };
    });
  }

  /** Starts queued work, oldest first, while free slots remain. */
  private startNext(): void {
    while (this.active < this.max) {
      const next = this.queue.shift();
      if (!next) {
        return;
      }
      next();
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment