Created
November 18, 2011 21:32
-
-
Save bertrandom/1377828 to your computer and use it in GitHub Desktop.
Daemon for queueing tasks for the future for Gearman without using SUBMIT_JOB_EPOCH
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require('js-yaml'); | |
function writeLn(line) { | |
var now = new Date(); | |
console.log(now.toGMTString() + ' - [' + jobs + '] - ' + line); | |
} | |
function executeJob(data) { | |
if (!connection_working) { | |
setTimeout(executeJob, 1000, data); | |
return; | |
} | |
job = data.job; | |
jobs--; | |
writeLn('Executing Job: ' + job.name + ' - ' + job.workload); | |
try { | |
var gearmanJob = gearman_client.submitJob(job.name, job.workload, { | |
background: true, | |
priority: 'normal', | |
encoding: 'utf8', | |
uniqid: data.uniqid | |
}); | |
} catch (err) { | |
connection_working = false; | |
// Probably lost connection to Gearman, let's kick it | |
writeLn('Submit failed: ' + err.message); | |
writeLn('Lost connection to Gearman, reconnecting...') | |
reconnect(); | |
jobs++; | |
executeJob(data); | |
} | |
} | |
function reconnect() { | |
gearman_client.end(); | |
gearman_client = gearman.createClient(params.gearman_tasks_1_port, params.gearman_tasks_1_host); | |
gearman_client.getConnection().on("connect", function () { | |
writeLn('Connection successful.'); | |
connection_working = true; | |
}); | |
gearman_client.getConnection().on("error", function (err) { | |
connection_working = false; | |
writeLn('Connection failed, trying to reconnect in one second...'); | |
setTimeout(reconnect, 1000); | |
}); | |
} | |
var jobs = 0; | |
var connection_working = false; | |
var config = require(__dirname + '/config/parameters.yml'); | |
var params = config[0].parameters; | |
var gearman = require("gearman"), | |
gearman_client = gearman.createClient(params.gearman_tasks_1_port, params.gearman_tasks_1_host); | |
gearman_client.getConnection().on("connect", function () { | |
writeLn('Connection successful.'); | |
connection_working = true; | |
}); | |
gearman_client.getConnection().on("error", function (err) { | |
connection_working = false; | |
writeLn('Connection failed, trying to reconnect in one second...'); | |
setTimeout(reconnect, 1000); | |
}); | |
var http = require('http'); | |
writeLn('Starting server on port ' + params.westernunion_port); | |
http.createServer(function (req, res) { | |
req.setEncoding('utf8'); | |
var fullBody = ''; | |
req.on('data', function(chunk) { | |
fullBody += chunk.toString(); | |
}); | |
req.on('end', function() { | |
res.writeHead(200, {'Content-Type': 'text/plain'}); | |
res.end('SUCCESS\n'); | |
var data = JSON.parse(fullBody); | |
var recv_string = 'Received Job: ' + data.job.name + ' - ' + data.job.workload; | |
var now = Math.round(new Date().getTime() / 1000); | |
jobs++; | |
if ((data.when_to_run - now) <= 0) { | |
writeLn(recv_string + ' - executing now!'); | |
executeJob(data); | |
} else { | |
writeLn(recv_string + ' - executing in ' + (data.when_to_run - now) + ' seconds.'); | |
setTimeout(executeJob, (data.when_to_run - now) * 1000, data); | |
} | |
}); | |
}).listen(params.westernunion_port); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Eventually I will turn this into a full repository. Writeup is here