Last active
September 19, 2024 20:59
-
-
Save Fasteroid/8f7a55afe2a69d05fc8d08d9f9351e77 to your computer and use it in GitHub Desktop.
Hotwireable observables that remember.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { MonoTypeOperatorFunction, Observable, Observer, OperatorFunction, Subject, Subscription, take, takeUntil } from "rxjs"; | |
import { takeUntilDestroyed } from '@angular/core/rxjs-interop'; | |
import { DestroyRef, inject } from "@angular/core"; | |
const STUPID_DEV_WARNING: string = "While RxJS allows null subscriptions, I do not. Please remove this null subscription or fix it."; | |
/** | |
* A data conduit that auto-pushes its contents to new and existing subscribers once pressurized with {@linkcode Subject.next}. | |
* | |
* Might explode if connected circularly. | |
*/ | |
export class Conduit<T> extends Subject<T> { | |
private _filled: boolean = false; | |
public get filled(): boolean { return this._filled; } | |
private _value: T | undefined = undefined; | |
public get value(): T | undefined { return this._value; } | |
private destroy: DestroyRef = inject(DestroyRef); | |
/** | |
* Creates a new Conduit. | |
* | |
* @param first an optional first value to pressurize the conduit with. | |
*/ | |
constructor(first?: T) { | |
super(); | |
super | |
.subscribe( snapshot => { | |
this._filled = true; | |
this._value = snapshot | |
}) // when it emits, save the snapshot | |
if(first !== undefined) | |
this.next(first); // if we have a first value, pressurize immediately | |
this.destroy.onDestroy( () => { | |
super.complete() | |
} ); | |
} | |
/** | |
* Connects the provided callback to the output of this conduit. | |
* | |
* The callback will run immediately if this conduit is already pressurized. | |
* @param callback | |
* @returns subscription | |
*/ | |
public override subscribe(callback: Partial<Observer<T>> | ((value: T) => void) | null | undefined): Subscription { | |
if( !callback ) throw new TypeError(STUPID_DEV_WARNING); | |
const observer = callback instanceof Function ? { next: callback } : callback; | |
if( !observer?.next ) throw new TypeError(STUPID_DEV_WARNING); | |
if(this._filled){ // we missed the last emit, so we need to catch up | |
observer.next(this._value!); | |
} | |
return super | |
.subscribe(observer); | |
} | |
/** | |
* Splices an output of something into this conduit's input. | |
* | |
* @note splicing a pre-pressurized conduit into this one will immediately pressurize this one, so you don't have to worry connection about order. | |
* @param other some other observable | |
*/ | |
public splice(other: Observable<T>): void { | |
other | |
.pipe( takeUntilDestroyed(this.destroy) ) | |
.subscribe( value => this.next(value) ); | |
} | |
} | |
export type ReadonlyConduit<T> = Omit< Conduit<T>, 'next' | 'error' | 'complete' | 'splice' >; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment