Last active
November 23, 2020 17:25
-
-
Save vladgovor77771/1e182c87b67acf5b9f6ff0fe6b7152b6 to your computer and use it in GitHub Desktop.
Queue arena based on bullmq. Allows chaining jobs.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { Queue, QueueEvents } = require('bullmq'); | |
const { EventEmitter } = require('events'); | |
const config = require('../config'); | |
const createUid = require('uid'); | |
class QueueArena extends EventEmitter { | |
constructor({ redisDb = 1 }) { | |
super(); | |
this.redisDb = redisDb; | |
this.redisConfig = { | |
host: config.redis.host, | |
port: config.redis.port, | |
password: config.redis.password, | |
db: 1 | |
} | |
this.queueNames = new Set(); | |
this.queues = new Map(); | |
this.queueEvents = new Map(); | |
this.chainIds = new Map(); | |
this.chainSteps = new Map(); | |
this.chains = new Map(); | |
this.chainResults = new Map(); | |
} | |
use(queueNames) { | |
if (!Array.isArray(queueNames)) queueNames = [ queueNames ]; | |
queueNames.filter(name => !this.queueNames.has(name)).map(name => { | |
this.queueNames.add(name); | |
let queue = new Queue(name, { connection: this.redisConfig }); | |
this.queues.set(name, queue); | |
let queueEvents = new QueueEvents(name, { connection: this.redisConfig }); | |
this.queueEvents.set(name, queueEvents); | |
queueEvents.on('completed', (async ({ jobId, returnvalue }) => { | |
let chainId = this.chainIds.get(`${name}:${jobId}`); | |
if (!chainId) return; | |
let chainResult = this.chainResults.get(chainId); | |
chainResult.push({ queueName: name, jobId, returnvalue }); | |
let chainStep = this.chainSteps.get(chainId); | |
let chain = this.chains.get(chainId); | |
chainStep++; | |
this.chainSteps.set(chainId, chainStep); | |
this.emit(`chain:${chainId}:progress`, { chainStep, stepReturnValue: returnvalue }); | |
this.emit(`${name}:${jobId}:completed`, returnvalue); | |
if (chainStep >= chain.length) this.emit(`chain:${chainId}:completed`, chainResult); | |
else { | |
let nextJob = chain[chainStep]; | |
if (chain[chainStep - 1].passResultAsJobData) nextJob.data = Object.assign(returnvalue, nextJob.data); | |
nextJob.chainId = chainId; | |
await this.addJob(nextJob); | |
} | |
}).bind(this)); | |
queueEvents.on('failed', (async ({ jobId, failedReason }) => { | |
let chainId = this.chainIds.get(`${name}:${jobId}`); | |
if (!chainId) return; | |
this.emit(`chain:${chainId}:failed`, { queueName: name, jobId, failedReason }); | |
this.emit(`${name}:${jobId}:failed`, failedReason); | |
}).bind(this)); | |
}); | |
} | |
async addJob(_job = {}) { | |
let { | |
queueName, | |
data, | |
options, | |
chainId, | |
jobName = '__default__', | |
} = _job; | |
let queue = this.queues.get(queueName); | |
if (!queue) throw new Error('Queue not using!'); | |
let job = await queue.add(jobName, data, options); | |
if (!chainId) { | |
chainId = createUid(16); | |
this.chainResults.set(chainId, []); | |
this.chainSteps.set(chainId, 0); | |
this.chains.set(chainId, [ _job ]); | |
} | |
this.chainIds.set(`${queueName}:${job.id}`, chainId); | |
job.awaitResult = () => new Promise((resolve, reject) => { | |
this.once(`${queueName}:${job.id}:completed`, returnvalue => resolve(returnvalue)); | |
this.once(`${queueName}:${job.id}:failed`, err => reject(new Error(err))); | |
}); | |
return job; | |
} | |
async addChain(jobs) { | |
if (!jobs) return null; | |
if (!Array.isArray(jobs)) jobs = [ jobs ]; | |
let chainId = createUid(16); | |
this.chains.set(chainId, jobs); | |
this.chainSteps.set(chainId, 0); | |
this.chainResults.set(chainId, []); | |
let firstJob = jobs[0]; | |
firstJob.chainId = chainId; | |
const job = await this.addJob(firstJob); | |
const chain = { | |
firstJob: job, | |
jobs, | |
awaitResult: () => new Promise((resolve, reject) => { | |
this.once(`chain:${chainId}:completed`, chainResult => resolve(chainResult)); | |
this.once(`chain:${chainId}:failed`, ({ queueName, jobId, failedReason }) => { | |
let err = new Error(failedReason); | |
err.queueName = queueName; | |
err.jobId = jobId; | |
reject(err) | |
}); | |
}) | |
} | |
return chain; | |
} | |
} | |
module.exports = QueueArena; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { Worker } = require('bullmq'); | |
const config = require('../config'); | |
const sleep = require('sleep-promise'); | |
const redisConfig = { | |
host: config.redis.host, | |
port: config.redis.port, | |
password: config.redis.password, | |
db: 1 | |
} | |
const worker1 = new Worker('test1', async (job) => { | |
let { counter } = job.data; | |
counter++; | |
console.log('incremented to', counter); | |
await sleep(1000); | |
return { counter }; | |
}, { connection: redisConfig }); | |
const worker2 = new Worker('test2', async (job) => { | |
let { counter } = job.data; | |
counter++; | |
console.log('incremented to', counter); | |
await sleep(1000); | |
return { counter }; | |
}, { connection: redisConfig }); | |
const worker3 = new Worker('test3', async (job) => { | |
let { counter } = job.data; | |
counter++; | |
console.log('incremented to', counter); | |
await sleep(1000); | |
return { counter }; | |
}, { connection: redisConfig }); | |
const worker4 = new Worker('test4', async (job) => { | |
let { counter } = job.data; | |
counter++; | |
console.log('incremented to', counter); | |
await sleep(1000); | |
return { counter }; | |
}, { connection: redisConfig }); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const QueueArena = require('./index'); | |
const test = async () => { | |
let arena = new QueueArena({ redisDb: 1 }); | |
arena.use([ 'test1', 'test2', 'test3', 'test4' ]); | |
let chain = await arena.addChain([{ | |
queueName: 'test1', | |
data: { counter: 1 }, | |
passResultAsJobData: true | |
}, { | |
queueName: 'test2', | |
passResultAsJobData: true | |
}, { | |
queueName: 'test3', | |
passResultAsJobData: true | |
}, { | |
queueName: 'test4', | |
}]); | |
try { | |
let chainResult = await chain.awaitResult(); | |
console.log(chainResult); | |
} catch (err) { | |
// throwing Error in workers will stop chain and emit `chain:${chainId}:failed` => { queueName, jobId, failedReason } | |
console.warn(err); | |
} | |
} | |
test(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment