Skip to content

Instantly share code, notes, and snippets.

@kosich
Last active May 23, 2019 11:27
Show Gist options
  • Save kosich/5a7a7f1338737e452ff6a1937b5fe05a to your computer and use it in GitHub Desktop.
Save kosich/5a7a7f1338737e452ff6a1937b5fe05a to your computer and use it in GitHub Desktop.
Wrap source emission only if its not empty
const { rxObserver } = require('api/v0.3');
const { timer, of, Notification, EMPTY, merge, concat, throwError } = require('rxjs');
const { switchMap, materialize, dematerialize, delay, map } = require('rxjs/operators');
const source$ = merge(timer(5), timer(10));
// Uncomment next line to get an empty source
// const source$ = EMPTY;
// Uncomment next line to get a source with an error
// const source$ = concat(timer(5), throwError('Err'));
const result$ = source$.pipe(
// turn all events on stream into Notifications
materialize(),
// wrap elements only if they are present
switchMap((event, index) => {
// if its first event and its a value
if (index === 0 && event.kind === 'N') {
const startingNotification = new Notification('N', '>>>', undefined);
return of(startingNotification, event);
}
// if its a completion event and it not a first event
if (index > 0 && event.kind === 'C') {
const endingNotification = new Notification('N', '<<<', undefined);
return of(endingNotification, event);
}
return of(event);
}),
// turn Notifications back to events on stream
dematerialize()
);
source$.subscribe(rxObserver('source$'));
result$.subscribe(rxObserver('result$'));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment