Created
October 30, 2017 15:35
-
-
Save crizstian/d4693ce591f942981b3db798a07317b9 to your computer and use it in GitHub Desktop.
cluster main process
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// First we need to be in the master process scope to be able
// to initialize the whole main process
if (cluster.isMaster) { | |
// Announce that the master process is up, together with its pid.
console.log(`
--- 🤘🏽 Master ${process.pid} is running 🤘🏽 ---
`)
// Fork one worker per pipeline stage. Each worker receives its role through
// the `WorkerName` environment variable. The bindings stay `let` because the
// 'exit' handler further down reassigns them.
let [readlogs, savelogs, deletelogs, movelogs, geolocation] =
  ['readlogs', 'savelogs', 'deletelogs', 'movelogs', 'geolocation']
    .map((WorkerName) => cluster.fork({WorkerName}))
// Hand every worker to reForkOnExit (defined outside this block).
// NOTE(review): according to the original comment this helper re-forks a
// worker that fails or closes, which is where high availability is
// configured — confirm against its implementation.
reForkOnExit(readlogs, savelogs, deletelogs, movelogs, geolocation)
// then we set the logic for each worker | |
// Routes a message from the readlogs worker to the downstream workers.
// The message is destructured as {read, logfiles}: `read` goes to savelogs
// (collection 'analytics') and to geolocation, `logfiles` goes to deletelogs
// and movelogs. The worker variables are looked up at call time, so a
// replacement worker assigned by the 'exit' handler is picked up here.
const onReadlogsMessage = ({read, logfiles}) => {
  console.log(`LOGS PARSED => ${logfiles.length}`)
  savelogs.send({read, collection: 'analytics'})
  geolocation.send(read)
  deletelogs.send({logfiles, type: 'logs'})
  movelogs.send({logfiles})
}
// Forwards what the geolocation worker sends back to savelogs under the
// 'locations' collection.
const onGeolocationMessage = (read) => {
  console.log(`IP LOCATED =>`, read.length)
  savelogs.send({read, collection: 'locations'})
}
readlogs.on('message', onReadlogsMessage)
geolocation.on('message', onGeolocationMessage)
// Replaces a worker that exited with a fresh fork of the same role.
// NOTE(review): reForkOnExit (called earlier with these same workers) is
// described as re-forking too — confirm the two mechanisms do not both spawn
// a replacement for the same exit.
cluster.on('exit', function (worker, code, signal) {
  console.log('WORKER =>', worker)
  console.log('CODE =>', code)
  console.log('SIGNAL =>', signal)
  // `worker` is the Worker object that exited, not its name, so it has to be
  // compared by identity against the handles held here. Each branch must
  // reassign its own variable so later `send` calls reach the live worker.
  if (worker === readlogs) {
    readlogs = cluster.fork({WorkerName: 'readlogs'})
    // Listeners live on the old worker object; the new one needs its own.
    readlogs.on('message', onReadlogsMessage)
  } else if (worker === savelogs) {
    savelogs = cluster.fork({WorkerName: 'savelogs'})
  } else if (worker === deletelogs) {
    deletelogs = cluster.fork({WorkerName: 'deletelogs'})
  } else if (worker === movelogs) {
    movelogs = cluster.fork({WorkerName: 'movelogs'})
  } else if (worker === geolocation) {
    geolocation = cluster.fork({WorkerName: 'geolocation'})
    geolocation.on('message', onGeolocationMessage)
  }
})
// Sets up the log pipeline driver: a watcher on the local logs folder plus a
// periodic FTP download. Synchronous failures during setup are logged by the
// catch at the bottom; asynchronous failures are logged by the promise
// `.catch` handlers inside the helpers.
try {
  // Folder the FTP download writes into and the watcher observes.
  const logsDir = `${__dirname}/logs`

  // Watcher callback: collects newly updated file names in `files` (declared
  // outside this block) and, once more than 50 are queued, hands the first 40
  // to the readlogs worker.
  const onLogEvent = (evt, name) => {
    if (evt !== 'update' || files.includes(name)) return
    files = [...files, name]
    if (files.length <= 50) return
    const batch = files.splice(0, 40)
    readlogs.send({logfiles: [...batch]})
    files = [...files]
  }

  // Re-sends one backup set: the records go to savelogs for `collection` and
  // the backup files go to deletelogs. Both lists must be non-empty.
  const resendBackup = (collection, {records, dirfiles}) => {
    const hasBackup = records.length > 0 && dirfiles.length > 0
    if (!hasBackup) {
      console.log('### NO FILES ENCOUNTER ###')
      return
    }
    savelogs.send({read: records, collection})
    deletelogs.send({logfiles: [...dirfiles], type: 'backup'})
  }

  // Reads the 'analytics' backup, then the 'locations' backup, re-sending
  // each one. Per the original comment, backups are written when the
  // database worker had a problem and are retried at the end of the flow.
  const retryBackups = () => {
    readBackup('analytics')
      .then((backup) => {
        resendBackup('analytics', backup)
        return readBackup('locations')
      })
      .then((backup) => {
        resendBackup('locations', backup)
        console.log(`\nPROCESS COMPLETE => true`)
        console.log(`### WATING FOR NEW LOG FILES ###`)
      })
      .catch(e => console.log(`SOMETHING HAPPEND READING BACKUP => err: ${e}`))
  }

  // Runs when the FTP download callback fires: closes the client, sends any
  // files returned by readLastFiles to readlogs, then schedules the backup
  // retry 90 seconds later.
  const onDownloadsFinished = (client) => {
    console.log('\nFINISHED LOG DOWNLOADS\n')
    client.close()
    readLastFiles()
      .then((pending) => {
        if (pending.length > 0) {
          readlogs.send({logfiles: [...pending]})
        } else {
          console.log('### NO FILES ENCOUNTER ###')
        }
        setTimeout(retryBackups, 90000)
      })
      .catch(e => console.log(`SOMETHING HAPPEND READING LAST FILES => err: ${e}`))
  }

  // One polling round: connect to the FTP, download the source folder into
  // logsDir (overwriting), and reset the queued file list. The reset runs
  // right after the connection is requested, not after the download ends.
  const pollFtp = () => {
    const client = openFTP()
    client.connect(() => {
      client.download(
        config.ftpSettings.folders.source,
        logsDir,
        { overwrite: 'all' },
        // The download result is not inspected.
        (_result) => onDownloadsFinished(client)
      )
    })
    console.log('\nCHECKING FTP FOR NEW LOGS ...')
    files = []
  }

  // Only .gz files in the logs folder are reported to the callback.
  const watcher = watch(logsDir, { filter: /\.gz$/ }, onLogEvent)
  // Poll the FTP every 900000 ms (15 minutes).
  setInterval(pollFtp, 900000)
  // NOTE(review): the watcher is closed right after being created. The
  // original author did not know why this is needed but reports the flow does
  // not work without it — confirm against the watch library before changing.
  watcher.close()
} catch (e) {
  console.log('ERROR IN MEDIATOR => ', e)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment