Skip to content

Instantly share code, notes, and snippets.

@joost-de-vries
Last active August 21, 2023 13:07
Show Gist options
  • Save joost-de-vries/5cb8efdb532fd2b65bbf9468b573ebe7 to your computer and use it in GitHub Desktop.
Save joost-de-vries/5cb8efdb532fd2b65bbf9468b573ebe7 to your computer and use it in GitHub Desktop.
Batched concurrent mapping & stream handling in Typescript
/*
This builds on Promise.allSettled.
Adding:
- max parallelism: handle a number of calls concurrently, then the next batch, then the next
- make clear which input arguments gave which results and which failures
- streaming of results batch by batch, using async iterators / async generators (note: the input `args` itself is still an in-memory array)
*/
/** Options shared by the concurrent mapping helpers in this module. */
export interface MapConcurrentOpts<A> {
  /** Inputs to map; they are processed in order, in consecutive batches. */
  args: Array<A>
  /** Batch size, i.e. how many calls are started together; streamMapConcurrently uses 10 when omitted. */
  parallelism?: number
}
/**
 * Maps every element of `traverseOpts.args` through `fn`, batch by batch (see mapConcurrently),
 * and returns only the successful results, each paired with its input. Failures are dropped.
 */
export async function mapConcurrentlyOrIgnore<A, B>(traverseOpts: MapConcurrentOpts<A>, fn: (a: A) => Promise<B>): Promise<MapConcurrentResult<A, B>[]> {
  return (await mapConcurrently(traverseOpts, fn)).values
}
/**
 * Maps `traverseOpts.args` through `fn` in batches of `parallelism` calls (see
 * streamMapConcurrently) and collects every batch into one result.
 *
 * @param traverseOpts inputs and batch size
 * @param fn           async mapping function applied to each input
 * @returns `values`: successful results paired with their input; `errors`: failures paired with
 *          their input. Each list keeps the input order.
 */
export async function mapConcurrently<A, B>(traverseOpts: MapConcurrentOpts<A>, fn: (a: A) => Promise<B>): Promise<MapConcurrentlyResults<A, B>> {
  const values: MapConcurrentResult<A, B>[] = []
  const errors: MapConcurrentFailureResult<A>[] = []
  for await (const batch of streamMapConcurrently(traverseOpts, fn)) {
    // Append element-wise instead of `push(...batch.values)`: spreading passes every element as a
    // call argument and throws a RangeError once a batch exceeds the engine's argument limit
    // (reachable with a large `parallelism`).
    for (const value of batch.values) values.push(value)
    for (const error of batch.errors) errors.push(error)
  }
  return { values, errors }
}
/**
 * Splits the settled promises of one batch into successes and failures, keeping their order.
 *
 * Rejection reasons are expected to be MapConcurrentError instances, whose `arg` identifies the
 * failing input. NOTE(review): the reason is only cast, not checked — a reason without `arg`
 * produces an entry with `arg` undefined, and a null/undefined reason would throw here.
 */
export function toResults<A, B>(allSettled: PromiseSettledResult<Awaited<MapConcurrentResult<A, B>>>[]): MapConcurrentlyResults<A, B> {
  const values: MapConcurrentResult<A, B>[] = []
  const errors: MapConcurrentFailureResult<A>[] = []
  allSettled.forEach((settled) => {
    if (settled.status === "fulfilled") {
      values.push(settled.value)
    } else if (settled.status === "rejected") {
      const failure = settled.reason as MapConcurrentError<A>
      errors.push({ arg: failure.arg, failure })
    }
  })
  return { values, errors }
}
/** Outcome of mapping a set of inputs: successes and failures, each paired with its input. */
export interface MapConcurrentlyResults<A, B> {
  /** Inputs whose mapping succeeded, together with the resolved value. */
  values: Array<MapConcurrentResult<A, B>>
  /** Inputs whose mapping failed, together with the error. */
  errors: Array<MapConcurrentFailureResult<A>>
}
/**
 * Maps `args` through `fn` in consecutive batches of `parallelism` elements (default 10) and
 * yields one MapConcurrentlyResults per batch. The result is a stream (async iterator).
 *
 * All calls of a batch are started together and awaited with `Promise.allSettled`. The next
 * batch starts only after every call of the current batch has settled, so at most `parallelism`
 * calls are in flight at once. Every result or failure is paired with the argument that
 * produced it.
 *
 * Use this when holding all results in memory at once is infeasible, or when results should be
 * consumed as they arrive. Otherwise use mapConcurrently.
 *
 * @param opts.args        inputs to map
 * @param opts.parallelism batch size; must be >= 1 (defaults to 10)
 * @param fn               async mapping function applied to each input
 * @throws RangeError on the first `next()` when `parallelism` is below 1 or NaN. Previously such
 *         a value made batching recurse forever.
 */
export async function* streamMapConcurrently<A, B>({ args, parallelism }: MapConcurrentOpts<A>, fn: (a: A) => Promise<B>): AsyncGenerator<MapConcurrentlyResults<A, B>, void, unknown> {
  const batchSize = parallelism ?? 10
  if (!(batchSize >= 1)) {
    throw new RangeError(`parallelism must be >= 1, got ${batchSize}`)
  }

  // Wrap fn so that a rejection carries the offending argument; toResults reads it back via `.arg`.
  async function toTraverseResult(arg: A): Promise<MapConcurrentResult<A, B>> {
    try {
      return { arg, result: await fn(arg) }
    } catch (error) {
      throw new MapConcurrentError(arg, error)
    }
  }

  let batchNr = 0
  // Consume the batches lazily instead of materialising all of them (a full copy of `args`) up front.
  for (const batch of streamOfArrays(args, batchSize)) {
    const batchResults = await Promise.allSettled(batch.map(toTraverseResult))
    if (batch.length > 0) {
      // NOTE(review): `log` is not declared in the visible source; presumably a module-level logger — confirm.
      log.debug(`Received results for batch #${batchNr++}`)
    }
    yield toResults(batchResults)
  }
}
/** A successful mapping: an input argument together with the value the mapping function resolved to. */
export interface MapConcurrentResult<A, B> {
  /** The input that was passed to the mapping function. */
  arg: A
  /** The value the mapping function resolved to for `arg`. */
  result: B
}
/** A failed mapping: an input argument together with the error raised while mapping it. */
export interface MapConcurrentFailureResult<A> {
  /** The input whose mapping failed. */
  arg: A
  /** Wrapper around what the mapping function threw; the original error is kept as its `cause`. */
  failure: MapConcurrentError<A>
}
/**
 * Rejection reason produced when mapping a single input fails. It records the input argument
 * (`arg`) so that the failure can be traced back to it. Whatever the mapping function threw is
 * kept as the standard `cause`.
 *
 * The message has the form `Error for input <arg as JSON> <cause text>`.
 */
export class MapConcurrentError<A> extends Error {
  /** The input argument whose mapping failed. */
  readonly arg: A
  constructor(arg: A, cause: unknown) {
    super(`Error for input ${MapConcurrentError.safeStringify(arg)} ${MapConcurrentError.causeToString(cause)}`, { cause: cause })
    // Without this the error reports itself as a plain "Error" in stack traces and toString().
    this.name = "MapConcurrentError"
    this.arg = arg
  }

  /**
   * Text describing the cause in the message:
   * - empty for null/undefined;
   * - an Error's message;
   * - a string as-is;
   * - anything else as JSON.
   * Falsy-but-meaningful causes such as `0` or `false` are no longer dropped.
   */
  private static causeToString(cause?: unknown): string {
    if (cause === undefined || cause === null) {
      return ""
    }
    if (cause instanceof Error) {
      return cause.message
    }
    if (typeof cause === "string") {
      return cause
    }
    return MapConcurrentError.safeStringify(cause)
  }

  /**
   * `JSON.stringify` that never throws. Circular structures and BigInts fall back to `String(value)`.
   * Throwing from this constructor would replace the MapConcurrentError with a bare TypeError and
   * lose `arg`.
   */
  private static safeStringify(value: unknown): string {
    try {
      // String() keeps the original rendering of "undefined" when JSON.stringify returns undefined.
      return String(JSON.stringify(value))
    } catch {
      try {
        return String(value)
      } catch {
        // e.g. objects with a null prototype have no toString
        return "[unserializable]"
      }
    }
  }
}
/**
 * Splits `arr` into consecutive chunks of `size` elements; the last chunk holds the remainder.
 * An empty input yields a single empty chunk, as before.
 *
 * This is iterative: O(n) time with constant stack depth. The former recursive `slice` + `yield*`
 * version re-copied the tail at every level (O(n²/size)). It could also overflow the stack for
 * long inputs, and it never terminated for `size < 1`.
 *
 * @param arr  input array; never mutated
 * @param size chunk length; fractional values are rounded down, matching how `slice` truncated them
 * @throws RangeError if `size` rounds down to less than 1, or is NaN
 */
function* streamOfArrays<A>(arr: A[], size: number): Generator<A[]> {
  const chunkSize = Math.floor(size)
  if (!(chunkSize >= 1)) {
    throw new RangeError(`chunk size must be >= 1, got ${size}`)
  }
  if (arr.length === 0) {
    yield arr
    return
  }
  for (let start = 0; start < arr.length; start += chunkSize) {
    yield arr.slice(start, start + chunkSize)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment