Skip to content

Instantly share code, notes, and snippets.

@nilsmehlhorn
Last active February 17, 2020 02:53
Show Gist options
  • Save nilsmehlhorn/f7ab3137939d5faff6ec9927d4c63455 to your computer and use it in GitHub Desktop.
Save nilsmehlhorn/f7ab3137939d5faff6ec9927d4c63455 to your computer and use it in GitHub Desktop.
Miscellaneous RxJs Operators
import {filter, map, shareReplay, catchError} from 'rxjs/operators'
import {Observable, Subject, throwError, defer} from 'rxjs'
import {Option} from 'fp-ts/lib/Option'
import {HttpErrorResponse} from '@angular/common/http'
import {HttpStatus} from 'http-status-codes'
/**
* Deserializes plain object to class instance.
*
* @param ClassType class to instantiate
* @returns stream with deserialization applied
*/
export const mapToClass = <T>(ClassType) => (source: Observable<T>): Observable<T> => source.pipe(
map(val => Object.assign(new ClassType(), val))
)
/**
* Filters defined values.
*
* @returns stream with only defined values
*/
export const filterUndefined = <T>() => (source: Observable<T>): Observable<T> => source.pipe(
filter(v => !!v)
)
/**
* Filters defined option values and unpacks them.
*
* @returns stream with options that contain value
*/
export const filterSome = <T>() => (source: Observable<Option<T>>): Observable<T> => source.pipe(
filter(o => o.isSome()),
map(o => o.toNullable())
)
/**
* Shares an underlying observable for use with multiple async-pipes on the same source.
*/
export const conflate = <T>() => (source: Observable<T>): Observable<T> => source.pipe(
shareReplay({
bufferSize: 1,
refCount: true
})
)
/**
* Invokes a callback upon subscription.
*
* @param callback function to invoke upon subscription
* @returns stream which will invoke callback
*/
export function prepare<T>(callback: () => void): (source: Observable<T>) => Observable<T> {
return (source: Observable<T>): Observable<T> => defer(() => {
callback();
return source;
});
}
/**
* Indicates whether the observable is currently loading (meaning subscription is active and
* it hasn't completed or errored).
*
* @param indicator subject as target for undication
* @returns stream which will indicate loading through passed subject
*/
export function indicate<T>(indicator: Subject<boolean>): (source: Observable<T>) => Observable<T> {
return (source: Observable<T>): Observable<T> => source.pipe(
prepare(() => indicator.next(true)),
finalize(() => indicator.next(false))
)
}
/**
* Maps Angular HTTP status codes to more semantic errors.
*
* @param codeErrors mapping from status code to error
* @returns stream which will map http error codes to passed errors
*/
export const throwForCodes = (codeErrors: Array<[HttpStatus, Error]>) => {
const mappedCodeErrors = new Map(codeErrors)
return <T>(source: Observable<T>) =>
source.pipe(catchError(error => {
if (error instanceof HttpErrorResponse) {
return throwError(mappedCodeErrors.get(error.status) || error)
}
return throwError(error)
}))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment