Created
November 14, 2015 09:59
-
-
Save ikouchiha47/dd8024b324c70929ea47 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"use strict"; | |
const cluster = require("cluster"); | |
const numCpus = require("os").cpus().length; | |
const stream = require("stream"); | |
const util = require("util"); | |
// Subscriber connection shared by the rest of this file.
// The parentheses matter: `new require("ioredis")()` parses as
// `(new require("ioredis"))()`, i.e. it calls the Redis constructor WITHOUT
// `new`, which throws when the export is a class.
const redis = new (require("ioredis"))();
const sqlite3 = require('sqlite3').verbose(); | |
const split = require("split"); | |
// Process bootstrap: the master forks one worker per CPU core; each worker
// listens for pub/sub notifications and moves set members into SQLite.
if (cluster.isMaster) {
  // range(0, numCpus - 1) is inclusive, so this forks exactly numCpus workers.
  range(0, numCpus - 1).forEach(() => cluster.fork());

  cluster.on("online", (worker) => {
    console.log("worker", worker.id, "is up");
  });

  // Dead workers are only reported; they are not respawned.
  cluster.on("exit", (w, c, s) => {
    console.log(`worker ${w.id} died`);
  });
} else {
  // A connection in subscriber mode cannot issue regular commands, so SPOP
  // goes through a separate client.
  const client = new (require("ioredis"))();
  const sqlW = new SQLWriter();

  // The message payload is the name of the Redis set to pop from.
  redis.on("message", (channel, key) => {
    console.log(key, "key", "recieved by", cluster.worker.id);
    client
      .spop(key)
      .then((data) => {
        // Every worker receives every message, but only one of them wins the
        // SPOP; the others get null and must not buffer it (the url column
        // is NOT NULL).
        if (data == null) {
          return;
        }
        console.log(data, "data");
        sqlW.write(data);
      })
      .catch((err) => console.log(err, "error"));
  });
}
// NOTE(review): these statements sit outside the master/worker branch, so
// they run in every process (master and each worker) — confirm that is the
// intent before restricting them to one side.

// Dedicated connection for SADD/PUBLISH; `redis` is in subscriber mode.
const pub = new (require("ioredis"))();

redis.subscribe("addedUrl", (er, c) => {
  if (er) console.log(er);
});

// Stream the comma-separated URL list, add each URL to the "urls" set and
// announce it on the "addedUrl" channel (the payload is the set name).
require("fs")
  .createReadStream("./lyr_1")
  // Without a handler a missing/unreadable file would crash the process.
  .on("error", (err) => console.log(err, "error"))
  .pipe(split(","))
  .on("data", (chunk) => {
    // Splitting on "," yields empty or whitespace-only tokens for trailing
    // separators and a final newline; those are not URLs.
    const url = chunk.trim();
    if (url === "") {
      return;
    }
    pub
      .sadd("urls", url)
      .then(() => pub.publish("addedUrl", "urls"))
      .catch((err) => console.log(err, "error"));
  });
/**
 * Buffers values in memory and writes them to the `urls` table of a SQLite
 * database in one transaction per flush.
 *
 * This stays a hoisted function declaration (not a `class`) because the
 * worker branch earlier in the file calls `new SQLWriter()` before this part
 * of the module has been evaluated. For the same reason the constructor must
 * not call any prototype method synchronously.
 *
 * @param {string} [dbname="./urls.db"] - Path of the SQLite database file.
 */
function SQLWriter(dbname = "./urls.db") {
  this.db = new sqlite3.Database(dbname);
  this.values = [];
  // Idle threshold (5 hours). Retained as a public property; the flush timer
  // does not consult it because every tick flushes whatever is buffered.
  this.maxTimeOut = 1000 * 60 * 60 * 5;
  this.lastChanged = Date.now();
  // Handle of the pending flush timer, or null when none is scheduled.
  this.interval = null;

  this.db.serialize(() => {
    // IF NOT EXISTS replaces an existsSync() check that raced with the
    // asynchronous creation of the database file.
    this.db.run(
      "CREATE TABLE IF NOT EXISTS urls (" +
        " id INTEGER PRIMARY KEY AUTOINCREMENT," +
        " url VARCHAR(2000) NOT NULL," +
        " lock_version VARCHAR(2000));"
    );
  });
}

/** Schedules a flush for `writer` unless one is already pending. */
function armFlushTimer(writer) {
  if (writer.interval) {
    return;
  }
  const flushDelayMs = 500 * 60 * 60; // 30 minutes
  writer.interval = setInterval(() => writer.flushAll(), flushDelayMs);
}

/** Cancels the pending flush for `writer`, if any. */
function disarmFlushTimer(writer) {
  clearInterval(writer.interval);
  writer.interval = null;
}

/**
 * Queues one value for the next flush and makes sure a flush is scheduled.
 *
 * @param {*} data - Value to insert into the `url` column.
 */
SQLWriter.prototype.write = function (data) {
  this.values.push(data);
  this.lastChanged = Date.now();
  // The timer is stopped by every flush, so it has to be re-armed here or
  // values written after the first flush would never reach the database.
  armFlushTimer(this);
};

/**
 * Writes every buffered value to the database inside a single transaction,
 * empties the buffer and stops the flush timer.
 */
SQLWriter.prototype.flushAll = function () {
  disarmFlushTimer(this);

  // splice(0) drains the whole buffer; splice(0, -1) removed nothing.
  const values = this.values.splice(0);
  if (values.length === 0) {
    return;
  }

  const db = this.db;
  db.serialize(() => {
    db.run("BEGIN TRANSACTION;");
    const stmt = db.prepare("INSERT INTO urls(url) VALUES (?)");
    console.log("commiting with values", values);
    values.forEach((v) => {
      stmt.run(v);
    });
    stmt.finalize();
    console.log("writing to db");
    db.run("END;");
  });
};
/**
 * Builds an inclusive list of numbers running from `f` to `l`.
 *
 * @param {number} f - First value of the sequence.
 * @param {number} l - Inclusive bound; values past it are not emitted.
 * @param {number} [d=1] - Step. A falsy step (0, undefined, NaN) falls back
 *   to 1. A negative step counts down from `f` to `l`.
 * @returns {number[]} The generated values; empty when `l` cannot be reached
 *   in the direction of the step.
 */
function range(f, l, d) {
  const step = d || 1;
  const array = [];
  if (step > 0) {
    for (let i = f; i <= l; i += step) {
      array.push(i);
    }
  } else {
    // Previously a negative step with f <= l never terminated.
    for (let i = f; i >= l; i += step) {
      array.push(i);
    }
  }
  return array;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment