Created
August 14, 2023 04:31
-
-
Save rkok/67e2d59c6bfbee85df88d0d76344dc66 to your computer and use it in GitHub Desktop.
Rate-limit-aware, multi-interface wrapper for Axios (HTTPS requests only)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 * Disclaimer: this was slung together through trial and error and can definitely be improved.
 *
 * Set the environment variable DEBUG=1 for verbose debug logging.
 *
 * Usage example:
 *
 * const rateLimits: RateLimitDef[] = [
 *   {
 *     urlRegex: 'carriages/[^/]+/horses',
 *     reqsPerMinute: 10
 *   }
 * ];
 *
 * const client = new MultiHttps({
 *   rateLimits,
 *   interfaces: ['1.2.3.4', '2.3.4.5']
 * });
 *
 * await Promise.all([1,2,3,4].map(n => client.enqueue({ url: `https://example.com/carriages/${n}/horses` })));
 *
 */
import axios, {AxiosResponse, AxiosRequestConfig, AxiosInstance} from "axios"; | |
import chalk from "chalk"; | |
import moment from "moment"; | |
import {v4 as uuidv4} from 'uuid'; | |
import logger from "../logger"; | |
const https = require('https'); | |
// Source: https://github.com/axios/axios/issues/2808#issuecomment-800141018
/**
 * Builds a drop-in replacement for the `https` module whose `request()` binds
 * outgoing sockets to a given local address.
 *
 * Everything except `request` is copied from `https` unchanged.
 *
 * @param localAddress Local IP address to bind to. An address containing `:` is
 *   treated as IPv6 (family 6), anything else as IPv4 (family 4). An empty string
 *   adds no binding options at all.
 * @returns An object shaped like the `https` module, typed `any` because it is
 *   passed to Axios through its untyped `transport` option.
 */
const getWrappedHttpsTransport = ({localAddress}: { localAddress: string }): any => {
  // The address never changes for a given transport, so derive the options once.
  const bindOptions = localAddress
    ? {localAddress, family: localAddress.includes(':') ? 6 : 4}
    : {};

  return {
    ...https,
    // Spread last so the binding overrides any localAddress/family on the caller's options.
    request: (options: any, callback: any) => https.request({...options, ...bindOptions}, callback),
  };
};
/** A rate limit applied to every request whose URL matches `urlRegex`. */
export type RateLimitDef = {
  /** Regular-expression source; compiled with `new RegExp` and matched against the request URL. */
  urlRegex: string;
  /** Allowed requests per minute; converted to a minimum spacing of `60000 / reqsPerMinute` ms. */
  reqsPerMinute: number;
};
/** One request lane: an Axios client tied to a single local interface, plus its scheduling state. */
type Thread = {
  /** Local address this lane binds to, or `'default'` for an unbound client. */
  interface: string;
  /** Axios instance used for every request sent on this lane. */
  client: AxiosInstance;
  /**
   * `free`: idle and eligible for the next job; `waiting`: holding a job until
   * `nextAvailableTime`; `running`: request in flight; `backoff`: pausing after an HTTP 429.
   */
  status: 'backoff' | 'running' | 'waiting' | 'free';
  /** Epoch milliseconds before which this lane should not start another request. */
  nextAvailableTime: number;
};
/** Axios request config extended with this wrapper's retry settings. */
type RequestConfig<D> = AxiosRequestConfig<D> & {
  /** Seconds to pause after an HTTP 429 before the next attempt (60 when omitted). */
  backoffSecs?: number;
  /** Maximum number of attempts for the request (3 when omitted). */
  retries?: number;
};
/** Per-instance thread lists, keyed by `MultiHttps.instanceId`. */
const threads: Record<string, Thread[]> = {};
/**
 * Rate-limit-aware, multi-interface wrapper for Axios (HTTPS requests only).
 *
 * Each configured local interface gets its own Axios client (a "thread").
 * Jobs passed to `enqueue()` are queued and handed, one per polling tick, to
 * the free thread with the earliest `nextAvailableTime`. URL-based rate limits
 * space out successive requests on the same thread.
 */
class MultiHttps {
  /** Key of this instance's entry in the module-level `threads` map. */
  instanceId: string;
  /** Rate limits consulted, in order, by `getMinMsBetweenRequests`. */
  rateLimits: RateLimitDef[] = [];
  /** Pending jobs; each is invoked with the index of the thread chosen for it. */
  queue: ((threadIndex: number) => Promise<any>)[] = [];
  /** Single-slot semaphore guarding the dispatch step in `watchQueue`. */
  queueSemaphore = require('semaphore')(1);

  /**
   * @param rateLimits URL-pattern rate limits; the first matching entry wins.
   * @param interfaces Local addresses to send from, one thread per entry. The
   *   special value `'default'` creates a plain Axios client with no address binding.
   */
  constructor({rateLimits, interfaces = ['default']}: {
    rateLimits: RateLimitDef[],
    interfaces?: string[]
  }) {
    this.instanceId = uuidv4();
    this.rateLimits = rateLimits;
    threads[this.instanceId] = interfaces.map((localAddress): Thread => {
      const clientOpts = localAddress === 'default' ? {} : {
        // @ts-ignore See https://github.com/axios/axios/issues/5431#issue-1520478553
        transport: getWrappedHttpsTransport({localAddress})
      } as AxiosRequestConfig;
      return {
        interface: localAddress,
        client: axios.create(clientOpts),
        status: 'free',
        nextAvailableTime: 0
      };
    });
    // Fire and forget: the dispatch loop runs for the lifetime of the process.
    void this.watchQueue();
  }

  /** Resolves after `ms` milliseconds. */
  private static sleep(ms: number): Promise<void> {
    return new Promise<void>((resolve) => setTimeout(resolve, ms));
  }

  /**
   * Dispatch loop: roughly every 5 ms, hands at most one queued job to a free thread.
   * Never returns.
   */
  async watchQueue(): Promise<void> {
    while (true) {
      await new Promise<void>((resolve) => {
        this.queueSemaphore.take(() => {
          const threadIndex = this.getNextAvailableThreadIndex();
          if (threadIndex !== undefined) {
            const job = this.queue.shift();
            if (job) {
              if (process.env.DEBUG) console.debug(Date.now(), `found free thread ${threadIndex}, pushing next job`);
              // Failures are reported through the promise returned by enqueue().
              job(threadIndex).catch(() => { /* Do nothing; outside listeners will handle it */ });
            }
          }
          this.queueSemaphore.leave();
          setTimeout(() => resolve(), 5);
        });
      });
    }
  }

  /**
   * Queues a request; it is sent once a thread becomes free.
   *
   * @param config Axios request config plus optional `backoffSecs` / `retries`.
   * @returns Settles with the outcome of `request()` for this config.
   */
  enqueue<T = any, R = AxiosResponse<T>, D = any>(config: RequestConfig<D>): Promise<R> {
    if (process.env.DEBUG) console.debug(Date.now(), `enqueueing`);
    return new Promise<R>((resolve, reject) => {
      this.queue.push((threadIdx) => this.request<T, R, D>(config, threadIdx).then(resolve, reject));
    });
  }

  /**
   * Sends a request on a specific thread, honouring its rate-limit spacing.
   *
   * An HTTP 429 triggers a pause of `backoffSecs` before the next attempt; an
   * Axios timeout is retried immediately. Any other error is rethrown at once.
   * The thread is marked `free` again when this method settles.
   *
   * @param config Axios request config plus optional `backoffSecs` (default 60)
   *   and `retries` (default 3, the maximum number of attempts).
   * @param threadIndex Index of the thread to use.
   * @returns The Axios response.
   * @throws RangeError if `threadIndex` does not refer to a thread of this instance.
   * @throws The original error for non-retryable failures, or an `Error` once
   *   all attempts are exhausted.
   */
  async request<T = any, R = AxiosResponse<T>, D = any>(config: RequestConfig<D>, threadIndex: number): Promise<R> {
    const thread = threads[this.instanceId][threadIndex];
    if (!thread) {
      throw new RangeError(`No thread with index ${threadIndex}`);
    }
    thread.status = 'running';

    const backoffSecs = config.backoffSecs ?? 60;
    const retries = config.retries ?? 3;
    const now = Date.now();
    const nextTime = thread.nextAvailableTime;
    const url = (config.baseURL ?? '') + (config.url ?? '');
    const distancingMs = this.getMinMsBetweenRequests(url);
    const waitMs = now < nextTime ? nextTime - now : 0;
    if (process.env.DEBUG) {
      this.dumpStatus(threadIndex, url);
      console.debug({penaltyMs: distancingMs, now, nextTime, waitMs});
    }
    // Reserve this thread's next slot before waiting, so the scheduler sees it immediately.
    thread.nextAvailableTime = now + waitMs + distancingMs;
    const addr = this.getInterfaceAddrPretty(threadIndex);

    try {
      if (waitMs > 0) {
        thread.status = 'waiting';
        await MultiHttps.sleep(waitMs);
        thread.status = 'running';
      }
      if (process.env.DEBUG) console.debug(Date.now(), `${url} starting`);

      for (let attempt = 1; attempt <= retries; attempt++) {
        try {
          const res = await thread.client.request<T, R, D>(config);
          if (process.env.DEBUG) console.debug(Date.now(), `${url} response received`);
          return res;
        } catch (e: unknown) {
          if (process.env.DEBUG) {
            console.debug(Date.now(), `caught err ${e instanceof Error ? e.message : String(e)}`);
            console.debug(Date.now(), `${url} response received`);
          }
          // Throwing here leaves the loop, so a non-retryable failure issues no further requests.
          if (!axios.isAxiosError(e)) throw e;

          const attemptPrefix = `[Attempt ${attempt}/${retries}] ${addr}`;
          if (e.response?.status === 429) {
            logger.warn(`${attemptPrefix} Request throttled by API, backing off for ${backoffSecs}s`, url);
            thread.status = 'backoff';
            await MultiHttps.sleep(backoffSecs * 1000);
            thread.status = 'running';
          } else if (e.code === 'ECONNABORTED' && e.message.includes('timeout')) {
            logger.warn(`${attemptPrefix} Request timed out, retrying`);
          } else {
            throw e;
          }
        }
      }
      throw new Error(`Giving up on API call after ${retries} attempts`);
    } finally {
      if (process.env.DEBUG) console.debug('promise finally!');
      thread.status = 'free';
    }
  }

  /**
   * Minimum spacing, in milliseconds, between requests to `url` on one thread.
   *
   * @returns `60000 / reqsPerMinute` of the first rate limit whose pattern
   *   matches `url`, or 0 when none matches.
   */
  getMinMsBetweenRequests(url: string): number {
    for (const rl of this.rateLimits) {
      if (new RegExp(rl.urlRegex).test(url)) {
        if (process.env.DEBUG) console.debug(Date.now(), `setting rate limit to ${rl.reqsPerMinute} req/minute for ${url}`);
        return 60000 / rl.reqsPerMinute;
      }
    }
    return 0;
  }

  /**
   * Picks the free thread with the earliest `nextAvailableTime`.
   *
   * Only free threads are considered, so a busy thread that happens to share
   * the same timestamp can never be returned.
   *
   * @returns The thread's index, or `undefined` when no thread is free.
   */
  getNextAvailableThreadIndex(): number | undefined {
    const all = threads[this.instanceId];
    let bestIdx: number | undefined;
    let bestTime = Infinity;
    for (let i = 0; i < all.length; i++) {
      const t = all[i];
      // Strict `<` keeps the lowest index among ties.
      if (t.status === 'free' && (bestIdx === undefined || t.nextAvailableTime < bestTime)) {
        bestIdx = i;
        bestTime = t.nextAvailableTime;
      }
    }
    return bestIdx;
  }

  /** Returns the thread's interface address, colour-coded by its current status. */
  getInterfaceAddrPretty(idx: number): string {
    const thread = threads[this.instanceId][idx];
    switch (thread.status) {
      case 'free':
        return chalk.green(thread.interface);
      case 'waiting':
        return chalk.yellowBright(thread.interface);
      case 'running':
        return chalk.yellow(thread.interface);
      case 'backoff':
        return chalk.red(thread.interface);
      default:
        return thread.interface;
    }
  }

  /**
   * Writes a one-line-per-thread status table to stdout (debug aid).
   *
   * @param chosenIdx Index of the thread about to handle `url`; shown in bold with the URL.
   * @param url URL being dispatched.
   */
  dumpStatus(chosenIdx: number, url: string): void {
    let output = '';
    threads[this.instanceId].forEach((thread, i) => {
      const addr = this.getInterfaceAddrPretty(i);
      const addrFull = `${addr} N:${moment(thread.nextAvailableTime).format('HH:mm:ss')}`;
      output += i === chosenIdx
        ? chalk.bold(addrFull) + ` <-- ${url}\n`
        : `${addrFull}\n`;
    });
    process.stdout.write(`-----\n${output}-----\n\n`);
  }
}
export default MultiHttps; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment