Last active
February 27, 2024 00:25
-
-
Save sayhicoelho/2e9114c13553fb63014b6031601f4af7 to your computer and use it in GitHub Desktop.
NodeJS: In Memory Worker + Worker Threads
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const worker = require('./worker') | |
const data = { | |
// ... | |
} | |
worker.enqueue('send-reports', data) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
exports.run = async data => { | |
// ... | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { Worker, isMainThread, parentPort } = require('worker_threads') | |
const { EventEmitter } = require('events') | |
const database = require('./database') | |
const jobs = { | |
'send-reports': require('./jobs/send-reports-job') | |
} | |
let worker = null | |
exports.enqueue = (jobName, data) => { | |
if (worker) { | |
worker.postMessage({ | |
type: 'enqueue', | |
data: { | |
jobName, | |
data | |
} | |
}) | |
} | |
} | |
async function main() { | |
if (isMainThread) { | |
worker = new Worker(__filename) | |
} else { | |
console.log('Worker started') | |
await database.connect() | |
const event = new EventEmitter() | |
const queue = [] | |
const maxRetries = 3 | |
let running = false | |
let currentJobId = 0 | |
function enqueue(jobName, data) { | |
queue.push([++currentJobId, jobName, data]) | |
if (!running) { | |
event.emit('next') | |
} | |
} | |
parentPort.on('message', e => { | |
switch (e.type) { | |
case 'enqueue': | |
enqueue(e.data.jobName, e.data.data) | |
break | |
} | |
}) | |
event.on('run', async (jobId, jobName, data, retries) => { | |
try { | |
console.log(`Running job ${jobId}...`) | |
await jobs[jobName].run(data) | |
console.log(`Job ${jobId} done.`) | |
await new Promise(resolve => setTimeout(resolve, 1000)) | |
event.emit('next') | |
} catch (err) { | |
console.error(`[${jobId}]: Job "${jobName}" failed to execute with reason: ${err?.message}`) | |
if (retries < maxRetries) { | |
event.emit('run', jobId, jobName, data, retries + 1) | |
} else { | |
event.emit('next') | |
} | |
} | |
}) | |
event.on('next', () => { | |
running = true | |
const nextJob = queue.shift() | |
if (nextJob) { | |
const [jobId, jobName, data] = nextJob | |
event.emit('run', jobId, jobName, data, 1) | |
} else { | |
running = false | |
} | |
}) | |
} | |
} | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment