Last active
July 11, 2025 22:29
-
-
Save okikio/93400203cc14a1a6a7dee18b13846fbe to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Operators
// A pipeline stage: receives a ReadableStream of T and returns a ReadableStream of R.
// Why: one shared function shape lets stages be chained in any order.
export type Operator<T, R> = (stream: ReadableStream<T>) => ReadableStream<R>;
// Like Operator, but ObservableError is removed from the output type via Exclude.
// Why: lets later stages see an element type that contains no error values.
export type SafeOperator<T, R> = Operator<T, Exclude<R, ObservableError>>;
// Union of Operator and SafeOperator for the same T and R.
// Why: a pipeline slot may hold either kind of stage.
export type OperatorItem<T, R> = Operator<T, R> | SafeOperator<T, R>;
// Inference Types
// Resolves the element type produced by a pipeline source:
//   - an OperatorItem yields its output type,
//   - an Observable yields the type it emits,
//   - anything else is returned unchanged.
// Why: gives one entry point for "what does this source produce?".
//
// The operator check comes first on purpose: a function type is structurally
// assignable to a class type that declares no members, so testing Observable
// first would swallow every operator. The operator input is matched with `any`
// because a parameter typed ReadableStream<unknown> is rejected by operators
// that accept a narrower input (function parameters are checked contravariantly).
export type InferSourceType<TSource> =
  TSource extends OperatorItem<any, any> ? InferOperatorItemOutputType<TSource> :
  TSource extends Observable<unknown> ? InferObservableType<TSource> :
  TSource;
// Extracts the output element type (the second type argument) of an OperatorItem.
// Resolves to never when TSource does not match the OperatorItem shape.
// Why: lets each pipeline stage be typed from the stage before it.
export type InferOperatorItemOutputType<TSource extends OperatorItem<any, any>> =
  TSource extends OperatorItem<infer _, infer T> ? T : never;
// Extracts the element type R from an Observable<R>.
// Why: lets the first pipeline stage be typed from the source Observable.
// The fallback branch is never instead of any, so a failed match cannot
// silently switch off type checking downstream; this also matches the
// fallback used by InferOperatorItemOutputType.
export type InferObservableType<TSource extends Observable<unknown>> =
  TSource extends Observable<infer R> ? R : never;
// Utility Types
// Element type at index 0 of a non-empty tuple (mutable or readonly).
// Why: names the pipeline's starting element.
export type FirstTupleItem<TTuple extends readonly [unknown, ...unknown[]]> = TTuple[0];
// Element type at the last position of a tuple; never for an empty tuple.
// Why: the final operator decides the pipeline's output type.
// The pattern is written as a readonly tuple because a readonly tuple is not
// assignable to a mutable tuple pattern, while a mutable tuple is assignable
// to a readonly one. Matching `readonly [...]` therefore accepts both kinds;
// matching a mutable pattern would resolve every readonly tuple to never.
export type GenericLastTupleItem<T extends readonly any[]> =
  T extends readonly [...infer _, infer L] ? L : never;
// Last tuple element, but only when it is an OperatorItem; otherwise never.
// Why: guards the pipeline's output typing against a non-operator final slot.
// `infer L extends ...` evaluates GenericLastTupleItem once and gives the
// true branch the OperatorItem constraint.
export type LastTupleItem<T extends readonly unknown[]> =
  GenericLastTupleItem<T> extends infer L extends OperatorItem<any, any> ? L : never;
// Pipeline final type
// Result type of a pipeline described as a readonly tuple
// [source Observable, ...operators]: an Observable of the output type of the
// tuple's last operator (as resolved by LastTupleItem).
// Why: gives every pipe signature one shared way to compute its return type.
export type ObservableWithPipe<
  TPipe extends readonly [Observable<unknown>, ...OperatorItem<any, unknown>[]],
> = Observable<InferOperatorItemOutputType<LastTupleItem<TPipe>>>;
// Observable Types
// Error type used by Observables. Subclasses the built-in AggregateError and
// adds no members of its own, so it is constructed with the same
// (errors, message) arguments.
// Why: a dedicated class lets stages detect error values with instanceof.
export class ObservableError extends AggregateError {}
// Represents a stream of values of type T.
// Why: the common source/result type that pipe() accepts and returns.
// NOTE(review): this is a stub with no members, and T is not referenced by any
// member. TypeScript compares class instance types structurally, so
// Observable<A> and Observable<B> are mutually assignable until a member that
// uses T is added.
export class Observable<T> {}
// Pipe Overloads
// Each overload types operator N's input from operator N-1's output. The
// first operator's input is the source's element type widened with
// ObservableError. The declared return type is derived from the last operator
// through ObservableWithPipe.

// No operators: the result is typed with the source's own element type.
export function pipe<TSource extends Observable<unknown>>(
  source: TSource,
): Observable<InferObservableType<TSource>>;
// Overload 1: Single operator
export function pipe<
  const TSource extends Observable<unknown>,
  const TItem1 extends OperatorItem<any, unknown>,
>(
  source: TSource,
  op1: TItem1 | OperatorItem<
    InferObservableType<TSource> | ObservableError,
    InferOperatorItemOutputType<TItem1>
  >,
): ObservableWithPipe<readonly [TSource, TItem1]>;
// Overload 2: Two operators
export function pipe<
  const TSource extends Observable<unknown>,
  const TItem1 extends OperatorItem<any, unknown>,
  const TItem2 extends OperatorItem<any, unknown>,
>(
  source: TSource,
  op1: TItem1 | OperatorItem<
    InferObservableType<TSource> | ObservableError,
    InferOperatorItemOutputType<TItem1>
  >,
  op2: TItem2 | OperatorItem<InferOperatorItemOutputType<TItem1>, InferOperatorItemOutputType<TItem2>>,
): ObservableWithPipe<readonly [TSource, TItem1, TItem2]>;
// Overload 3: Three operators
export function pipe<
  // Observable<unknown>, not Observable<any>: same constraint as every other overload.
  const TSource extends Observable<unknown>,
  const TItem1 extends OperatorItem<any, unknown>,
  const TItem2 extends OperatorItem<any, unknown>,
  const TItem3 extends OperatorItem<any, unknown>,
>(
  source: TSource,
  op1: TItem1 | OperatorItem<
    InferObservableType<TSource> | ObservableError,
    InferOperatorItemOutputType<TItem1>
  >,
  op2: TItem2 | OperatorItem<InferOperatorItemOutputType<TItem1>, InferOperatorItemOutputType<TItem2>>,
  op3: TItem3 | OperatorItem<InferOperatorItemOutputType<TItem2>, InferOperatorItemOutputType<TItem3>>,
): ObservableWithPipe<readonly [TSource, TItem1, TItem2, TItem3]>;
// Overload 4: Four operators
export function pipe<
  const TSource extends Observable<unknown>,
  const TItem1 extends OperatorItem<any, unknown>,
  const TItem2 extends OperatorItem<any, unknown>,
  const TItem3 extends OperatorItem<any, unknown>,
  const TItem4 extends OperatorItem<any, unknown>,
>(
  source: TSource,
  op1: TItem1 | OperatorItem<
    InferObservableType<TSource> | ObservableError,
    InferOperatorItemOutputType<TItem1>
  >,
  op2: TItem2 | OperatorItem<InferOperatorItemOutputType<TItem1>, InferOperatorItemOutputType<TItem2>>,
  op3: TItem3 | OperatorItem<InferOperatorItemOutputType<TItem2>, InferOperatorItemOutputType<TItem3>>,
  op4: TItem4 | OperatorItem<InferOperatorItemOutputType<TItem3>, InferOperatorItemOutputType<TItem4>>,
): ObservableWithPipe<readonly [TSource, TItem1, TItem2, TItem3, TItem4]>;
// Pipe Implementation
// Single implementation behind all overloads; accepts any number of operators.
// NOTE: this body is a placeholder for the type-level demo. It does not read
// `source` or apply `_operators`; it returns a new, empty Observable cast to
// the computed result type.
export function pipe<
  TSource extends Observable<unknown>,
  TOperators extends readonly OperatorItem<any, unknown>[]
>(source: TSource, ..._operators: TOperators): ObservableWithPipe<readonly [TSource, ...TOperators]> {
  return new Observable() as ObservableWithPipe<readonly [TSource, ...TOperators]>;
}
// Ignore Error Operator
// Builds a SafeOperator that forwards every chunk except ObservableError
// instances, which are dropped without any signal to the consumer.
// Returns: a function that pipes the given stream through a filtering
// TransformStream and returns the resulting readable side.
export function ignoreErrors<T>(): SafeOperator<T, T> {
  return (source) =>
    source.pipeThrough(
      new TransformStream<T, Exclude<T, ObservableError>>({
        transform(chunk, controller) {
          // Errors are silently dropped - like they never happened
          if (chunk instanceof ObservableError) return;
          controller.enqueue(chunk as Exclude<T, ObservableError>);
        },
      }),
    );
}
// Simplified Operator Example
// Identity helper: returns the operator it is given, unchanged. Its only
// effect is at the type level, where the parameter type gives an inline
// callback an OperatorItem<T, B> shape to be typed against.
function sc<T, B>(op1: OperatorItem<T, B>): OperatorItem<T, B> {
  return op1;
}
// Demo: the `^?` markers are type-query comments for the expression above them.
const source = new Observable<number>();
pipe(source,
  // If ignoreErrors is commented out, `src` becomes `number | ObservableError`
  ignoreErrors(),
  // ^?
  sc((src) => src),
  //   ^?
  sc((src) => src),
);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment