Last active
February 17, 2020 02:53
-
-
Save nilsmehlhorn/f7ab3137939d5faff6ec9927d4c63455 to your computer and use it in GitHub Desktop.
Miscellaneous RxJs Operators
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import {filter, map, shareReplay, catchError} from 'rxjs/operators' | |
import {Observable, Subject, throwError, defer} from 'rxjs' | |
import {Option} from 'fp-ts/lib/Option' | |
import {HttpErrorResponse} from '@angular/common/http' | |
import {HttpStatus} from 'http-status-codes' | |
/** | |
* Deserializes plain object to class instance. | |
* | |
* @param ClassType class to instantiate | |
* @returns stream with deserialization applied | |
*/ | |
export const mapToClass = <T>(ClassType) => (source: Observable<T>): Observable<T> => source.pipe( | |
map(val => Object.assign(new ClassType(), val)) | |
) | |
/** | |
* Filters defined values. | |
* | |
* @returns stream with only defined values | |
*/ | |
export const filterUndefined = <T>() => (source: Observable<T>): Observable<T> => source.pipe( | |
filter(v => !!v) | |
) | |
/** | |
* Filters defined option values and unpacks them. | |
* | |
* @returns stream with options that contain value | |
*/ | |
export const filterSome = <T>() => (source: Observable<Option<T>>): Observable<T> => source.pipe( | |
filter(o => o.isSome()), | |
map(o => o.toNullable()) | |
) | |
/** | |
* Shares an underlying observable for use with multiple async-pipes on the same source. | |
*/ | |
export const conflate = <T>() => (source: Observable<T>): Observable<T> => source.pipe( | |
shareReplay({ | |
bufferSize: 1, | |
refCount: true | |
}) | |
) | |
/** | |
* Invokes a callback upon subscription. | |
* | |
* @param callback function to invoke upon subscription | |
* @returns stream which will invoke callback | |
*/ | |
export function prepare<T>(callback: () => void): (source: Observable<T>) => Observable<T> { | |
return (source: Observable<T>): Observable<T> => defer(() => { | |
callback(); | |
return source; | |
}); | |
} | |
/** | |
* Indicates whether the observable is currently loading (meaning subscription is active and | |
* it hasn't completed or errored). | |
* | |
* @param indicator subject as target for undication | |
* @returns stream which will indicate loading through passed subject | |
*/ | |
export function indicate<T>(indicator: Subject<boolean>): (source: Observable<T>) => Observable<T> { | |
return (source: Observable<T>): Observable<T> => source.pipe( | |
prepare(() => indicator.next(true)), | |
finalize(() => indicator.next(false)) | |
) | |
} | |
/** | |
* Maps Angular HTTP status codes to more semantic errors. | |
* | |
* @param codeErrors mapping from status code to error | |
* @returns stream which will map http error codes to passed errors | |
*/ | |
export const throwForCodes = (codeErrors: Array<[HttpStatus, Error]>) => { | |
const mappedCodeErrors = new Map(codeErrors) | |
return <T>(source: Observable<T>) => | |
source.pipe(catchError(error => { | |
if (error instanceof HttpErrorResponse) { | |
return throwError(mappedCodeErrors.get(error.status) || error) | |
} | |
return throwError(error) | |
})) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment