Created
July 17, 2023 09:34
-
-
Save trvswgnr/f264b577dee3a6858340337e0b246e70 to your computer and use it in GitHub Desktop.
generic concurrent execution
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Runs every task while keeping at most `concurrency` of them in flight at once.
 *
 * Task `i` is called with the values spread from `argsList[i]`; a task without a
 * matching entry is called with no arguments. Any iterable (plain arrays
 * included) may be used as an argument list.
 *
 * @param tasks - Async functions to execute.
 * @param argsList - Per-task argument iterables, matched to `tasks` by index.
 * @param concurrency - Maximum number of tasks running at the same time; must be greater than 0.
 * @returns The task results, in the same order as `tasks`.
 * @throws RangeError (as a rejection) when `concurrency` is not greater than 0,
 *   because no task could ever obtain a permit and the call would never settle.
 *   Otherwise rejects with the first task failure.
 */
export async function runConcurrentAsync<T, A extends Iterable<unknown>>(
  tasks: Task<T>[],
  argsList: A[] = [],
  concurrency = 5,
): Promise<T[]> {
  // `!(x > 0)` also catches NaN, which a plain `x <= 0` comparison would let through.
  if (!(concurrency > 0)) {
    throw new RangeError(`concurrency must be greater than 0, received ${concurrency}`);
  }

  const semaphore = new Semaphore(concurrency);
  const promises = tasks.map(async (task, index) => {
    await semaphore.acquire();
    try {
      const args = argsList[index] ?? [];
      // `await` inside the `try` so the permit is released only after the task settles.
      return await task(...args);
    } finally {
      semaphore.release();
    }
  });
  return Promise.all(promises);
}
/**
 * Fetches every URL with bounded concurrency, retrying each request according
 * to `retryPolicy`.
 *
 * All requests share a single AbortController. When any request fails for good
 * (its error escapes `fetchWithRetry`), the controller is aborted so the other
 * requests are cancelled, and the returned promise rejects with the first error.
 *
 * @param urls - Addresses to fetch.
 * @param retryPolicy - Decides whether and when a failed request is retried.
 * @param concurrency - Maximum number of simultaneous requests; defaults to 5,
 *   the limit that was previously applied implicitly.
 * @returns The responses, in the same order as `urls`.
 */
export async function fetchAll(
  urls: string[],
  retryPolicy: RetryPolicy = defaultRetryPolicy,
  concurrency = 5,
): Promise<Response[]> {
  const controller = new AbortController();
  const tasks = urls.map(url => async () => {
    try {
      return await fetchWithRetry(url, retryPolicy, controller.signal);
    } catch (error) {
      controller.abort(); // abort all requests on fatal error
      throw error;
    }
  });
  return runConcurrentAsync(tasks, [], concurrency);
}
/**
 * Fetches `url`, retrying failed attempts for as long as `retryPolicy` allows.
 *
 * After each failure the policy is given the 1-based attempt count, the
 * milliseconds elapsed since the first attempt started, and the error. If it
 * declines to retry, the error is rethrown; otherwise the function waits for
 * the delay the policy returns and tries again.
 *
 * Once `signal` is aborted the function stops immediately: the policy is not
 * consulted, and a backoff wait that is in progress is cut short.
 *
 * @param url - Address to fetch.
 * @param retryPolicy - Decides whether to retry and how long to wait.
 * @param signal - Cancels the in-flight request and any pending retries.
 * @returns The response of the first attempt that does not throw.
 * @throws The last fetch error, or a generic Error when a non-Error value was thrown.
 */
async function fetchWithRetry(url: string, retryPolicy: RetryPolicy, signal: AbortSignal): Promise<Response> {
  let attempt = 0;
  const startTime = Date.now();
  while (true) {
    try {
      // `await` here so a rejection is caught by this `try`.
      return await fetch(url, { signal });
    } catch (error) {
      if (!(error instanceof Error)) throw new Error(`unexpected error when fetching ${url}`);
      // After an abort every further attempt would reject straight away, so retrying is pointless.
      if (signal.aborted) throw error;
      attempt++;
      const elapsedTime = Date.now() - startTime;
      if (!retryPolicy.shouldRetry({ attempt, elapsedTime, error })) throw error;
      const delay = retryPolicy.calculateDelay({ attempt, elapsedTime, error });
      // Back off, but wake up early if the signal is aborted in the meantime.
      await new Promise<void>(resolve => {
        const wake = () => {
          clearTimeout(timer);
          signal.removeEventListener("abort", wake);
          resolve();
        };
        const timer = setTimeout(wake, delay);
        signal.addEventListener("abort", wake);
      });
    }
  }
}
/**
 * Retry policy used when the caller does not supply one.
 *
 * - Retries only errors that `isNetworkError` accepts, and only while
 *   `elapsedTime` is below 5000 (milliseconds, as measured by `fetchWithRetry`).
 * - Waits `2 ** attempt` seconds plus up to one extra second of random jitter.
 */
const defaultRetryPolicy: RetryPolicy = {
  shouldRetry({ elapsedTime, error }) {
    return isNetworkError(error) && elapsedTime < 5000;
  },
  calculateDelay({ attempt }) {
    const delay = 2 ** attempt * 1000; // exponential backoff
    const jitter = Math.random() * 1000; // jitter
    return delay + jitter;
  },
};
/**
 * Reports whether a failed request is worth retrying.
 *
 * Returns `true` for any `TypeError`, for any `DOMException` other than an
 * abort, and for any other `Error` whose name is neither "AbortError" nor
 * "TypeError". Returns `false` for non-error values.
 *
 * An aborted fetch throws a `DOMException` named "AbortError"; it must be
 * excluded here, otherwise a deliberate cancellation would be retried.
 *
 * @param error - The value caught from a failed request.
 */
function isNetworkError(error: unknown): boolean {
  if (error instanceof TypeError) return true;
  if (error instanceof DOMException) return error.name !== "AbortError";
  return error instanceof Error && error.name !== "AbortError" && error.name !== "TypeError";
}
/**
 * Counting semaphore for async code.
 *
 * `acquire()` resolves immediately while permits remain; otherwise the caller
 * waits until another holder calls `release()`. Waiters are served in the order
 * in which they called `acquire()`.
 */
class Semaphore {
  /** Resolvers of callers currently waiting in `acquire()`, oldest first. */
  private waiters: Array<() => void> = [];
  /** Number of permits that can be taken without waiting. */
  private count: number;

  /**
   * @param count - Number of permits initially available.
   */
  constructor(count: number) {
    this.count = count;
  }

  /**
   * Takes a permit, waiting for one if none is available.
   *
   * @returns A promise that resolves once the caller holds a permit.
   */
  async acquire(): Promise<void> {
    if (this.count > 0) {
      this.count--;
      return;
    }
    // No permit free: park the caller until release() hands one over.
    return new Promise<void>(resolve => {
      this.waiters.push(resolve);
    });
  }

  /**
   * Gives a permit back. If a caller is waiting, the permit goes straight to
   * the oldest waiter and the counter is left untouched; otherwise the counter
   * is incremented.
   */
  release(): void {
    const next = this.waiters.shift();
    if (next !== undefined) {
      next();
      return;
    }
    this.count++;
  }
}
/** An async unit of work; called with whatever arguments were supplied for it. */
type Task<T> = (...args: unknown[]) => Promise<T>;

/** Information handed to a `RetryPolicy` after a failed attempt. */
type RetryPolicyParams = {
  /** Milliseconds elapsed since the first attempt started. */
  elapsedTime: number;
  /** The error thrown by the attempt that just failed. */
  error: Error;
  /** Number of attempts that have failed so far (1 after the first failure). */
  attempt: number;
};

/** Strategy that decides whether a failed request is retried, and after how long. */
interface RetryPolicy {
  /** Returns `true` when another attempt should be made. */
  shouldRetry(args: RetryPolicyParams): boolean;
  /** Returns how many milliseconds to wait before the next attempt. */
  calculateDelay(args: RetryPolicyParams): number;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment