Skip to content

Instantly share code, notes, and snippets.

@commuterjoy
Last active August 29, 2015 13:56
Show Gist options
  • Save commuterjoy/8985038 to your computer and use it in GitHub Desktop.
Save commuterjoy/8985038 to your computer and use it in GitHub Desktop.
I needed to add 2 million HTTP redirects to an AWS DynamoDB table ...
var kue = require('kue');
var stream = require('stream');
var AWS = require('aws-sdk');
var queue = kue.createQueue();
var debug = false;
var util = require('util');

// Debug-only logger. Forwards every argument to console.log so printf-style
// calls such as log('processing job %d', job.id) are formatted properly.
var log = function () {
  if (debug) {
    console.log.apply(console, arguments);
  }
};
// Jobs received from kue that have not yet been sent to DynamoDB.
var jobBuffer = [];

// Load the global AWS SDK configuration from ./config.json, then create the
// DynamoDB client used by write().
AWS.config.loadFromPath('./config.json');
var dynamodb = new AWS.DynamoDB();
// DynamoDB's BatchWriteItem accepts at most 25 put requests per call, so jobs
// are buffered and written 25 at a time. kue is asked for the same
// concurrency, so up to one full batch of unacknowledged jobs can accumulate.
var BATCH_SIZE = 25;

// A partially filled buffer is flushed after this delay. Without it, the last
// (fewer than 25) jobs would wait for a batch that never fills once the queue
// runs dry, and would never be written or completed.
var FLUSH_DELAY_MS = 1000;
var flushTimer = null;

// Hand everything buffered so far to write() and start a fresh buffer.
var flush = function () {
  if (flushTimer) {
    clearTimeout(flushTimer);
    flushTimer = null;
  }
  if (jobBuffer.length === 0) {
    return;
  }
  var batch = jobBuffer;
  jobBuffer = []; // reset buffer
  write(batch);
};

queue.process('redirect', BATCH_SIZE, function (job, done) {
  log('processing job ' + job.id);
  jobBuffer.push({ job: job, done: done });
  if (jobBuffer.length >= BATCH_SIZE) {
    flush();
  } else if (!flushTimer) {
    flushTimer = setTimeout(flush, FLUSH_DELAY_MS);
  }
});
// Write one buffered batch of kue jobs to the `redirects_test` DynamoDB table
// with a single BatchWriteItem request, then settle every job in the batch.
//
// @param batch  array of { job, done } pairs as buffered by the 'redirect'
//               processor; job.data carries `source` and `destination`
//               strings. Callers keep it at or below 25 entries, the
//               BatchWriteItem per-request limit.
//
// A job is only acknowledged with done() once DynamoDB has accepted it:
//  - if the request fails outright, every job is failed with that error so
//    kue can retry it (the producer allows 5 attempts);
//  - puts that DynamoDB hands back under UnprocessedItems (e.g. when
//    throttled) were NOT written, so their jobs are failed too instead of
//    being silently marked complete.
var write = function (batch) {
  var table = 'redirects_test';
  var requestItems = {};
  requestItems[table] = batch.map(function (item) {
    return {
      "PutRequest": {
        "Item": {
          "source": { "S": item.job.data.source },
          "destination": { "S": item.job.data.destination }
        }
      }
    };
  });

  dynamodb.batchWriteItem({ "RequestItems": requestItems }, function (err, data) {
    if (err) {
      console.error('batchWriteItem failed: ' + err);
      batch.forEach(function (j) {
        j.done(err);
      });
      return;
    }

    console.log(util.inspect(data, false, null));

    // Key each unprocessed put by its (source, destination) pair so it can be
    // matched back to the job that produced it.
    var unprocessed = Object.create(null);
    var leftovers = (data && data.UnprocessedItems && data.UnprocessedItems[table]) || [];
    leftovers.forEach(function (req) {
      var item = req.PutRequest && req.PutRequest.Item;
      if (item) {
        unprocessed[JSON.stringify([item.source.S, item.destination.S])] = true;
      }
    });

    batch.forEach(function (j) {
      var key = JSON.stringify([j.job.data.source, j.job.data.destination]);
      if (unprocessed[key]) {
        j.done(new Error('DynamoDB left item unprocessed: ' + j.job.data.source));
      } else {
        j.done();
      }
    });
  });
};
var kue = require('kue');
var fs = require('fs');
var readline = require('readline');
var stream = require('stream');
var jobs = kue.createQueue();
// Input file: one JSON record per line, each with inUrl/outUrl fields.
var path = 'redirects_artifact_redirects_succeeded.json';
var debug = false;

// Debug-only logger. Forwards every argument to console.log so printf-style
// calls are formatted properly.
var log = function () {
  if (debug) {
    console.log.apply(console, arguments);
  }
};
// -------- producer
// Read the input file line by line; every line is delivered to the 'line'
// listener registered on `rl`.
var instream = fs.createReadStream(path);

// readline only requires `input`. No output stream is needed because this
// interface never prompts or echoes (terminal: false).
var rl = readline.createInterface({
  input: instream,
  terminal: false
});
// Backpressure: stop reading once this many job saves are in flight and
// resume when the backlog has drained to half of it. Otherwise every line of
// the (multi-million line) file becomes a pending Redis write at once.
var MAX_PENDING_SAVES = 1000;
var pendingSaves = 0;

// Turn one input line into { source, destination }, or return null (after
// reporting why) for blank lines, invalid JSON, or records without non-empty
// string inUrl/outUrl fields. A single bad record would otherwise fail the
// whole DynamoDB batch it ends up in.
var parseRedirect = function (line) {
  if (!line.trim()) {
    return null;
  }
  var record;
  try {
    record = JSON.parse(line);
  } catch (err) {
    console.error('skipping unparseable line: ' + line);
    return null;
  }
  if (!record ||
      typeof record.inUrl !== 'string' || record.inUrl === '' ||
      typeof record.outUrl !== 'string' || record.outUrl === '') {
    console.error('skipping line without inUrl/outUrl: ' + line);
    return null;
  }
  return { source: record.inUrl, destination: record.outUrl };
};

rl.on('line', function (line) {
  var redirect = parseRedirect(line);
  if (!redirect) {
    return;
  }

  log("creating job for: " + redirect.source);

  pendingSaves++;
  if (pendingSaves >= MAX_PENDING_SAVES) {
    rl.pause();
  }

  // create a job, allowing up to 5 processing attempts
  var job = jobs.create('redirect', redirect).attempts(5).save(function (err) {
    pendingSaves--;
    if (err) {
      // always reported: a lost save means a lost redirect
      console.error('error saving message to queue: ' + err);
    }
    if (pendingSaves <= MAX_PENDING_SAVES / 2) {
      rl.resume();
    }
  });

  job.on('failed', function () {
    console.log("Job failed: " + job.id);
  });
});
// Serve kue.app over HTTP so queue progress can be inspected, e.g.:
//   curl -s localhost:3000/stats
var kue = require('kue');
var port = 3000;
kue.app.listen(port);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment