Created
March 27, 2022 21:11
-
-
Save imhoffd/379bce434c4bf5c562e62be92e50f0c9 to your computer and use it in GitHub Desktop.
AsyncIterableSubject
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import AsyncIterableSubject from '../AsyncIterableSubject' | |
let subject: AsyncIterableSubject<number> | |
/**
 * Opens a fresh async iterator on `subject` and resolves with whatever its
 * first `next()` call produces.
 */
const iterateAndReturnFirst = async <T>(
  subject: AsyncIterableSubject<T>,
): Promise<IteratorResult<T>> => subject[Symbol.asyncIterator]().next()
// Every test starts from its own brand-new subject.
beforeEach(() => {
  subject = new AsyncIterableSubject<number>()
})
it('should not iterate after completing', async () => {
  // The completion is scheduled for later, so it lands after the iterator's
  // first next() call below has already been issued.
  setImmediate(() => {
    subject.complete()
  })

  const { done } = await iterateAndReturnFirst(subject)

  expect(done).toBe(true)
})
it('should not iterate if completed', async () => {
  // Complete first; only then ask for a value.
  subject.complete()

  const { done } = await iterateAndReturnFirst(subject)

  expect(done).toBe(true)
})
it('should not iterate if complete() is called multiple times', async () => {
  // Three back-to-back completions.
  ;[1, 2, 3].forEach(() => {
    subject.complete()
  })

  const { done } = await iterateAndReturnFirst(subject)

  expect(done).toBe(true)
})
it('should throw an error if sending value after complete', async () => {
  const sendValue = () => subject.next(1)

  subject.complete()

  // Pushing into a completed subject must be rejected loudly.
  expect(sendValue).toThrowError(/after completion/)
})
it('should throw an error if sending value before iterating', async () => {
  // Nothing has asked the subject for a value yet.
  const sendValue = () => subject.next(1)

  expect(sendValue).toThrowError(/before iterating/)
})
it('should iterate once with a value before completing', async () => {
  // Fail the test if the loop body below never runs.
  expect.hasAssertions()

  // Deliver a single value and finish, once the loop has started waiting.
  setImmediate(() => {
    subject.next(1)
    subject.complete()
  })

  for await (const received of subject) {
    expect(received).toBe(1)
  }
})
it('should iterate once with a value', async () => {
  // Fail the test if the loop body below never runs.
  expect.hasAssertions()

  setImmediate(() => {
    subject.next(1)
  })

  // The subject is never completed here, so leave the loop manually after
  // the first value.
  for await (const received of subject) {
    expect(received).toBe(1)
    break
  }
})
it('should allow values sent synchronously', async () => {
  // Fail the test if the loop body below never runs.
  expect.hasAssertions()

  // Push three values back-to-back within a single callback.
  setImmediate(() => {
    for (const value of [1, 2, 3]) {
      subject.next(value)
    }
  })

  // Values must come out in the order they were sent; stop after the last one.
  const remaining = [1, 2, 3]
  for await (const received of subject) {
    expect(received).toBe(remaining.shift())
    if (remaining.length === 0) {
      break
    }
  }
})
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Deferred from './Deferred' | |
/**
 * A push-based source of values that can be consumed with `for await`.
 *
 * Producers call `next(value)` to send a value and `complete()` to end the
 * stream. Consumers obtain an iterator through `Symbol.asyncIterator`; each
 * iterator `next()` waits until a value is available or the subject completes.
 *
 * Values that were sent before `complete()` are still delivered, in order,
 * before the iterator reports `done`.
 */
export default class AsyncIterableSubject<T> implements AsyncIterable<T> {
  /** Wakes a waiting consumer. `null` once the subject has completed. */
  private _signal: Deferred<void> | null = new Deferred()

  /** Values sent by `next()` that no consumer has taken yet. */
  private _queue: T[] = []

  /**
   * Sends a value to the consumer.
   *
   * @param value - The value to deliver.
   * @throws Error if the subject has already completed.
   * @throws Error if the current signal's promise has not been requested yet,
   *   i.e. no consumer has started waiting on it.
   */
  next(value: T): void {
    if (!this._signal) {
      throw new Error('Cannot call next() after completion')
    }
    if (!this._signal.awaited) {
      throw new Error('Cannot call next() before iterating')
    }
    this._queue.push(value)
    this._signal.resolve()
  }

  /**
   * Ends the stream. Safe to call more than once.
   *
   * A consumer that is currently waiting is woken up via rejection of the
   * signal; values already queued remain available to the iterator.
   */
  complete(): void {
    // Only reject a signal somebody has asked for; rejecting one that was
    // never awaited would leave a rejection without a handler.
    if (this._signal?.awaited) {
      this._signal.reject()
    }
    this._signal = null
  }

  /**
   * Creates an iterator over the values sent to this subject.
   *
   * @returns An async iterator whose `next()` resolves with the oldest
   *   undelivered value, or with `{ done: true, value: null }` once the
   *   subject has completed and the queue is empty.
   */
  [Symbol.asyncIterator](): AsyncIterator<T, null> {
    return {
      next: async (): Promise<IteratorResult<T, null>> => {
        // Loop so that the queue is re-checked after every wake-up: another
        // pending next() call may already have taken the value that caused it.
        for (;;) {
          // Hand out buffered values first, even after completion, so nothing
          // sent before complete() is lost.
          if (this._queue.length > 0) {
            const value = this._queue.shift() as T
            // The current signal is already resolved; once the queue is
            // drained, swap in a fresh one so the next call waits again.
            if (this._signal && this._queue.length === 0) {
              this._signal = new Deferred()
            }
            return { done: false, value }
          }

          const signal = this._signal
          if (!signal) {
            return { done: true, value: null }
          }

          try {
            // delay next iteration until we get a value
            await signal.promise
          } catch {
            // complete() rejected the signal while we were waiting
            return { done: true, value: null }
          }
        }
      },
    }
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Function that settles a deferred promise with a value (or a thenable). */
export type Resolve<T> = (value: T | PromiseLike<T>) => void

/** Function that settles a deferred promise with a rejection reason. */
export type Reject = (reason?: any) => void

/**
 * A promise whose `resolve` / `reject` functions are exposed as members, along
 * with flags describing what has happened to it so far.
 *
 * Only the first call to `resolve` or `reject` has any effect; later calls
 * are ignored and leave the flags untouched.
 */
export default class Deferred<T> {
  /** `true` once the `promise` getter has been read at least once. */
  awaited = false

  /**
   * `true` once `resolve` was the first settle call. Note that resolving
   * with a thenable that later rejects still leaves this flag set.
   */
  fulfilled = false

  /** `true` once `reject` was the first settle call. */
  rejected = false

  /** Settles the promise with `value`, unless it was already settled. */
  resolve!: Resolve<T>

  /** Rejects the promise with `reason`, unless it was already settled. */
  reject!: Reject

  private _promise: Promise<T> = new Promise(
    (resolve: Resolve<T>, reject: Reject) => {
      this.resolve = value => {
        // A promise settles only once; keep the flags consistent with that.
        if (this.fulfilled || this.rejected) {
          return
        }
        this.fulfilled = true
        resolve(value)
      }
      this.reject = reason => {
        if (this.fulfilled || this.rejected) {
          return
        }
        this.rejected = true
        reject(reason)
      }
    },
  )

  /**
   * The underlying promise. Reading this property marks the deferred as
   * `awaited`.
   */
  get promise(): Promise<T> {
    this.awaited = true
    return this._promise
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment