Created
July 17, 2023 08:57
-
-
Save trvswgnr/b4cc0db3e28e3e794e7da27c02fc8f0b to your computer and use it in GitHub Desktop.
concurrent fetch with retry policy
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Fetches every URL in `urls`, running at most `concurrency` requests at once.
 *
 * Each URL goes through `fetchWithRetry` with the given `retryPolicy`. All
 * requests share a single `AbortController`: the first URL that fails for
 * good aborts the controller, so in-flight requests are cancelled and URLs
 * still waiting for a slot are never started.
 *
 * @param urls - URLs to fetch. The resolved array is index-aligned with it.
 * @param concurrency - Maximum number of simultaneous requests; must be > 0.
 * @param retryPolicy - Decides whether and when a failed request is retried.
 * @returns A promise for the responses, in the same order as `urls`.
 * @throws RangeError (as a rejection) when `concurrency` is not greater than 0.
 * @throws The first error that made a request fail for good.
 */
export async function fetchAll(urls: string[], concurrency = 5, retryPolicy = defaultRetryPolicy): Promise<Response[]> {
  // A semaphore with no permits would never let any request start, leaving
  // the returned promise pending forever. `!(x > 0)` also rejects NaN.
  if (!(concurrency > 0)) {
    throw new RangeError(`concurrency must be greater than 0, got ${concurrency}`);
  }

  const controller = new AbortController();
  const semaphore = new Semaphore(concurrency);

  const promises = urls.map(async url => {
    await semaphore.acquire();
    try {
      // Another request may have failed while this one was queued. Starting a
      // fetch (and its retries) now would be wasted work, so bail out early.
      // Promise.all has already rejected with the original failure by then.
      if (controller.signal.aborted) {
        throw new DOMException(`fetch of ${url} skipped: an earlier request failed`, "AbortError");
      }
      return await fetchWithRetry(url, retryPolicy, controller.signal);
    } catch (error) {
      controller.abort(); // abort all requests on fatal error (idempotent)
      throw error;
    } finally {
      // Always hand the slot back, on success and on failure alike.
      semaphore.release();
    }
  });

  return Promise.all(promises);
}
/**
 * Resolves after `ms` milliseconds, or as soon as `signal` aborts, whichever
 * comes first. It never rejects; callers check `signal.aborted` afterwards.
 */
function waitUnlessAborted(ms: number, signal: AbortSignal): Promise<void> {
  return new Promise<void>(resolve => {
    if (signal.aborted) {
      resolve();
      return;
    }
    const finish = (): void => {
      clearTimeout(timer);
      signal.removeEventListener("abort", finish);
      resolve();
    };
    const timer = setTimeout(finish, ms);
    signal.addEventListener("abort", finish, { once: true });
  });
}

/**
 * Fetches `url`, retrying rejected attempts as directed by `retryPolicy`.
 *
 * Whatever `fetch` resolves with is returned as-is; the response status is
 * not inspected here. When `fetch` rejects, the policy is asked whether to
 * retry and how long to wait first. Once `signal` is aborted no further
 * attempt is made: cancellation is final and is never handed to the policy.
 *
 * @param url - The URL to request.
 * @param retryPolicy - Receives the 1-based failed-attempt count, the
 *   milliseconds elapsed since the first attempt, and the error.
 * @param signal - Cancels the in-flight request and any pending back-off wait.
 * @returns The response from the first attempt that resolves.
 * @throws The last error once the policy declines to retry or `signal` is
 *   aborted; a generic `Error` if `fetch` rejected with a non-`Error` value.
 */
async function fetchWithRetry(url: string, retryPolicy: RetryPolicy, signal: AbortSignal): Promise<Response> {
  let attempt = 0;
  const startTime = Date.now();
  while (true) {
    try {
      return await fetch(url, { signal });
    } catch (error) {
      if (!(error instanceof Error)) throw new Error(`unexpected error when fetching ${url}`);
      // An aborted signal means the caller gave up; retrying would only keep
      // issuing requests that are rejected immediately.
      if (signal.aborted) throw error;
      attempt++;
      const elapsedTime = Date.now() - startTime;
      if (!retryPolicy.shouldRetry({ attempt, elapsedTime, error })) throw error;
      const delay = retryPolicy.calculateDelay({ attempt, elapsedTime, error });
      await waitUnlessAborted(delay, signal);
      // The wait may have ended early because of an abort; surface the
      // failure that triggered it instead of starting another attempt.
      if (signal.aborted) throw error;
    }
  }
}
/**
 * Retry policy used when the caller supplies none.
 *
 * Retries errors that `isNetworkError` accepts, as long as less than five
 * seconds have passed since the first attempt, and waits `2^attempt` seconds
 * plus up to one second of random jitter between attempts.
 */
const defaultRetryPolicy: RetryPolicy = {
  shouldRetry: ({ elapsedTime, error }) => {
    const retryable = isNetworkError(error);
    return retryable && elapsedTime < 5000;
  },
  calculateDelay: ({ attempt }) => {
    const backoffMs = 2 ** attempt * 1000; // exponential backoff
    const jitterMs = Math.random() * 1000; // spreads out simultaneous retries
    return backoffMs + jitterMs;
  },
};
/**
 * Reports whether `error` is a failure that may be retried.
 *
 * Every `Error` or `DOMException` qualifies except a cancellation, which is
 * recognised by the name "AbortError". The name check has to come first: an
 * aborted `fetch` rejects with a `DOMException`, so testing
 * `instanceof DOMException` on its own would classify cancellations as
 * retryable.
 *
 * @param error - The value a failed request rejected with.
 * @returns `true` for retryable errors; `false` for aborts and non-error values.
 */
function isNetworkError(error: unknown): boolean {
  if (!(error instanceof Error || error instanceof DOMException)) return false;
  return error.name !== "AbortError";
}
/**
 * Counting semaphore for promise-based code.
 *
 * `acquire()` takes a permit, waiting if none is free; `release()` returns
 * one. When callers are waiting, a released permit is handed straight to the
 * oldest waiter instead of going back into the pool.
 */
class Semaphore {
  /** Resolvers of `acquire()` calls that are still waiting, oldest first. */
  private readonly tasks: Array<() => void> = [];
  /** Number of permits currently free. */
  private count: number;

  /**
   * @param count - Number of permits available initially.
   */
  constructor(count: number) {
    this.count = count;
  }

  /**
   * Takes a permit.
   *
   * @returns A promise that resolves once the caller holds a permit: right
   *   away if one is free, otherwise after a matching `release()`.
   */
  async acquire(): Promise<void> {
    if (this.count > 0) {
      this.count--;
      return;
    }
    return new Promise<void>(resolve => {
      this.tasks.push(resolve);
    });
  }

  /**
   * Returns a permit. Wakes the oldest waiter if there is one; otherwise the
   * free-permit count goes up by one.
   */
  release(): void {
    const next = this.tasks.shift();
    if (next !== undefined) {
      // The permit passes directly to the waiter, so `count` stays unchanged.
      next();
      return;
    }
    this.count++;
  }
}
/** Information handed to a retry policy after a failed fetch attempt. */
type RetryPolicyParams = {
  /** Number of attempts that have failed so far; 1 after the first failure. */
  attempt: number;
  /** Milliseconds elapsed since the first attempt started. */
  elapsedTime: number;
  /** The error the most recent attempt failed with. */
  error: Error;
};
/** Strategy that decides whether a failed request is retried and after what delay. */
interface RetryPolicy {
  /**
   * @param args - Details of the failed attempt.
   * @returns `true` to try again, `false` to give up and surface the error.
   */
  shouldRetry(args: RetryPolicyParams): boolean;

  /**
   * @param args - Details of the failed attempt.
   * @returns How long to wait before the next attempt, in milliseconds.
   */
  calculateDelay(args: RetryPolicyParams): number;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment