Last active
May 23, 2019 11:27
-
-
Save kosich/5a7a7f1338737e452ff6a1937b5fe05a to your computer and use it in GitHub Desktop.
Wrap source emission only if its not empty
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { rxObserver } = require('api/v0.3'); | |
const { timer, of, Notification, EMPTY, merge, concat, throwError } = require('rxjs'); | |
const { switchMap, materialize, dematerialize, delay, map } = require('rxjs/operators'); | |
const source$ = merge(timer(5), timer(10)); | |
// Uncomment next line to get an empty source | |
// const source$ = EMPTY; | |
// Uncomment next line to get a source with an error | |
// const source$ = concat(timer(5), throwError('Err')); | |
const result$ = source$.pipe( | |
// turn all events on stream into Notifications | |
materialize(), | |
// wrap elements only if they are present | |
switchMap((event, index) => { | |
// if its first event and its a value | |
if (index === 0 && event.kind === 'N') { | |
const startingNotification = new Notification('N', '>>>', undefined); | |
return of(startingNotification, event); | |
} | |
// if its a completion event and it not a first event | |
if (index > 0 && event.kind === 'C') { | |
const endingNotification = new Notification('N', '<<<', undefined); | |
return of(endingNotification, event); | |
} | |
return of(event); | |
}), | |
// turn Notifications back to events on stream | |
dematerialize() | |
); | |
source$.subscribe(rxObserver('source$')); | |
result$.subscribe(rxObserver('result$')); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment