Created
July 14, 2020 11:18
-
-
Save hscstudio/31731a63b5c9c88077c74ea8c179c82a to your computer and use it in GitHub Desktop.
Simple Queue with ExpressJS
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const express = require('express') | |
const bodyParser = require('body-parser') | |
const methodOverride = require('method-override') | |
const app = express() | |
app.use(bodyParser.urlencoded({ limit: '50mb', extended: true })) | |
app.use(bodyParser.json({limit: '50mb'})) | |
app.use(methodOverride()) | |
app.use((err, req, res, next) => { | |
console.error(err.stack) | |
res.status(500).send('Something broke!') | |
}) | |
global.queue = {} | |
const processTask = async (task) => { | |
task.status = 'process' | |
task.retry-- | |
console.log(task) | |
/* start process task */ | |
await new Promise(resolve => setTimeout(resolve, 1000)) | |
const result = Math.random() < 0.7; | |
if (result) { | |
task.status = 'success' | |
} else { | |
task.status = 'failed' | |
} | |
/* end process task */ | |
console.log(task) | |
processJob(task.jobId) | |
} | |
const processJob = async (jobId) => { | |
const currentJob = queue[jobId] | |
if (currentJob){ | |
currentJob.index++ | |
if (currentJob.tasks && currentJob.tasks.length>0){ | |
const readyTasks = currentJob.tasks.filter((task) => { | |
return (task.status == 'new' || task.status == 'failed') && task.retry > 0 | |
}) | |
if (readyTasks && readyTasks.length>0){ | |
console.log(readyTasks[0]) | |
processTask(readyTasks[0]) | |
} else { | |
currentJob.end = Date.now() | |
console.log('Job '+jobId+' done!') | |
} | |
} | |
} | |
} | |
const addExecJob = async (req, res) => { | |
const jobId = req.params.jobId | |
const retry = req.params.retry | |
const jobDatas = req.body.jobDatas | |
queue = { | |
[jobId]: null | |
} | |
let id = 0 | |
let tasks = [] | |
for (let jobData of jobDatas) { | |
id++; | |
tasks.push({ | |
id, | |
jobId, | |
data: jobData, | |
retry, | |
status: 'new' | |
}) | |
} | |
res.send({ | |
status:'success', | |
message: id+' data sedang dalam proses...', | |
data: tasks | |
}) | |
// jalankan dibelakang layar | |
queue[jobId] = { | |
tasks, | |
index: 0, | |
failed: 0, | |
taskCount: tasks.length, | |
start: Date.now(), | |
end: Date.now(), | |
} | |
processJob(jobId); | |
} | |
const executeJob = async (req, res) => { | |
const jobId = req.params.jobId | |
const retry = req.params.retry | |
const currentJob = queue[jobId] | |
if (currentJob && currentJob.tasks){ | |
const readyTasks = currentJob.tasks.filter((task) => { | |
if ((task.status == 'new' || task.status == 'failed')){ | |
task.retry = retry | |
return true | |
} else { | |
return false | |
} | |
}) | |
if (readyTasks && readyTasks.length>0){ | |
currentJob.index = 0 | |
currentJob.failed = 0 | |
currentJob.start = Date.now() | |
currentJob.end = Date.now() | |
res.send({ | |
status:'success', | |
message: 'Job-'+jobId+' sedang diproses...', | |
}) | |
processJob(jobId); | |
} else { | |
res.send({ | |
status:'warning', | |
message: 'Job-'+jobId+' not ready', | |
}) | |
} | |
} else { | |
res.send({ | |
status:'warning', | |
message: 'Job-'+jobId+' not ready', | |
}) | |
} | |
} | |
const viewJob = async (req, res) => { | |
const jobId = req.params.jobId | |
if (queue[jobId]){ | |
const tasks = queue[jobId].tasks; | |
const newTasks = tasks.filter(task=>{ | |
return task.status == 'new' | |
}) | |
const processTasks = tasks.filter(task=>{ | |
return task.status == 'process' | |
}) | |
const successTasks = tasks.filter(task=>{ | |
return task.status == 'success' | |
}) | |
const failedTasks = tasks.filter(task=>{ | |
return task.status == 'failed' | |
}) | |
const duration = Math.floor((queue[jobId].end - queue[jobId].start)/1000); | |
res.json({ | |
new: newTasks.length, | |
process: processTasks.length, | |
success: successTasks.length, | |
failed: failedTasks.length, | |
duration: duration+' seconds', | |
detailTasks: queue[jobId].tasks | |
}); | |
} else { | |
res.json({ | |
message: 'Please add & execute job!' | |
}) | |
} | |
} | |
app.get('/', (req, res) => { | |
res.json({ | |
message:'queue system', | |
}); | |
}) | |
app.post('/add-exec-job/:jobId/:retry', addExecJob) | |
app.post('/exec-job/:jobId/:retry', executeJob) | |
app.get('/check-job/:jobId', viewJob) | |
let port = 666 | |
app.listen(port, () => console.log(`app listening on port ${port}!`)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment