Skip to content

Instantly share code, notes, and snippets.

@alxhub
Created February 1, 2018 15:41
Show Gist options
  • Save alxhub/baea74f6fb4f6b5d7be4d27527b6dbc1 to your computer and use it in GitHub Desktop.
Save alxhub/baea74f6fb4f6b5d7be4d27527b6dbc1 to your computer and use it in GitHub Desktop.
RXJS stream with loading events
// Loading indicator implementation.
interface StreamEvent<T> {
type: 'start' | 'done';
response?: T;
}
// Our starting Observable that will emit with each new Id.
declare let id$: Observable<Id>;
const dataEvent$ = id$.pipe(
concatMap(id => concat(
// For each id, we want to send a new 'start' event immediately.
of({type: 'start'}),
// And follow with the processed stream for this id.
of(id).pipe(
// Make some dependent HTTP requests.
concatMap(id => http.get(`/first/${id}`)),
concatMap(first => http.get(`/second/${first.url}`),
// Process the response a bit (assuming it's an array)
map(res => res.filter(...))
// Now, turn the response into a StreamEvent with type 'done'.
map(response => {type: 'done', response})
)
)
);
// In the consumer, we only want dataWithEvent$ to be subscribed to once.
// But we want to have two independent streams, one for loading true/false,
// the other with actual responses. So this function will split the Observable
// with refCount() immediately before subscribing.
function splitDataEvent$<T>(dataEvent$: Observable<StreamEvent<T>>): {data$: Observable<T>, loading$: Observable<boolean>} {
const refCounted$ = dataEvent$.pipe(refCount());
const data$ = refCounted$.pipe(
filter(event => event.type === 'done'),
map(event => event.response)
);
const loading$ = refCounted$.pipe(
map(event => event.type === 'start')
);
return {data$, loading$};
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment