@marinho
Created October 7, 2016 15:45
RxJS operator to delay before disposal when refCount reaches zero
import { Observable } from 'rxjs/Observable';
import { ConnectableObservable } from 'rxjs/observable/ConnectableObservable';
import { Subscription } from 'rxjs/Subscription';
import { async } from 'rxjs/scheduler/async';
/* This operator is most useful together with .publishReplay(1), for example:
 *
 *   let some$ = Observable.create(observer => {
 *     console.log(1110);
 *     observer.next('a value');
 *   });
 *   this.obs$ = refCountWithDelay(some$.publishReplay(1), 0, 2000);
 *
 * Here the source stays connected for 2000 ms after the last subscriber
 * unsubscribes. A subscriber that arrives within that window reuses the
 * connection and receives the replayed value; '1110' is printed again only
 * when a subscriber arrives after the window has elapsed and the source
 * has to reconnect.
 *
 * authors: https://github.com/Dorus and https://github.com/babeal
 */
export function refCountWithDelay<T>(source: ConnectableObservable<T>, attachTime?: number, detachTime?: number, scheduler?): Observable<T> {
  attachTime = attachTime || 0;
  detachTime = detachTime || 0;
  scheduler = scheduler || async;
  let connected = 0; // 0 = disconnected, 1 = disconnecting, 2 = connecting, 3 = connected
  let refCount = 0;
  let con = Subscription.EMPTY;   // subscription returned by source.connect()
  let sched = Subscription.EMPTY; // pending delayed connect or disconnect
  return Observable.create(ob => {
    source.subscribe(ob);
    if (refCount++ === 0) {
      // First subscriber.
      if (connected === 1) {
        // A disconnect is pending: cancel it and keep the existing connection.
        connected = 3;
        sched.unsubscribe();
      } else { // connected === 0
        // Not connected: connect after attachTime.
        connected = 2;
        sched = scheduler.schedule(() => {
          con = source.connect();
          connected = 3;
        }, attachTime);
      }
    }
    return () => {
      if (--refCount === 0) {
        // Last subscriber left.
        if (connected === 2) {
          // The connect has not happened yet: cancel it.
          connected = 0;
          sched.unsubscribe();
        } else { // connected === 3
          // Connected: disconnect after detachTime unless someone resubscribes.
          connected = 1;
          sched = scheduler.schedule(() => {
            con.unsubscribe();
            connected = 0;
          }, detachTime);
        }
      }
    };
  });
}
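
The four `connected` states above can be hard to follow inside the RxJS plumbing, so here is a dependency-free sketch of the same state machine. It is a model, not RxJS code: `FakeScheduler`, `DelayedRefCounter`, `connects` and `summary()` are hypothetical names invented for this illustration, and the virtual-time scheduler stands in for the `async` scheduler so the timings can be stepped by hand.

```typescript
// Hypothetical model of the operator's state machine; it does not use RxJS.
type State = 'disconnected' | 'disconnecting' | 'connecting' | 'connected';

interface Task { due: number; run: () => void; cancelled: boolean; }

// Minimal virtual-time scheduler (an assumption made for this sketch).
class FakeScheduler {
  private now = 0;
  private tasks: Task[] = [];

  // Schedule `run` after `delay` ms of virtual time; returns a cancel function.
  schedule(run: () => void, delay: number): () => void {
    const task: Task = { due: this.now + delay, run, cancelled: false };
    this.tasks.push(task);
    return () => { task.cancelled = true; };
  }

  // Advance virtual time and run the tasks that became due and were not cancelled.
  advance(ms: number): void {
    this.now += ms;
    const due = this.tasks.filter(t => t.due <= this.now);
    this.tasks = this.tasks.filter(t => t.due > this.now);
    due.sort((a, b) => a.due - b.due);
    for (const t of due) {
      if (!t.cancelled) t.run();
    }
  }
}

class DelayedRefCounter {
  private state: State = 'disconnected';
  private connects = 0; // how many times the pretend source was connected
  private refCount = 0;
  private cancel: () => void = () => {};

  constructor(private scheduler: FakeScheduler,
              private attachTime: number,
              private detachTime: number) {}

  subscribe(): () => void {
    if (this.refCount++ === 0) {
      if (this.state === 'disconnecting') {
        this.cancel();              // keep the connection that is still alive
        this.state = 'connected';
      } else {
        this.state = 'connecting';
        this.cancel = this.scheduler.schedule(() => {
          this.connects++;
          this.state = 'connected';
        }, this.attachTime);
      }
    }
    let active = true;
    return () => {
      if (!active) return;          // ignore repeated unsubscribe calls
      active = false;
      if (--this.refCount === 0) {
        if (this.state === 'connecting') {
          this.cancel();            // the connect never happened
          this.state = 'disconnected';
        } else {
          this.state = 'disconnecting';
          this.cancel = this.scheduler.schedule(() => {
            this.state = 'disconnected';
          }, this.detachTime);
        }
      }
    };
  }

  summary(): string {
    return this.state + '/' + this.connects;
  }
}

const scheduler = new FakeScheduler();
const counter = new DelayedRefCounter(scheduler, 0, 2000);

const unsubA = counter.subscribe();
scheduler.advance(0);                // attach delay elapses, source connects
unsubA();                            // last subscriber gone: disconnecting
scheduler.advance(1000);             // still inside the 2000 ms window
const unsubB = counter.subscribe();  // reuses the live connection
scheduler.advance(5000);             // the cancelled disconnect never fires
console.log(counter.summary());      // prints "connected/1"
```

Because the second subscriber arrives 1000 ms into the 2000 ms window, the pending disconnect is cancelled and the source is connected only once. If it arrived after the window instead, the model would go back through `connecting` and `connects` would become 2.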