Last active
February 3, 2020 16:17
-
-
Save mattpodwysocki/1c8fa56eaa1cf253c404354b32c08601 to your computer and use it in GitHub Desktop.
Adding basic cancellation to IxJS AsyncIterable
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { AsyncIterableX } from '../asynciterablex'; | |
import { OperatorAsyncFunction } from '../../interfaces'; | |
import { wrapWithAbort } from './withabort'; | |
/**
 * Async iterable that flattens a sequence of async iterables by concatenation:
 * every inner sequence is drained completely, in order, before the next inner
 * sequence is requested from the outer source.
 *
 * The outer source and each inner sequence are passed through `wrapWithAbort`
 * together with the optional signal supplied at construction time.
 */
export class ConcatAllAsyncIterable<TSource> extends AsyncIterableX<TSource> {
  /**
   * @param _source Outer sequence whose elements are themselves async iterables.
   * @param _signal Optional abort signal forwarded to `wrapWithAbort`.
   */
  constructor(
    private _source: AsyncIterable<AsyncIterable<TSource>>,
    private _signal?: AbortSignal
  ) {
    super();
  }

  /** Yields the items of each inner sequence, one inner sequence after another. */
  async *[Symbol.asyncIterator]() {
    const outerSequence = wrapWithAbort(this._source, this._signal);
    for await (const inner of outerSequence) {
      const innerSequence = wrapWithAbort(inner, this._signal);
      for await (const value of innerSequence) {
        yield value;
      }
    }
  }
}
/**
 * Creates an operator that concatenates all inner async iterables of a source
 * into a single async iterable.
 *
 * @param signal Optional abort signal handed to the resulting `ConcatAllAsyncIterable`.
 * @returns A function that wraps a source of async iterables in a `ConcatAllAsyncIterable`.
 */
export function concatAll<T>(signal?: AbortSignal): OperatorAsyncFunction<AsyncIterable<T>, T> {
  function concatAllOperatorFunction(source: AsyncIterable<AsyncIterable<T>>): AsyncIterableX<T> {
    return new ConcatAllAsyncIterable<T>(source, signal);
  }

  return concatAllOperatorFunction;
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { AbortError } from "../util/aborterror"; | |
/**
 * Runs `action` once `dueTime` milliseconds have elapsed.
 *
 * @param action Callback invoked when the timer fires.
 * @param dueTime Delay in milliseconds passed to `setTimeout`.
 * @param signal Optional abort signal used to cancel the pending timer.
 * @returns A promise that resolves after `action` has run. It rejects with
 *   whatever `action` throws, or with an `AbortError` when `signal` is already
 *   aborted or becomes aborted before the timer fires.
 */
export function delay(action: () => void, dueTime: number, signal?: AbortSignal): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    if (signal?.aborted) {
      reject(new AbortError());
      return;
    }

    // Registered through addEventListener (not `signal.onabort = ...`) so that a
    // handler the caller may already have installed on the signal is preserved.
    const onAbort = () => {
      clearTimeout(id);
      reject(new AbortError());
    };

    const id = setTimeout(() => {
      // The timer won the race: detach the listener so it does not linger on a
      // signal that may outlive this promise.
      signal?.removeEventListener('abort', onAbort);
      try {
        action();
        resolve();
      } catch (e) {
        // Reject instead of throwing: an exception escaping a timer callback
        // would be uncaught and would leave the promise pending forever.
        reject(e);
      }
    }, dueTime);

    signal?.addEventListener('abort', onAbort, { once: true });
  });
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { AbortError } from 'ix/util/aborterror'; | |
/**
 * Returns a promise that settles after `dueTime` milliseconds.
 *
 * @param dueTime Delay in milliseconds passed to `setTimeout`.
 * @param signal Optional abort signal used to cancel the pending timer.
 * @returns A promise that resolves when the timer fires, or rejects with an
 *   `AbortError` when `signal` is already aborted or becomes aborted first.
 */
export function sleep(dueTime: number, signal?: AbortSignal): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    if (signal?.aborted) {
      // Stop here: there is no point scheduling a timer for a cancelled sleep.
      reject(new AbortError());
      return;
    }

    // Registered through addEventListener (not `signal.onabort = ...`) so that a
    // handler the caller may already have installed on the signal is preserved.
    const onAbort = () => {
      clearTimeout(id);
      reject(new AbortError());
    };

    const id = setTimeout(() => {
      // The timer won the race: detach the listener so it does not linger on a
      // signal that may outlive this promise.
      signal?.removeEventListener('abort', onAbort);
      resolve();
    }, dueTime);

    signal?.addEventListener('abort', onAbort, { once: true });
  });
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { AsyncIterableX } from '../asynciterablex'; | |
import { MonoTypeOperatorAsyncFunction } from '../../interfaces'; | |
import { AbortError } from '../../util/aborterror'; | |
/**
 * Async iterable that re-yields the items of a source sequence while checking
 * an abort signal: once before iteration starts, and again after each item is
 * received from the source but before it is yielded.
 *
 * When the signal is found aborted at one of those checks, iteration throws an
 * `AbortError`.
 */
export class WithAbortAsyncIterable<TSource> extends AsyncIterableX<TSource> {
  private _source: AsyncIterable<TSource>;
  private _signal: AbortSignal;

  /**
   * @param source Sequence whose items are forwarded.
   * @param signal Signal inspected between items.
   */
  constructor(source: AsyncIterable<TSource>, signal: AbortSignal) {
    super();
    this._source = source;
    this._signal = signal;
  }

  /** Forwards source items, throwing `AbortError` as soon as an abort is observed. */
  async *[Symbol.asyncIterator](): AsyncIterator<TSource, any, undefined> {
    const throwIfAborted = (): void => {
      if (this._signal.aborted) {
        throw new AbortError();
      }
    };

    throwIfAborted();
    for await (const element of this._source) {
      throwIfAborted();
      yield element;
    }
  }
}
/**
 * Wraps `source` in a `WithAbortAsyncIterable` when a signal is supplied.
 *
 * @param source Sequence to wrap.
 * @param signal Optional abort signal.
 * @returns `source` itself when no signal is given, otherwise a new
 *   `WithAbortAsyncIterable` over `source` and `signal`.
 */
export function wrapWithAbort<TSource>(source: AsyncIterable<TSource>, signal?: AbortSignal): AsyncIterable<TSource> {
  if (!signal) {
    return source;
  }
  return new WithAbortAsyncIterable(source, signal);
}
/**
 * Creates an operator that attaches an abort signal to an async iterable.
 *
 * @param signal Abort signal handed to the resulting `WithAbortAsyncIterable`.
 * @returns A function that wraps its source in a `WithAbortAsyncIterable`.
 */
export function withAbort<TSource>(signal: AbortSignal): MonoTypeOperatorAsyncFunction<TSource> {
  function withAbortOperatorFunction(source: AsyncIterable<TSource>): AsyncIterableX<TSource> {
    return new WithAbortAsyncIterable<TSource>(source, signal);
  }

  return withAbortOperatorFunction;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
👍
I think it needs to use `addEventListener()` with `{ once: true }` though, so it doesn't override another handler, and the listener should then be removed with `removeEventListener()`. This is also why it sucks that `AbortSignal` is just an `EventTarget` - the listener does not even get cleaned up after the signal was cancelled, and you need so much boilerplate for proper cleanup. A good example is here: https://github.com/sindresorhus/delay/blob/master/index.js