Skip to content

Instantly share code, notes, and snippets.

@mattpodwysocki
Last active February 3, 2020 16:17
Show Gist options
  • Save mattpodwysocki/1c8fa56eaa1cf253c404354b32c08601 to your computer and use it in GitHub Desktop.
Save mattpodwysocki/1c8fa56eaa1cf253c404354b32c08601 to your computer and use it in GitHub Desktop.
Adding basic cancellation to IxJS AsyncIterable
import { AsyncIterableX } from '../asynciterablex';
import { OperatorAsyncFunction } from '../../interfaces';
import { wrapWithAbort } from './withabort';
export class ConcatAllAsyncIterable<TSource> extends AsyncIterableX<TSource> {
private _source: AsyncIterable<AsyncIterable<TSource>>;
private _signal?: AbortSignal;
constructor(source: AsyncIterable<AsyncIterable<TSource>>, signal?: AbortSignal) {
super();
this._source = source;
this._signal = signal;
}
async *[Symbol.asyncIterator]() {
for await (let outer of wrapWithAbort(this._source, this._signal)) {
for await (let item of wrapWithAbort(outer, this._signal)) {
yield item;
}
}
}
}
export function concatAll<T>(signal?: AbortSignal): OperatorAsyncFunction<AsyncIterable<T>, T> {
return function concatAllOperatorFunction(
source: AsyncIterable<AsyncIterable<T>>
): AsyncIterableX<T> {
return new ConcatAllAsyncIterable<T>(source, signal);
};
}
import { AbortError } from "../util/aborterror";
export function delay(action: () => void, dueTime: number, signal?: AbortSignal) {
return new Promise((resolve, reject) => {
if (signal?.aborted) {
throw new AbortError();
}
const id = setTimeout(() => {
if (signal?.aborted) {
throw new AbortError();
}
try {
action();
resolve();
} catch (e) {
reject(e);
}
}, dueTime);
if (signal) {
signal.onabort = () => {
clearTimeout(id);
reject(new AbortError());
};
}
});
}
import { AbortError } from 'ix/util/aborterror';
export function sleep(dueTime: number, signal?: AbortSignal) {
return new Promise<void>((resolve, reject) => {
if (signal?.aborted) {
reject(new AbortError());
}
const id = setTimeout(() => {
if (signal?.aborted) {
reject(new AbortError());
}
resolve();
}, dueTime);
if (signal) {
signal.onabort = () => {
clearTimeout(id);
reject(new AbortError());
};
}
});
}
import { AsyncIterableX } from '../asynciterablex';
import { MonoTypeOperatorAsyncFunction } from '../../interfaces';
import { AbortError } from '../../util/aborterror';
export class WithAbortAsyncIterable<TSource> extends AsyncIterableX<TSource> {
private _source: AsyncIterable<TSource>;
private _signal: AbortSignal;
constructor(source: AsyncIterable<TSource>, signal: AbortSignal) {
super();
this._source = source;
this._signal = signal;
}
async *[Symbol.asyncIterator](): AsyncIterator<TSource, any, undefined> {
if (this._signal.aborted) {
throw new AbortError();
}
for await (let item of this._source) {
if (this._signal.aborted) {
throw new AbortError();
}
yield item;
}
}
}
export function wrapWithAbort<TSource>(source: AsyncIterable<TSource>, signal?: AbortSignal): AsyncIterable<TSource> {
return signal ?
new WithAbortAsyncIterable(source, signal) :
source;
}
export function withAbort<TSource>(signal: AbortSignal): MonoTypeOperatorAsyncFunction<TSource> {
return function withAbortOperatorFunction(source: AsyncIterable<TSource>): AsyncIterableX<TSource> {
return new WithAbortAsyncIterable<TSource>(source, signal);
};
}
@felixfbecker
Copy link

👍
I think it needs to use addEventListener() with { once: true } though so it doesn't override another handled and then be removed with removeEventListener(). This is also why it sucks that AbortSignal is just an EventTarget - the listener does not even get cleaned up even if after the signal was cancelled and you need so much boilerplate for proper cleanup. Good example is here: https://github.com/sindresorhus/delay/blob/master/index.js

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment