Skip to content

Instantly share code, notes, and snippets.

@ikouchiha47
Created November 14, 2015 09:59
Show Gist options
  • Save ikouchiha47/dd8024b324c70929ea47 to your computer and use it in GitHub Desktop.
Save ikouchiha47/dd8024b324c70929ea47 to your computer and use it in GitHub Desktop.
"use strict";
const cluster = require("cluster");
const numCpus = require("os").cpus().length;
const stream = require("stream");
const util = require("util");
const redis = new require("ioredis")();
const sqlite3 = require('sqlite3').verbose();
const split = require("split");
if (cluster.isMaster) {
  // Master: start one worker per CPU core and log worker lifecycle events.
  // forEach instead of map: fork() is called only for its side effect.
  range(0, numCpus - 1).forEach(() => cluster.fork());

  cluster.on('online', (worker) => {
    console.log("worker", worker.id, "is up");
  });

  cluster.on('exit', (worker) => {
    // Dead workers are only reported, not replaced.
    console.log('worker ' + worker.id + ' died');
  });
} else {
  // Worker: each message received on the module-level `redis` connection
  // carries the name of a Redis set. Pop one member from that set and hand it
  // to the SQLite writer. Commands go through a separate connection from the
  // one used for the subscription.
  const client = new (require("ioredis"))();
  const sqlW = new SQLWriter();

  redis.on("message", (channel, key) => {
    console.log(key, "key", "recieved by", cluster.worker.id);
    client.spop(key)
      .then((data) => {
        // Pub/sub delivers the same notification to every subscribed worker,
        // but a set member can be popped only once: the other workers get
        // null. Queuing null would later violate the NOT NULL url column.
        if (data == null) {
          return;
        }
        console.log(data, "data");
        sqlW.write(data);
      })
      .catch((err) => console.log(err, "error"));
  });
}
// NOTE(review): these statements sit outside the cluster.isMaster branch, so
// they execute in the master and in every worker process. Confirm that
// reading and publishing the file from each process is intended.

// Connection used for SADD/PUBLISH, separate from the `redis` connection
// that holds the subscription.
const pub = new (require("ioredis"))();

redis.subscribe("addedUrl", (er) => {
  if (er) console.log(er);
});

// Stream the comma-separated input file: add each entry to the `urls` set,
// then announce on `addedUrl` which set has a new member.
require("fs").createReadStream("./lyr_1")
  .pipe(split(","))
  .on("data", (url) => {
    pub.sadd('urls', url)
      .then(() => pub.publish('addedUrl', 'urls'))
      // Without this handler a failed SADD/PUBLISH is an unhandled rejection.
      .catch((err) => console.log(err, "error"));
  });
/**
 * Buffers values in memory and periodically writes them to the `urls` table
 * of a SQLite database, using one transaction per flush.
 *
 * Deliberately a hoisted function declaration with prototype methods rather
 * than a `class`: the worker branch earlier in this file runs
 * `new SQLWriter()` before execution reaches this point, which a class
 * declaration would not allow.
 *
 * @param {string} [dbname="./urls.db"] Path of the SQLite database file.
 */
function SQLWriter(dbname) {
  this.db = new sqlite3.Database(dbname || "./urls.db");
  this.values = [];
  // Polling stops once nothing has been written for this long (5 hours).
  this.maxTimeout = 1000 * 60 * 60 * 5;
  // Time between flush attempts (30 minutes).
  this.flushInterval = 500 * 60 * 60;
  // Must be a timestamp (Date.now()), not the Date.now function itself,
  // otherwise the idle computation yields NaN.
  this.lastChanged = Date.now();
  this.interval = null;

  // IF NOT EXISTS replaces a file-existence probe: the database file is
  // created by sqlite3 when it is opened, so the file being present does not
  // prove that the table is.
  this.db.serialize(() => {
    this.db.run("CREATE TABLE IF NOT EXISTS urls (" +
      " id INTEGER PRIMARY KEY AUTOINCREMENT," +
      " url VARCHAR(2000) NOT NULL," +
      " lock_version VARCHAR(2000));");
  });

  startFlushTimer(this);
}

/**
 * Starts the periodic flush for `writer` unless it is already running.
 * On each tick buffered values are flushed; when the buffer is empty and the
 * writer has been idle for longer than `maxTimeout`, the timer stops itself.
 *
 * @param {SQLWriter} writer Writer whose timer should be running.
 */
function startFlushTimer(writer) {
  if (writer.interval) {
    return;
  }
  writer.interval = setInterval(() => {
    if (writer.values.length > 0) {
      writer.flushAll();
    } else if (Date.now() - writer.lastChanged > writer.maxTimeout) {
      clearInterval(writer.interval);
      writer.interval = null;
    }
  }, writer.flushInterval);
}

/**
 * Queues a value for the next flush.
 *
 * @param {*} data Value to insert into the `url` column.
 */
SQLWriter.prototype.write = function(data) {
  this.values.push(data);
  this.lastChanged = Date.now();
  // The timer may have stopped after a long idle period; make sure the value
  // queued above will eventually be flushed.
  startFlushTimer(this);
};

/**
 * Writes every buffered value to the database in a single transaction and
 * empties the buffer. Does nothing when the buffer is empty.
 */
SQLWriter.prototype.flushAll = function() {
  // splice(0) drains the whole buffer. A negative delete count, as in
  // splice(0, -1), is treated as 0 and would remove nothing.
  const values = this.values.splice(0);
  if (values.length === 0) {
    return;
  }

  const db = this.db;
  db.serialize(() => {
    db.run("BEGIN TRANSACTION;");
    const stmt = db.prepare("INSERT INTO urls(url) VALUES (?)");
    console.log("commiting with values", values);
    values.forEach((v) => {
      stmt.run(v);
    });
    stmt.finalize();
    console.log("writing to db");
    db.run("END;");
    // The timer is intentionally left running here: its lifetime is managed
    // by startFlushTimer, so later writes keep getting flushed.
  });
};
/**
 * Builds an inclusive arithmetic sequence starting at `f` and ending at or
 * before `l`.
 *
 * @param {number} f First value of the sequence.
 * @param {number} l Inclusive bound of the sequence.
 * @param {number} [d=1] Step between values. A missing or zero step falls
 *   back to 1; a negative step counts down from `f` to `l`.
 * @returns {number[]} The generated values; empty when `l` cannot be reached
 *   from `f` in the direction of the step.
 */
function range(f, l, d) {
  const step = d || 1;
  const array = [];
  if (step > 0) {
    for (let i = f; i <= l; i += step) {
      array.push(i);
    }
  } else {
    // Counting down needs the mirrored bound check; with `i <= l` a negative
    // step would never terminate.
    for (let i = f; i >= l; i += step) {
      array.push(i);
    }
  }
  return array;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment