Skip to content

Instantly share code, notes, and snippets.

@okikio
Last active July 11, 2025 22:29
Show Gist options
  • Save okikio/93400203cc14a1a6a7dee18b13846fbe to your computer and use it in GitHub Desktop.
Save okikio/93400203cc14a1a6a7dee18b13846fbe to your computer and use it in GitHub Desktop.
// Operators
// Transforms a stream of data (T) into another stream (R).
// Why: Allows flexible data processing. Solves chaining operations.
export type Operator<T, R> = (stream: ReadableStream<T>) => ReadableStream<R>;
// Like Operator, but output excludes errors.
// Why: Ensures error-free streams. Solves safe data handling.
export type SafeOperator<T, R> = Operator<T, Exclude<R, ObservableError>>;
// Combines Operator and SafeOperator.
// Why: Supports mixed operators. Solves pipeline flexibility.
export type OperatorItem<T, R> = Operator<T, R> | SafeOperator<T, R>;
// Inference Types
// Figures out the source type (Observable or Operator).
// Why: Ensures correct input type. Solves type safety in pipelines.
export type InferSourceType<TSource extends unknown> =
TSource extends Observable<unknown> ? InferObservableType<TSource> :
TSource extends OperatorItem<unknown, unknown> ? InferOperatorItemOutputType<TSource> :
TSource;
// Gets the output type of an OperatorItem.
// Why: Tracks operator output. Solves pipeline type resolution.
export type InferOperatorItemOutputType<TSource extends OperatorItem<any, any>> =
TSource extends OperatorItem<infer _, infer T> ? T : never;
// Gets the data type an Observable emits.
// Why: Extracts Observable data type. Solves type-safe data access.
export type InferObservableType<TSource extends Observable<unknown>> =
TSource extends Observable<infer R> ? R : any;
// Utility Types
// Gets the first item of a tuple.
// Why: Accesses pipeline start. Solves type extraction.
export type FirstTupleItem<TTuple extends readonly [unknown, ...unknown[]]> = TTuple[0];
// Gets the last item of a tuple.
// Why: Finds final operator. Solves pipeline output typing.
export type GenericLastTupleItem<T extends readonly any[]> =
T extends [...infer _, infer L] ? L : never;
// Ensures last tuple item is an OperatorItem.
// Why: Validates pipeline end. Solves output type safety.
export type LastTupleItem<T extends readonly unknown[]> =
GenericLastTupleItem<T> extends OperatorItem<any, any> ? GenericLastTupleItem<T> : never;
// Pipeline final type
// Defines Observable output based on last operator.
// Why: Sets pipeline result type. Solves type-safe output.
export type ObservableWithPipe<
TPipe extends readonly [Observable<unknown>, ...OperatorItem<any, unknown>[]],
> = Observable<InferOperatorItemOutputType<LastTupleItem<TPipe>>>;
// Observable Types
// Custom error for Observables.
// Why: Standardizes error handling. Solves consistent error management.
export class ObservableError extends AggregateError { }
// Represents a stream of data.
// Why: Enables reactive data flow. Solves async streaming.
export class Observable<T> {
constructor() { }
}
// Pipe Overloads
// No operators: Returns source Observable.
// Why: Supports basic usage. Solves simple stream handling.
export function pipe<TSource extends Observable<unknown>>(source: TSource): Observable<InferObservableType<TSource>>;
// Overload 1: Single operator
export function pipe<
const TSource extends Observable<unknown>,
const TItem1 extends OperatorItem<any, unknown>,
>(
source: TSource,
op1: TItem1 | OperatorItem<
InferObservableType<TSource> | ObservableError,
InferOperatorItemOutputType<TItem1>
>,
): ObservableWithPipe<readonly [TSource, TItem1]>;
// Overload 2: Two operators
export function pipe<
const TSource extends Observable<unknown>,
const TItem1 extends OperatorItem<any, unknown>,
const TItem2 extends OperatorItem<any, unknown>,
>(
source: TSource,
op1: TItem1 | OperatorItem<
InferObservableType<TSource> | ObservableError,
InferOperatorItemOutputType<TItem1>
>,
op2: TItem2 | OperatorItem<InferOperatorItemOutputType<TItem1>, InferOperatorItemOutputType<TItem2>>,
): ObservableWithPipe<readonly [TSource, TItem1, TItem2]>;
// Overload 3: Three operators
export function pipe<
const TSource extends Observable<any>,
const TItem1 extends OperatorItem<any, unknown>,
const TItem2 extends OperatorItem<any, unknown>,
const TItem3 extends OperatorItem<any, unknown>,
>(
source: TSource,
op1: TItem1 | OperatorItem<
InferObservableType<TSource> | ObservableError,
InferOperatorItemOutputType<TItem1>
>,
op2: TItem2 | OperatorItem<InferOperatorItemOutputType<TItem1>, InferOperatorItemOutputType<TItem2>>,
op3: TItem3 | OperatorItem<InferOperatorItemOutputType<TItem2>, InferOperatorItemOutputType<TItem3>>,
): ObservableWithPipe<readonly [TSource, TItem1, TItem2, TItem3]>;
// Overload 4: Four operators
export function pipe<
const TSource extends Observable<unknown>,
const TItem1 extends OperatorItem<any, unknown>,
const TItem2 extends OperatorItem<any, unknown>,
const TItem3 extends OperatorItem<any, unknown>,
const TItem4 extends OperatorItem<any, unknown>,
>(
source: TSource,
op1: TItem1 | OperatorItem<
InferObservableType<TSource> | ObservableError,
InferOperatorItemOutputType<TItem1>
>,
op2: TItem2 | OperatorItem<InferOperatorItemOutputType<TItem1>, InferOperatorItemOutputType<TItem2>>,
op3: TItem3 | OperatorItem<InferOperatorItemOutputType<TItem2>, InferOperatorItemOutputType<TItem3>>,
op4: TItem4 | OperatorItem<InferOperatorItemOutputType<TItem3>, InferOperatorItemOutputType<TItem4>>,
): ObservableWithPipe<readonly [TSource, TItem1, TItem2, TItem3, TItem4]>;
// Pipe Implementation
// Executes pipeline with variable operators.
// Why: Enables dynamic pipelines. Solves flexible stream processing.
export function pipe<
TSource extends Observable<unknown>,
TOperators extends readonly OperatorItem<any, unknown>[]
>(source: TSource, ..._operators: TOperators): ObservableWithPipe<readonly [TSource, ...TOperators]> {
return new Observable() as ObservableWithPipe<readonly [TSource, ...TOperators]>;
}
// Ignore Error Operator
export function ignoreErrors<T>(): SafeOperator<T, T> {
return (source) => {
const transform = new TransformStream<T, Exclude<T, ObservableError>>({
transform(chunk, controller) {
if (!(chunk instanceof ObservableError)) {
controller.enqueue(chunk as Exclude<T, ObservableError>);
}
// Errors are silently dropped - like they never happened
}
});
return source.pipeThrough(transform);
}
}
// Simplified Operator Example
function sc<T, B>(op1: OperatorItem<T, B>): OperatorItem<T, B> {
return op1;
}
const source = new Observable<number>();
pipe(source,
// If ignoreErrors is commented out `src` become `number | ObservableError`
ignoreErrors(),
// ^?
sc((src) => src),
// ^?
sc((src) => src),
);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment