Skip to content

Instantly share code, notes, and snippets.

@crizstian
Created October 30, 2017 15:35
Show Gist options
  • Save crizstian/d4693ce591f942981b3db798a07317b9 to your computer and use it in GitHub Desktop.
Save crizstian/d4693ce591f942981b3db798a07317b9 to your computer and use it in GitHub Desktop.
cluster main process
// First we need to be in the master process scope to be able
// to initialize the main process
if (cluster.isMaster) {
// Announce the master process together with its pid.
console.log(`\n--- 🤘🏽 Master ${process.pid} is running 🤘🏽 ---\n`)

// Fork one worker per role, in this order. Each fork receives its role name
// through the `WorkerName` entry of the env object handed to cluster.fork
// (presumably read by the worker entry point to pick its job; that code is
// not visible here).
const workerNames = ['readlogs', 'savelogs', 'deletelogs', 'movelogs', 'geolocation']
// The bindings stay `let` on purpose: the cluster 'exit' handler reassigns them
// when a worker is replaced.
let [readlogs, savelogs, deletelogs, movelogs, geolocation] =
  workerNames.map((WorkerName) => cluster.fork({WorkerName}))

// Hand every worker to reForkOnExit (defined elsewhere). According to the
// original author it re-forks a worker that fails or closes, and this is where
// high availability is configured.
reForkOnExit(readlogs, savelogs, deletelogs, movelogs, geolocation)
// Then we set the logic for each worker. The message handlers are named so the
// 'exit' handler below can attach them again to a replacement worker: a
// listener registered on one Worker object is not carried over to a newly
// forked one.

// readlogs reported a parsed batch: forward the records to savelogs and
// geolocation, and the list of source files to deletelogs and movelogs.
const onLogsParsed = ({read, logfiles}) => {
  console.log(`LOGS PARSED => ${logfiles.length}`)
  savelogs.send({read, collection: 'analytics'})
  geolocation.send(read)
  deletelogs.send({logfiles, type: 'logs'})
  movelogs.send({logfiles})
}

// geolocation reported its results: forward them to savelogs under the
// 'locations' collection.
const onIpsLocated = (read) => {
  console.log(`IP LOCATED =>`, read.length)
  savelogs.send({read, collection: 'locations'})
}

readlogs.on('message', onLogsParsed)
geolocation.on('message', onIpsLocated)

// Replace a worker when it exits.
//
// `worker` is the cluster.Worker object that exited, so it has to be compared
// by identity against the worker bindings. The previous string comparison
// (`worker === 'readlogs'`) could never match, which meant no branch ever ran.
// Each branch now also assigns the new worker to its OWN binding; before, the
// savelogs and deletelogs branches overwrote `readlogs`. Because the handlers
// above read the bindings at call time, they automatically talk to the
// replacement workers.
//
// NOTE(review): reForkOnExit (defined elsewhere) is also given these workers and
// is described as re-forking them. If it really forks replacements itself, keep
// only one of the two mechanisms to avoid forking twice per exit.
cluster.on('exit', (worker, code, signal) => {
  console.log('WORKER =>', worker)
  console.log('CODE =>', code)
  console.log('SIGNAL =>', signal)
  if (worker === readlogs) {
    readlogs = cluster.fork({WorkerName: 'readlogs'})
    readlogs.on('message', onLogsParsed)
  } else if (worker === savelogs) {
    savelogs = cluster.fork({WorkerName: 'savelogs'})
  } else if (worker === deletelogs) {
    deletelogs = cluster.fork({WorkerName: 'deletelogs'})
  } else if (worker === movelogs) {
    movelogs = cluster.fork({WorkerName: 'movelogs'})
  } else if (worker === geolocation) {
    geolocation = cluster.fork({WorkerName: 'geolocation'})
    geolocation.on('message', onIpsLocated)
  }
})
try {
  // Tunables for the batching and polling logic below.
  const BATCH_THRESHOLD = 50 // a batch is sent once MORE than this many files are queued
  const BATCH_SIZE = 40 // files taken from the front of the queue per batch
  const BACKUP_DELAY_MS = 90000 // delay between the last-files check and the backup replay
  const FTP_POLL_MS = 900000 // time between two FTP checks
  const logsDir = `${__dirname}/logs` // folder where downloaded log files land

  // Sends one collection's backup records to savelogs and asks deletelogs to
  // handle the backup files; logs a notice when either list is empty.
  const resendBackup = (collection, records, dirfiles) => {
    const hasBackup = records.length > 0 && dirfiles.length > 0
    if (hasBackup) {
      savelogs.send({read: records, collection})
      deletelogs.send({logfiles: [...dirfiles], type: 'backup'})
    } else {
      console.log('### NO FILES ENCOUNTER ###')
    }
  }

  // Reads the backup files (per the original author they are created when the
  // database worker had a problem) and retries saving them: 'analytics' first,
  // then 'locations'.
  const replayBackups = () => {
    readBackup('analytics')
      .then(({records, dirfiles}) => {
        resendBackup('analytics', records, dirfiles)
        return readBackup('locations')
      })
      .then(({records, dirfiles}) => {
        resendBackup('locations', records, dirfiles)
        console.log(`\nPROCESS COMPLETE => true`)
        console.log(`### WATING FOR NEW LOG FILES ###`)
      })
      .catch(e => console.log(`SOMETHING HAPPEND READING BACKUP => err: ${e}`))
  }

  // Checks whether files remain in the folder, hands them to readlogs, and
  // schedules the backup replay whether or not any file was found.
  const readRemainingFiles = () => {
    readLastFiles()
      .then((remaining) => {
        if (remaining.length > 0) {
          readlogs.send({logfiles: [...remaining]})
        } else {
          console.log('### NO FILES ENCOUNTER ###')
        }
        setTimeout(replayBackups, BACKUP_DELAY_MS)
      })
      .catch(e => console.log(`SOMETHING HAPPEND READING LAST FILES => err: ${e}`))
  }

  // One FTP check: connect, download the source folder into logsDir, then
  // process whatever is left once the download callback fires.
  const pollFtp = () => {
    const client = openFTP()
    client.connect(() => {
      client.download(
        config.ftpSettings.folders.source,
        logsDir,
        {overwrite: 'all'},
        // final execution for the last log file downloaded; the argument is unused
        (downloadResult) => {
          console.log('\nFINISHED LOG DOWNLOADS\n')
          client.close()
          readRemainingFiles()
        }
      )
    })
    // Runs right after connect() is called, i.e. without waiting for the
    // download: reset the queue of seen files (`files` is declared outside
    // this block).
    console.log('\nCHECKING FTP FOR NEW LOGS ...')
    files = []
  }

  // Watch the download folder for .gz files and queue every new name; once the
  // queue holds more than BATCH_THRESHOLD names, the first BATCH_SIZE of them
  // are sent to readlogs and removed from the queue.
  const watcher = watch(logsDir, {filter: /\.gz$/}, (evt, name) => {
    if (evt !== 'update' || files.includes(name)) return
    files = [...files, name]
    if (files.length > BATCH_THRESHOLD) {
      const batch = files.slice(0, BATCH_SIZE)
      files = files.slice(BATCH_SIZE)
      readlogs.send({logfiles: batch})
    }
  })

  // Check the FTP server periodically.
  setInterval(pollFtp, FTP_POLL_MS)

  // The original author reports that the flow does not work without this call
  // and does not know why.
  // NOTE(review): close() runs synchronously right after setup, so presumably
  // the watcher is stopped before it can report anything. Confirm whether the
  // watch callback above ever fires.
  watcher.close()
} catch (e) {
  console.log('ERROR IN MEDIATOR => ', e)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment