Skip to content

Instantly share code, notes, and snippets.

@cartant
Created May 13, 2018 04:30
Show Gist options
  • Select an option

  • Save cartant/3232d6c2a4ff301c3846989a33bd46a7 to your computer and use it in GitHub Desktop.

Select an option

Save cartant/3232d6c2a4ff301c3846989a33bd46a7 to your computer and use it in GitHub Desktop.
import { ConnectableObservable, Observable, Observer, Subject, Subscription, zip } from "rxjs";
import { first, map, publish } from "rxjs/operators";
export class NotificationQueue extends Observable<number> {
private _count = 0;
private _indices: Subject<number>;
private _notifications: ConnectableObservable<number>;
constructor(notifier: Observable<any>) {
super((observer: Observer<number>) => {
const index = this._count++;
const subscription = this._notifications.pipe(
first(value => value === index)
).subscribe(observer);
this._indices.next(index);
return subscription;
});
this._indices = new Subject<number>();
this._notifications = zip(notifier, this._indices).pipe(
map(([, index]) => index),
publish()
) as ConnectableObservable<number>;
}
connect(): Subscription {
return this._notifications.connect();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment