Skip to content

Instantly share code, notes, and snippets.

@kosich
Created May 21, 2019 08:59
Show Gist options
  • Save kosich/bf7d5c8fcae8f2c5f0129fd4a11b9147 to your computer and use it in GitHub Desktop.
Save kosich/bf7d5c8fcae8f2c5f0129fd4a11b9147 to your computer and use it in GitHub Desktop.
Custom RxJS operator that counts active subscribers
const { rxObserver } = require('api/v0.3');
const { Observable, Subject, of, timer, pipe, defer } = require('rxjs');
const { finalize, takeUntil } = require('rxjs/operators');
// Demo: two subscribers attach to the same counted stream at different times.
// Expected log, given that customOperator reports the count on every
// subscribe and on every teardown: 1 (t=0), 2 (t=100), 1 (t=500), 0 (t=700).
const subj$ = new Subject();

const result$ = subj$.pipe(
  customOperator((n) => console.log('Count updated: ', n))
);

// First subscriber joins immediately; takeUntil(timer(500)) ends it early.
const subscription1 = result$
  .pipe(takeUntil(timer(500)))
  .subscribe(rxObserver());

// Second subscriber joins after 100 ms and stays until the subject completes.
setTimeout(() => result$.subscribe(rxObserver()), 100);

// emit an event
setTimeout(() => subj$.next(1), 250);

// Complete the source, which tears down whatever subscription is still open.
setTimeout(() => subj$.complete(), 700);
/**
 * Creates a pipeable operator that reports how many subscriptions to its
 * output are currently open.
 *
 * The count is kept per application of the operator to a source (one counter
 * for each call of the returned function) and is shared by every subscriber
 * of the resulting observable.
 *
 * @param {function(number): void} [onCountUpdate=noop] Invoked with the new
 *   count right after each increment and each decrement.
 * @returns {function} Operator function: takes `source$` and returns the
 *   counted observable.
 */
function customOperator(onCountUpdate = noop) {
  return function refCountOperatorFunction(source$) {
    let counter = 0;

    // Single place that mutates the counter and notifies the listener.
    const bump = (delta) => {
      counter += delta;
      onCountUpdate(counter);
    };

    // defer runs its factory once per subscription, so this is the "+1" hook.
    const tracked$ = defer(() => {
      bump(1);
      return source$;
    });

    // finalize fires on complete, error, or unsubscribe: the "-1" hook.
    return tracked$.pipe(
      finalize(() => {
        bump(-1);
      })
    );
  };
}
/**
 * Does nothing; used as the default callback for customOperator.
 *
 * @returns {undefined}
 */
function noop() {
  // intentionally empty
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment