Skip to content

Instantly share code, notes, and snippets.

@diestrin
Created September 4, 2019 20:25
Show Gist options
  • Select an option

  • Save diestrin/f621e9e938e5d31aa0fa1d4ff4ea287d to your computer and use it in GitHub Desktop.

Select an option

Save diestrin/f621e9e938e5d31aa0fa1d4ff4ea287d to your computer and use it in GitHub Desktop.
/**
 * Runs `promiseFn` once for every item of `dataSource`, keeping at most
 * `maxConcurrentCalls` invocations in flight at the same time.
 *
 * Items are started in `dataSource` order; a new item is started as soon as a
 * running one settles. A failing item does not abort the run: its error is
 * stored in that item's slot and the remaining items are still processed.
 *
 * A call whose error text contains `MVCC_READ_CONFLICT` is retried up to three
 * times (with `beforeCall` invoked again for each attempt) before the error is
 * recorded.
 *
 * @param config.maxConcurrentCalls Upper bound on simultaneous calls. Fractions
 *   are rounded down; values below 1 (or `NaN`) are treated as 1 so the run can
 *   always make progress. `Infinity` starts every item at once.
 * @param config.promiseFn Async operation executed for each item.
 * @param config.dataSource Items to process. An empty array resolves to `[]`.
 * @param config.beforeCall Optional hook invoked before every attempt. If it
 *   throws, the throw is handled like a failure of the call itself.
 * @param config.afterCall Optional hook invoked after an item settles, with
 *   `(index, null, response)` on success or `(index, error)` on failure. A
 *   throw while reporting a success is recorded as that item's error; a throw
 *   while reporting a failure rejects the returned promise.
 * @returns One entry per item, at the item's index in `dataSource`: either
 *   `{response}` or `{error}`. The promise settles only after every item has
 *   settled.
 */
export const concurrentCalls = async <T1, T2 = any>(config: {
  maxConcurrentCalls: number,
  promiseFn: (arg: T1) => Promise<T2>,
  dataSource: T1[],
  beforeCall?: (index?: number, item?: T1) => void,
  afterCall?: (index?: number, error?: any, result?: T2) => void
}): Promise<{response: T2, error: any}[]> => {
  const MAX_RETRIES = 3;
  const RETRYABLE_ERROR = /MVCC_READ_CONFLICT/;

  try {
    const items = config.dataSource;
    const total = items.length;
    // `>= 1` is false for NaN, so NaN falls back to 1 as well.
    const limit = config.maxConcurrentCalls >= 1 ? Math.floor(config.maxConcurrentCalls) : 1;

    const responses: {response?: T2, error?: any}[] = [];
    let nextIndex = 0; // next item that has not been started yet
    let inFlight = 0;  // items currently being processed (including retries)

    // Processes a single item, retrying in place on retryable errors.
    const runItem = async (index: number): Promise<void> => {
      const item = items[index];
      for (let retryNumber = 0; ; retryNumber++) {
        console.log(
          'Processing call',
          `${index + 1}/${total}`,
          `(${inFlight} concurrent)`
        );
        try {
          if (config.beforeCall) {
            config.beforeCall(index, item);
          }
          const response = await config.promiseFn(item);
          responses[index] = {response};
          if (config.afterCall) {
            config.afterCall(index, null, response);
          }
          return;
        } catch (error) {
          // String() instead of .toString() so null/undefined rejections are safe.
          if (RETRYABLE_ERROR.test(String(error)) && retryNumber < MAX_RETRIES) {
            console.log(
              'Operation', index + 1, 'for item', detailLog(item),
              'failed, retrying', `${retryNumber + 1}/${MAX_RETRIES}`
            );
            console.log(detailLog(error));
            continue;
          }
          responses[index] = {error};
          if (config.afterCall) {
            config.afterCall(index, error);
          }
          return;
        }
      }
    };

    // Each worker pulls the next unstarted item until none are left. The index
    // is claimed synchronously, so two workers never pick the same item.
    const worker = async (): Promise<void> => {
      while (nextIndex < total) {
        const index = nextIndex++;
        inFlight++;
        try {
          await runItem(index);
        } finally {
          inFlight--;
        }
      }
    };

    // Completion is the moment every worker has drained the queue, which
    // cannot happen while any item is still pending.
    const workerCount = Math.min(limit, total);
    await Promise.all(Array.from({length: workerCount}, () => worker()));

    return responses as {response: T2, error: any}[];
  } catch (err) {
    console.log('Concurrent calls crashed', detailLog(err));
    throw err;
  }
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment