Skip to content

Instantly share code, notes, and snippets.

@Fasteroid
Last active September 19, 2024 20:59
Show Gist options
  • Save Fasteroid/8f7a55afe2a69d05fc8d08d9f9351e77 to your computer and use it in GitHub Desktop.
Save Fasteroid/8f7a55afe2a69d05fc8d08d9f9351e77 to your computer and use it in GitHub Desktop.
Hotwireable observables that remember.
import { MonoTypeOperatorFunction, Observable, Observer, OperatorFunction, Subject, Subscription, take, takeUntil } from "rxjs";
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { DestroyRef, inject } from "@angular/core";
const STUPID_DEV_WARNING: string = "While RxJS allows null subscriptions, I do not. Please remove this null subscription or fix it.";
/**
* A data conduit that auto-pushes its contents to new and existing subscribers once pressurized with {@linkcode Subject.next}.
*
* Might explode if connected circularly.
*/
export class Conduit<T> extends Subject<T> {
private _filled: boolean = false;
public get filled(): boolean { return this._filled; }
private _value: T | undefined = undefined;
public get value(): T | undefined { return this._value; }
private destroy: DestroyRef = inject(DestroyRef);
/**
* Creates a new Conduit.
*
* @param first an optional first value to pressurize the conduit with.
*/
constructor(first?: T) {
super();
super
.subscribe( snapshot => {
this._filled = true;
this._value = snapshot
}) // when it emits, save the snapshot
if(first !== undefined)
this.next(first); // if we have a first value, pressurize immediately
this.destroy.onDestroy( () => {
super.complete()
} );
}
/**
* Connects the provided callback to the output of this conduit.
*
* The callback will run immediately if this conduit is already pressurized.
* @param callback
* @returns subscription
*/
public override subscribe(callback: Partial<Observer<T>> | ((value: T) => void) | null | undefined): Subscription {
if( !callback ) throw new TypeError(STUPID_DEV_WARNING);
const observer = callback instanceof Function ? { next: callback } : callback;
if( !observer?.next ) throw new TypeError(STUPID_DEV_WARNING);
if(this._filled){ // we missed the last emit, so we need to catch up
observer.next(this._value!);
}
return super
.subscribe(observer);
}
/**
* Splices an output of something into this conduit's input.
*
* @note splicing a pre-pressurized conduit into this one will immediately pressurize this one, so you don't have to worry connection about order.
* @param other some other observable
*/
public splice(other: Observable<T>): void {
other
.pipe( takeUntilDestroyed(this.destroy) )
.subscribe( value => this.next(value) );
}
}
export type ReadonlyConduit<T> = Omit< Conduit<T>, 'next' | 'error' | 'complete' | 'splice' >;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment